diff --git a/.vscode/launch.json b/.vscode/launch.json index 2ec95b5..4ac33b4 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -16,7 +16,7 @@ "type": "python", "request": "launch", "program": "main.py", - "args": ["--no-run-check", "-vv"], + "args": ["--no-run-check", "--idle", "-vv"], "console": "integratedTerminal", "justMyCode": false }, @@ -25,7 +25,7 @@ "type": "python", "request": "launch", "program": "main.py", - "args": ["--no-run-check", "-vvv"], + "args": ["--no-run-check", "--idle", "-vvv"], "console": "integratedTerminal", "justMyCode": false }, @@ -34,7 +34,7 @@ "type": "python", "request": "launch", "program": "main.py", - "args": ["--no-run-check", "-vvv", "--debug-ws"], + "args": ["--no-run-check", "--idle", "-vvv", "--debug-ws"], "console": "integratedTerminal", "justMyCode": false }, diff --git a/channel.py b/channel.py index 1f30d2b..55f436c 100644 --- a/channel.py +++ b/channel.py @@ -312,7 +312,7 @@ class Channel: self.display() @cached_property - def _payload(self): + def _payload(self) -> JsonType: assert self._stream is not None payload = [ { diff --git a/gui.py b/gui.py index a5c572e..aebfa74 100644 --- a/gui.py +++ b/gui.py @@ -663,7 +663,9 @@ class ChannelList: def set_watching(self, channel: Channel): self.clear_watching() - self._table.item(channel.iid, tags="watching") + iid = channel.iid + self._table.item(iid, tags="watching") + self._table.see(iid) def get_selection(self) -> Optional[Channel]: if not self._channel_map: @@ -676,6 +678,10 @@ class ChannelList: def clear_selection(self): self._table.selection_set('') + def clear(self): + iids = self._table.get_children() + self._table.delete(*iids) + def display(self, channel: Channel, *, add: bool = False): # priority priority = "✔" if channel.priority else "❌" diff --git a/requirements.txt b/requirements.txt index 0ed5ab2..1bed197 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ aiohttp>2.0,<4.0 pystray -websockets \ No newline at end of file +websockets>10.0 diff --git a/twitch.py b/twitch.py index ab4dd23..fb9cc77 100644 --- a/twitch.py +++ b/twitch.py @@ -17,8 +17,8 @@ from typing import ( Set, OrderedDict, Callable, - Iterable, AsyncIterator, + Final, cast, TYPE_CHECKING, ) @@ -41,7 +41,9 @@ from constants import ( USER_AGENT, COOKIES_PATH, GQL_OPERATIONS, + MAX_WEBSOCKETS, WATCH_INTERVAL, + WS_TOPICS_LIMIT, DROPS_ENABLED_TAG, JsonType, State, @@ -49,8 +51,6 @@ from constants import ( WebsocketTopic, ) -from exceptions import MinerException - if TYPE_CHECKING: from gui import LoginForm from main import ParsedArgs @@ -60,6 +60,13 @@ logger = logging.getLogger("TwitchDrops") gql_logger = logging.getLogger("TwitchDrops.gql") +def viewers_key(channel: Channel) -> int: + viewers = channel.viewers + if viewers is not None: + return viewers + return -1 + + class Twitch: def __init__(self, options: ParsedArgs): self.options = options @@ -174,6 +181,7 @@ class Twitch: ]) games: List[Game] = [] full_cleanup: bool = False + channels: Final[OrderedDict[int, Channel]] = self.channels self.change_state(State.INVENTORY_FETCH) while True: if self._state is State.IDLE: @@ -214,10 +222,6 @@ class Twitch: self.change_state(State.GAME_SELECT) elif self._state is State.GAME_SELECT: self.game = self.gui.games.get_selection() - # pre-display the active drop without a countdown - active_drop = self.get_active_drop() - if active_drop is not None: - active_drop.display(countdown=False) # restart the watch loop if needed self.restart_watching() # signal channel cleanup that we're removing everything @@ -226,12 +230,12 @@ class Twitch: elif self._state is State.CHANNELS_CLEANUP: if self.game is None or full_cleanup: # no game selected or we're doing full cleanup: remove everything - to_remove: List[Channel] = list(self.channels.values()) + to_remove: List[Channel] = list(channels.values()) else: # remove all channels that: to_remove = [ channel - for channel in self.channels.values() + for channel in channels.values() if ( not channel.priority # aren't prioritized and ( @@ -242,14 +246,14 @@ class Twitch: ) ] full_cleanup = False - self.websocket.remove_topics( - WebsocketTopic.as_str("Channel", "VideoPlayback", channel.id) - for channel in to_remove - ) - for channel in to_remove: - del self.channels[channel.id] - channel.remove() - self.gui.channels.shrink() + if to_remove: + self.websocket.remove_topics( + WebsocketTopic.as_str("Channel", "StreamState", channel.id) + for channel in to_remove + ) + for channel in to_remove: + del channels[channel.id] + channel.remove() self.change_state(State.CHANNELS_FETCH) elif self._state is State.CHANNELS_FETCH: if self.game is None: @@ -259,61 +263,77 @@ class Twitch: if (active_drop := self.get_active_drop()) is not None: active_drop.display(countdown=False) # gather ACLs from campaigns + # NOTE: we consider only campaigns that can be progressed no_acl = False new_channels: OrderedSet[Channel] = OrderedSet() for campaign in self.inventory[self.game]: - acls = campaign.allowed_channels - if acls: - for channel in acls: - new_channels.add(channel) - else: - no_acl = True + if any(drop.can_earn() for drop in campaign.drops): + acl = campaign.allowed_channels + if acl: + new_channels.update(acl) + else: + no_acl = True # set them online if possible await asyncio.gather(*(channel.check_online() for channel in new_channels)) if no_acl: - # if there's at least one game without an ACL, - # get a list of all live channels with drops enabled - live_streams: List[Channel] = await self.get_live_streams() - for channel in live_streams: - new_channels.add(channel) - for channel in new_channels: - if self.can_watch(channel): - # there are streams we can watch, so let's pre-display the active drop - # again, but this time with a substracted minute - if (active_drop := self.get_active_drop(channel)) is not None: - active_drop.display(countdown=False, subone=True) - break - # add them, filtering out ones we already have - for channel in new_channels: - if (channel_id := channel.id) not in self.channels: - self.channels[channel_id] = channel - channel.display(add=True) - # Subscribe to these channel's state updates - topics: List[WebsocketTopic] = [ - WebsocketTopic( - "Channel", "VideoPlayback", channel_id, self.process_stream_state + # if there's at least one campaign without an ACL, + # add a list of live channels with drops enabled + new_channels.update(await self.get_live_streams()) + # merge current channels into new ones + new_channels.update(self.channels.values()) + # sort them descending by viewers, + # then by priority so that prioritized ones are first + # NOTE: We can drop OrderedSet now because there's no more channels being added + ordered_channels: List[Channel] = sorted( + new_channels, key=viewers_key, reverse=True + ) + ordered_channels.sort(key=lambda ch: ch.priority, reverse=True) + # ensure that we won't end up with more channels than we can handle + # NOTE: we substract 2 due to the two base topics always being added: + # channel points gain and drop update subscriptions + # NOTE: we trim from the end because that's where the non-priority, + # offline (or online but low viewers) channels end up + max_channels = MAX_WEBSOCKETS * WS_TOPICS_LIMIT - 2 + to_remove = ordered_channels[max_channels:] + ordered_channels = ordered_channels[:max_channels] + if to_remove: + # tracked channels and gui are cleared below, so no need to do it here + # just make sure to unsubscribe from their topics + self.websocket.remove_topics( + WebsocketTopic.as_str("Channel", "StreamState", channel.id) + for channel in to_remove ) - for channel_id in self.channels - ] - - try: - self.websocket.add_topics(topics) - except MinerException as err: - # don't panic if we have more topics than available sockets - # but still log an error - logger.error(err) - pass - + # set our new channel list + channels.clear() + self.gui.channels.clear() + self.gui.channels.shrink() + for channel in ordered_channels: + channels[channel.id] = channel + channel.display(add=True) + # subscribe to these channel's state updates + self.websocket.add_topics([ + WebsocketTopic( + "Channel", "StreamState", channel_id, self.process_stream_state + ) + for channel_id in channels + ]) # relink watching channel after cleanup, # or stop watching it if it no longer qualifies watching_channel = self.watching_channel.get_with_default(None) if watching_channel is not None: - new_watching = self.channels.get(watching_channel.id) + new_watching = channels.get(watching_channel.id) if new_watching is not None and self.can_watch(new_watching): self.watch(new_watching) else: # we're removing a channel we're watching self.stop_watching() + # pre-display the active drop again with a substracted minute + for channel in channels.values(): + # check if there's any channels we can watch first + if self.can_watch(channel): + if (active_drop := self.get_active_drop(channel)) is not None: + active_drop.display(countdown=False, subone=True) + break self.change_state(State.CHANNEL_SWITCH) elif self._state is State.CHANNEL_SWITCH: if self.game is None: @@ -321,7 +341,6 @@ class Twitch: else: # Change into the selected channel, stay in the watching channel, # or select a new channel that meets the required conditions - channels: Iterable[Channel] priority_channels: List[Channel] = [] selected_channel = self.gui.channels.get_selection() if selected_channel is not None: @@ -330,9 +349,8 @@ class Twitch: watching_channel = self.watching_channel.get_with_default(None) if watching_channel is not None: priority_channels.append(watching_channel) - channels = chain(priority_channels, self.channels.values()) # If there's no selected channel, change into a channel we can watch - for channel in channels: + for channel in chain(priority_channels, channels.values()): if self.can_watch(channel): self.watch(channel) # break the state change chain by clearing the flag diff --git a/utils.py b/utils.py index 3eb9056..7be6471 100644 --- a/utils.py +++ b/utils.py @@ -56,14 +56,14 @@ class OrderedSet(MutableSet[_V]): Implementation of a set that preserves insertion order, based on OrderedDict with values set to None. """ - def __init__(self, iterable: Iterable[_V] = []): + def __init__(self, /, iterable: Iterable[_V] = []): self._items: OrderedDict[_V, None] = OrderedDict((item, None) for item in iterable) def __repr__(self) -> str: return f"{self.__class__.__name__}([{', '.join(map(repr, self._items))}])" - def __contains__(self, x: object) -> bool: - return x in self._items + def __contains__(self, /, item: object) -> bool: + return item in self._items def __iter__(self) -> Iterator[_V]: return iter(self._items) @@ -71,13 +71,25 @@ class OrderedSet(MutableSet[_V]): def __len__(self) -> int: return len(self._items) - def add(self, item: _V) -> None: + def add(self, /, item: _V) -> None: self._items[item] = None - def discard(self, item: _V) -> None: + def discard(self, /, item: _V) -> None: with suppress(KeyError): del self._items[item] + def update(self, /, *others: Iterable[_V]) -> None: + for it in others: + for item in it: + if item not in self._items: + self._items[item] = None + + def difference_update(self, *others: Iterable[_V]) -> None: + for it in others: + for item in it: + if item in self._items: + del self._items[item] + class AwaitableValue(Generic[_V]): def __init__(self): diff --git a/websocket.py b/websocket.py index a223004..85ab511 100644 --- a/websocket.py +++ b/websocket.py @@ -7,8 +7,11 @@ from time import time from contextlib import suppress from typing import Optional, List, Dict, Set, Iterable, TYPE_CHECKING -from websockets.exceptions import ConnectionClosed, ConnectionClosedOK -from websockets.client import WebSocketClientProtocol, connect as websocket_connect +try: + from websockets.exceptions import ConnectionClosed, ConnectionClosedOK + from websockets.client import WebSocketClientProtocol, connect as websocket_connect +except ModuleNotFoundError as exc: + raise ImportError("You have to run 'pip install websockets' first") from exc from exceptions import MinerException from utils import task_wrapper, create_nonce