import asyncio import websockets import json import queue # refer to https://docs.python.org/3/library/queue.html for cross threading communication import uuid from typing import Dict from threading import Thread class MultiplayerService(): _thread = None _receive_piece_queue = queue.Queue() _send_piece_queue = queue.Queue() _current_game_id = None _client_id = str(uuid.uuid4()) @classmethod def init(cls) -> None: thread = Thread(target=asyncio.get_event_loop().run_until_complete,\ args=(_NetworkConnectionService.init(),)) thread.start() @classmethod def enter_game(cls, game_id: str) -> None: cls._current_game_id = game_id _NetworkConnectionService._join_game = True # TODO: change this to a function @classmethod def send_piece(cls, piece: "PieceDto") -> None: cls._send_piece_queue.put(piece) @classmethod def try_receive_piece(cls) -> "PieceDto": result = cls._receive_piece_queue.get() cls._receive_piece_queue.task_done() return result @classmethod def quit(cls) -> None: _NetworkConnectionService.close_connection() class _NetworkConnectionService(): _URI = "ws://localhost:5001" # TODO get from config file _websocket = None _is_closed = False _join_game = False _pending_receive_task = None @classmethod async def init(cls) -> None: await cls._connect_to_server() await cls._run_network_loop() @classmethod def close_connection(cls) -> None: cls._is_closed = True @classmethod async def _run_network_loop(cls) -> None: while True: await asyncio.sleep(16e-3) # TODO add clock tick instead await cls._try_enter_game() await cls._try_send_piece() await cls._try_receive_message() # if conection is closed, exit loop if cls._is_closed: await cls._websocket.close() break @classmethod async def _try_enter_game(cls) -> None: if not cls._join_game: return json_message = json.dumps({"action": "enter_game",\ "clientId": MultiplayerService._client_id,\ "gameId": MultiplayerService._current_game_id}) await cls._websocket.send(json_message) cls._join_game = False @classmethod async def _try_send_piece(cls) -> None: # if no messages to proccess, return if MultiplayerService._send_piece_queue.empty(): return # get next piece to send and send to server piece = MultiplayerService._send_piece_queue.get() # construct json message json_message = json.dumps({"action": "send_piece",\ "clientId": MultiplayerService._client_id,\ "gameId": MultiplayerService._current_game_id,\ "piece": piece.__dict__}) await cls._websocket.send(json_message) MultiplayerService._send_piece_queue.task_done() @classmethod async def _try_receive_message(cls) -> None: try: task = cls._pending_receive_task if cls._pending_receive_task else asyncio.create_task(cls._websocket.recv()) done, pending = await asyncio.wait({task}, timeout=8e-3) # TODO experiment with the timeout if task in done: data = json.loads(await task) print(data) if data["type"] == "wait_for_opponent": print("Wait for my opponent!") if data["type"] == "start_game": print("Start the game!") if data["type"] == "receive_piece": print("Receive a piece!") MultiplayerService._receive_piece_queue.put(PieceDto.create(data["piece"])) if data["type"] == "exit_game": print("Exit the game!") cls.close_connection() cls._pending_receive_task = None elif len(pending): cls._pending_receive_task = pending.pop() finally: pass # TODO handle connection closed exception @classmethod async def _connect_to_server(cls) -> None: print("Connecting to server...") # TODO replace with logging cls._websocket = await websockets.connect(cls._URI, ping_interval=None) # ping_interval=None is important, otherwise the server will disconnect us # https://stackoverflow.com/a/58993145/11512104 print("Connected to server...") # TODO replace with logging # DTOs class PieceDto(): def __init__(self, x: int, y: int, type_: str) -> None: self.x = x self.y = y self.type = type_ pass @staticmethod def create(data: Dict) -> "PieceDto": return PieceDto(data["x"], data["y"], data["type"])