From b8cba528b52a80dd95ae6d951968b68c00fcaadd Mon Sep 17 00:00:00 2001 From: DevilXD Date: Wed, 3 Aug 2022 22:16:57 +0200 Subject: [PATCH] Reinforce websocket logic; so that it doesn't change to disconnected after a long no-internet period --- twitch.py | 2 ++ websocket.py | 50 ++++++++++++++++++++++++-------------------------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/twitch.py b/twitch.py index 4740ae2..1204861 100644 --- a/twitch.py +++ b/twitch.py @@ -203,6 +203,8 @@ class Twitch: # clear the flag and wait until it's set again self._state_change.clear() elif self._state is State.INVENTORY_FETCH: + # ensure the websocket is running + await self.websocket.start() # NOTE: maintenance task is restarted on inventory fetch if self._mnt_task is not None and not self._mnt_task.done(): self._mnt_task.cancel() diff --git a/websocket.py b/websocket.py index dd734e8..872091e 100644 --- a/websocket.py +++ b/websocket.py @@ -33,11 +33,13 @@ class Websocket: self._pool: WebsocketPool = pool self._twitch: Twitch = pool._twitch self._ws_gui: WebsocketStatus = self._twitch.gui.websockets + self._state_lock = asyncio.Lock() # websocket index self._idx: int = index # current websocket connection self._ws: AwaitableValue[aiohttp.ClientWebSocketResponse] = AwaitableValue() - # set when the websocket needs to reconnect + # set when the websocket needs to be closed or reconnect + self._closed = asyncio.Event() self._reconnect_requested = asyncio.Event() # set when the topics changed self._topics_changed = asyncio.Event() @@ -69,16 +71,11 @@ class Websocket: self._next_ping = time() self._reconnect_requested.set() - async def close(self): - ws = self._ws.get_with_default(None) - if ws is not None: - self.set_status("Disconnecting...") - await ws.close() - async def start(self): - if not self.connected: - self.start_nowait() - await self.wait_until_connected() + async with self._state_lock: + if not self.connected: + self.start_nowait() + await self.wait_until_connected() def start_nowait(self): if not self.connected: @@ -88,18 +85,21 @@ class Websocket: ws_logger.error(f"Detected zombified handle task: {self._handle_task!r}") async def stop(self): - await self.close() - if self._handle_task is not None: - # this raises back any stray exceptions - await self._handle_task - self._handle_task = None + async with self._state_lock: + if self._closed.is_set(): + return + self._closed.set() + ws = self._ws.get_with_default(None) + if ws is not None: + self.set_status("Disconnecting...") + await ws.close() def stop_nowait(self): # weird syntax but that's what we get for using a decorator for this # return type of 'task_wrapper' is a coro, so we need to instance it for the task - asyncio.create_task(task_wrapper(self.close)()) + asyncio.create_task(task_wrapper(self.stop)()) - def remove(self): + def stop_and_remove(self): # this stops the websocket, and then removes it from the gui list @task_wrapper async def remover(): @@ -121,9 +121,10 @@ class Websocket: async with session.ws_connect(ws_url, ssl=True, proxy=proxy) as websocket: yield websocket backoff.reset() - except aiohttp.ClientConnectionError: + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): ws_logger.info( - f"Websocket[{self._idx}] connection error (sleep: {delay:.3}s)", exc_info=True + f"Websocket[{self._idx}] connection problem (sleep: {delay:.3}s)", + exc_info=True, ) await asyncio.sleep(delay) except RuntimeError: @@ -138,8 +139,9 @@ class Websocket: # ensure we're logged in before connecting self.set_status("Initializing...") await self._twitch.wait_until_login() - ws_logger.info(f"Websocket[{self._idx}] connecting...") self.set_status("Connecting...") + ws_logger.info(f"Websocket[{self._idx}] connecting...") + self._closed.clear() # Connect/Reconnect loop async for websocket in self._backoff_connect( "wss://pubsub-edge.twitch.tv/v1", maximum=3*60 # 3 minutes maximum backoff time @@ -167,7 +169,7 @@ class Websocket: ws_logger.warning( f"Websocket[{self._idx}] closed unexpectedly: {websocket.close_code}" ) - else: + elif self._closed.is_set(): # we closed it - exit ws_logger.info(f"Websocket[{self._idx}] stopped.") self.set_status("Disconnected") @@ -328,14 +330,10 @@ class WebsocketPool: async def start(self): await self._twitch.wait_until_login() - if self.running: - return self._running.set() await asyncio.gather(*(ws.start() for ws in self.websockets)) async def stop(self): - if not self.running: - return self._running.clear() await asyncio.gather(*(ws.stop() for ws in self.websockets)) @@ -382,7 +380,7 @@ class WebsocketPool: if count <= (len(self.websockets) - 1) * WS_TOPICS_LIMIT: ws = self.websockets.pop() recycled_topics.extend(ws.topics.values()) - ws.remove() + ws.stop_and_remove() else: break if recycled_topics: