Reinforce websocket logic;

so that it doesn't change to disconnected after a long no-internet period
This commit is contained in:
DevilXD
2022-08-03 22:16:57 +02:00
parent 39d45360e6
commit b8cba528b5
2 changed files with 26 additions and 26 deletions

View File

@@ -203,6 +203,8 @@ class Twitch:
# clear the flag and wait until it's set again
self._state_change.clear()
elif self._state is State.INVENTORY_FETCH:
# ensure the websocket is running
await self.websocket.start()
# NOTE: maintenance task is restarted on inventory fetch
if self._mnt_task is not None and not self._mnt_task.done():
self._mnt_task.cancel()

View File

@@ -33,11 +33,13 @@ class Websocket:
self._pool: WebsocketPool = pool
self._twitch: Twitch = pool._twitch
self._ws_gui: WebsocketStatus = self._twitch.gui.websockets
self._state_lock = asyncio.Lock()
# websocket index
self._idx: int = index
# current websocket connection
self._ws: AwaitableValue[aiohttp.ClientWebSocketResponse] = AwaitableValue()
# set when the websocket needs to reconnect
# set when the websocket needs to be closed or reconnect
self._closed = asyncio.Event()
self._reconnect_requested = asyncio.Event()
# set when the topics changed
self._topics_changed = asyncio.Event()
@@ -69,16 +71,11 @@ class Websocket:
self._next_ping = time()
self._reconnect_requested.set()
async def close(self):
ws = self._ws.get_with_default(None)
if ws is not None:
self.set_status("Disconnecting...")
await ws.close()
async def start(self):
if not self.connected:
self.start_nowait()
await self.wait_until_connected()
async with self._state_lock:
if not self.connected:
self.start_nowait()
await self.wait_until_connected()
def start_nowait(self):
if not self.connected:
@@ -88,18 +85,21 @@ class Websocket:
ws_logger.error(f"Detected zombified handle task: {self._handle_task!r}")
async def stop(self):
await self.close()
if self._handle_task is not None:
# this raises back any stray exceptions
await self._handle_task
self._handle_task = None
async with self._state_lock:
if self._closed.is_set():
return
self._closed.set()
ws = self._ws.get_with_default(None)
if ws is not None:
self.set_status("Disconnecting...")
await ws.close()
def stop_nowait(self):
# weird syntax but that's what we get for using a decorator for this
# return type of 'task_wrapper' is a coro, so we need to instance it for the task
asyncio.create_task(task_wrapper(self.close)())
asyncio.create_task(task_wrapper(self.stop)())
def remove(self):
def stop_and_remove(self):
# this stops the websocket, and then removes it from the gui list
@task_wrapper
async def remover():
@@ -121,9 +121,10 @@ class Websocket:
async with session.ws_connect(ws_url, ssl=True, proxy=proxy) as websocket:
yield websocket
backoff.reset()
except aiohttp.ClientConnectionError:
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
ws_logger.info(
f"Websocket[{self._idx}] connection error (sleep: {delay:.3}s)", exc_info=True
f"Websocket[{self._idx}] connection problem (sleep: {delay:.3}s)",
exc_info=True,
)
await asyncio.sleep(delay)
except RuntimeError:
@@ -138,8 +139,9 @@ class Websocket:
# ensure we're logged in before connecting
self.set_status("Initializing...")
await self._twitch.wait_until_login()
ws_logger.info(f"Websocket[{self._idx}] connecting...")
self.set_status("Connecting...")
ws_logger.info(f"Websocket[{self._idx}] connecting...")
self._closed.clear()
# Connect/Reconnect loop
async for websocket in self._backoff_connect(
"wss://pubsub-edge.twitch.tv/v1", maximum=3*60 # 3 minutes maximum backoff time
@@ -167,7 +169,7 @@ class Websocket:
ws_logger.warning(
f"Websocket[{self._idx}] closed unexpectedly: {websocket.close_code}"
)
else:
elif self._closed.is_set():
# we closed it - exit
ws_logger.info(f"Websocket[{self._idx}] stopped.")
self.set_status("Disconnected")
@@ -328,14 +330,10 @@ class WebsocketPool:
async def start(self):
await self._twitch.wait_until_login()
if self.running:
return
self._running.set()
await asyncio.gather(*(ws.start() for ws in self.websockets))
async def stop(self):
if not self.running:
return
self._running.clear()
await asyncio.gather(*(ws.stop() for ws in self.websockets))
@@ -382,7 +380,7 @@ class WebsocketPool:
if count <= (len(self.websockets) - 1) * WS_TOPICS_LIMIT:
ws = self.websockets.pop()
recycled_topics.extend(ws.topics.values())
ws.remove()
ws.stop_and_remove()
else:
break
if recycled_topics: