import asyncio
from threading import Thread

import websockets


class MultiplayerService:
    """Public facade that runs the server connection on a background thread."""

    # Handle of the background network thread; None until init() is called.
    _thread = None

    @classmethod
    def init(cls):
        """Start the background thread that connects and runs the network loop.

        The thread handle is kept in ``cls._thread`` so it is not lost once
        this method returns.
        """
        cls._thread = Thread(target=cls._run_network_service)
        cls._thread.start()

    @classmethod
    def quit(cls):
        """Ask the network loop to close the connection and stop.

        This only sets a flag; the loop notices it at the end of its current
        iteration, so the connection is not closed synchronously.
        """
        _NetworkConnectionService.close_connection()

    @staticmethod
    def _run_network_service():
        """Thread target: run the network coroutine on a loop owned by this thread."""
        # asyncio.run() creates a fresh event loop in the worker thread and
        # closes it when the coroutine finishes, instead of borrowing the
        # loop that belongs to the thread which called init().
        asyncio.run(_NetworkConnectionService.init())


class _NetworkConnectionService:
    """Owns the websocket connection and the ping loop that runs over it."""

    # Open websocket connection; None until _connect_to_server() has run.
    _websocket = None
    _URI = "ws://webapi.tetri5.com"
    # Set by close_connection(); polled by the network loop.
    _is_closed = False
    # Pause between two pings, in seconds (16 ms).
    _TICK_SECONDS = 16e-3

    @classmethod
    async def init(cls):
        """Connect to the server, then run the network loop until it stops."""
        await cls._connect_to_server()
        await cls._run_network_loop()

    @classmethod
    def close_connection(cls):
        """Flag the network loop to close the websocket and exit."""
        cls._is_closed = True

    @classmethod
    async def _run_network_loop(cls):
        """Send "ping" and print the reply every tick until asked to close.

        The websocket is closed when the loop ends for any reason, including
        an exception raised by ``send()`` or ``recv()``; such an exception
        still propagates to the caller.
        """
        try:
            while True:
                await asyncio.sleep(cls._TICK_SECONDS)
                await cls._websocket.send("ping")
                print(await cls._websocket.recv())
                if cls._is_closed:
                    break
        finally:
            # Release the socket on the error path too, not only on a clean quit.
            await cls._websocket.close()

    @classmethod
    async def _connect_to_server(cls):
        """Open the websocket connection to ``_URI`` and store it on the class."""
        print("Connecting to server...")
        # ping_interval=None is important, otherwise the server will disconnect us
        # https://stackoverflow.com/a/58993145/11512104
        cls._websocket = await websockets.connect(cls._URI, ping_interval=None)
        print("Connected to server...")