From 5751f96882c1121bf8268f992453aeb2ec35d2e4 Mon Sep 17 00:00:00 2001 From: DevilXD Date: Sun, 11 Dec 2022 18:39:38 +0100 Subject: [PATCH] Implement handling for broadcast-settings-update --- README.md | 2 +- channel.py | 34 ++++++---- constants.py | 8 ++- gui.py | 11 ++-- twitch.py | 171 +++++++++++++++++++++++++++++++++++---------------- 5 files changed, 156 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index f636e6f..8393e31 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Every ~60 seconds, the application sends a "minute watched" event to the channel - Stream-less drop mining - save on bandwidth. - Game priority and exclusion lists, allowing you to focus on mining what you want, in the order you want, and ignore what you don't want. -- Sharded websocket connection, allowing for tracking up to `8*50-2=398` channels at the same time. +- Sharded websocket connection, allowing for tracking up to `8*25-2=199` channels at the same time. - Automatic drop campaigns discovery based on linked accounts (requires you to do [account linking](https://www.twitch.tv/drops/campaigns) yourself though) - Stream tags and drop campaign validation, to ensure you won't end up mining a stream that can't earn you the drop. - Automatic channel stream switching, when the one you were currently watching goes offline, as well as when a channel streaming a higher priority game goes online. diff --git a/channel.py b/channel.py index 3bd07ae..f6f407e 100644 --- a/channel.py +++ b/channel.py @@ -178,7 +178,7 @@ class Channel: This is because 'stream-up' event is received way before stream information becomes available. """ - return self._pending_stream_up is not None + return self._stream is None and self._pending_stream_up is not None @property def game(self) -> Game | None: @@ -250,12 +250,20 @@ class Channel: return None return Stream.from_get_stream(self, stream_data) - async def check_online(self) -> bool: + async def update_stream(self, *, trigger_events: bool) -> bool: + """ + Fetches the current channel stream, and if one exists, + updates it's game, title, tags and viewers. Updates channel status in general. + + Setting 'trigger_events' to True will trigger on_online and on_offline events, + if the new status differs from the one set before the call. + """ + old_stream = self._stream self._stream = await self.get_stream() invalidate_cache(self, "_payload") - if self._stream is None: - return False - return True + if trigger_events: + self._twitch.on_channel_update(self, old_stream, self._stream) + return self._stream is not None async def _online_delay(self): """ @@ -263,11 +271,8 @@ class Channel: so just wait a bit and check if it's actually online by then. """ await asyncio.sleep(ONLINE_DELAY.total_seconds()) - online = await self.check_online() self._pending_stream_up = None # for 'display' to work properly - self.display() - if online: - self._twitch.on_online(self) + await self.update_stream(trigger_events=True) def set_online(self): """ @@ -275,8 +280,9 @@ class Channel: it's going to be set to ONLINE. This is called externally, if we receive an event about this happening. + This is also called when an already online channel needs a delayed status update. """ - if self.offline: + if self._pending_stream_up is None: self._pending_stream_up = asyncio.create_task(self._online_delay()) self.display() @@ -286,15 +292,19 @@ class Channel: This is called externally, if we receive an event about this happening. """ + needs_display: bool = False if self._pending_stream_up is not None: self._pending_stream_up.cancel() self._pending_stream_up = None - self.display() + needs_display = True if self.online: + old_stream = self._stream self._stream = None invalidate_cache(self, "_payload") + self._twitch.on_channel_update(self, old_stream, self._stream) + needs_display = False + if needs_display: self.display() - self._twitch.on_offline(self) async def claim_bonus(self): """ diff --git a/constants.py b/constants.py index f14b0b4..b8354b2 100644 --- a/constants.py +++ b/constants.py @@ -19,6 +19,9 @@ if TYPE_CHECKING: # True if we're running from a built EXE, False inside a dev build IS_PACKAGED = hasattr(sys, "_MEIPASS") +# logging special levels +CALL = logging.INFO - 1 +logging.addLevelName(CALL, "CALL") def _resource_path(relative_path: Path | str) -> Path: @@ -57,8 +60,12 @@ JsonType = Dict[str, Any] URLType = NewType("URLType", str) TopicProcess: TypeAlias = "abc.Callable[[int, JsonType], Any]" # Values +BASE_TOPICS = 2 MAX_WEBSOCKETS = 8 WS_TOPICS_LIMIT = 50 +TOPICS_PER_CHANNEL = 2 +MAX_TOPICS = (MAX_WEBSOCKETS * WS_TOPICS_LIMIT) - BASE_TOPICS +MAX_CHANNELS = MAX_TOPICS // TOPICS_PER_CHANNEL # Misc DEFAULT_LANG = "English" BASE_URL = URL("https://twitch.tv") @@ -262,7 +269,6 @@ WEBSOCKET_TOPICS: dict[str, dict[str, str]] = { "Drops": "channel-drop-events", # unused "CommunityPoints": "community-points-channel-v1", # unused "StreamState": "video-playback-by-id", - # currently unused, can be used to receive updates regarding stream's title and tag changes "StreamUpdate": "broadcast-settings-update", }, } diff --git a/gui.py b/gui.py index 57033b3..f50b181 100644 --- a/gui.py +++ b/gui.py @@ -45,7 +45,7 @@ if TYPE_CHECKING: TK_PADDING = Union[int, Tuple[int, int], Tuple[int, int, int], Tuple[int, int, int, int]] -digits = ceil(log10(WS_TOPICS_LIMIT)) +DIGITS = ceil(log10(WS_TOPICS_LIMIT)) ###################### @@ -372,7 +372,7 @@ class WebsocketStatus: ttk.Label( frame, textvariable=self._topics_var, - width=(digits * 2 + 1), + width=(DIGITS * 2 + 1), justify="right", style="MS.TLabel", ).grid(column=2, row=0) @@ -407,7 +407,7 @@ class WebsocketStatus: topic_lines.append('') else: status_lines.append(item["status"]) - topic_lines.append(f"{item['topics']:>{digits}}/{WS_TOPICS_LIMIT}") + topic_lines.append(f"{item['topics']:>{DIGITS}}/{WS_TOPICS_LIMIT}") self._status_var.set('\n'.join(status_lines)) self._topics_var.set('\n'.join(topic_lines)) @@ -912,6 +912,10 @@ class ChannelList: self.shrink() def display(self, channel: Channel, *, add: bool = False): + iid = channel.iid + if not add and iid not in self._channel_map: + # the channel isn't on the list and we're not supposed to add it + return # ACL-based acl_based = "✔" if channel.acl_based else "❌" # status @@ -933,7 +937,6 @@ class ChannelList: points = '' if channel.points is not None: points = str(channel.points) - iid = channel.iid if iid in self._channel_map: self._set(iid, "game", game) self._set(iid, "drops", drops) diff --git a/twitch.py b/twitch.py index 0e0c420..67b3b86 100644 --- a/twitch.py +++ b/twitch.py @@ -58,9 +58,8 @@ from constants import ( CLIENT_ID, COOKIES_PATH, GQL_OPERATIONS, - MAX_WEBSOCKETS, + MAX_CHANNELS, WATCH_INTERVAL, - WS_TOPICS_LIMIT, DROPS_ENABLED_TAG, ANDROID_USER_AGENT, State, @@ -70,6 +69,7 @@ from constants import ( if TYPE_CHECKING: from utils import Game from gui import LoginForm + from channel import Stream from settings import Settings from inventory import TimedDrop from constants import JsonType, GQLOperation @@ -758,10 +758,10 @@ class Twitch: self.gui.status.update(_("gui", "status", "cleanup")) if not self.wanted_games or full_cleanup: # no games selected or we're doing full cleanup: remove everything - to_remove: list[Channel] = list(channels.values()) + to_remove_channels: list[Channel] = list(channels.values()) else: # remove all channels that: - to_remove = [ + to_remove_channels = [ channel for channel in channels.values() if ( @@ -774,15 +774,20 @@ class Twitch: ) ] full_cleanup = False - if to_remove: - self.websocket.remove_topics( - WebsocketTopic.as_str("Channel", "StreamState", channel.id) - for channel in to_remove - ) - for channel in to_remove: + if to_remove_channels: + to_remove_topics: list[str] = [] + for channel in to_remove_channels: + to_remove_topics.append( + WebsocketTopic.as_str("Channel", "StreamState", channel.id) + ) + to_remove_topics.append( + WebsocketTopic.as_str("Channel", "StreamUpdate", channel.id) + ) + self.websocket.remove_topics(to_remove_topics) + for channel in to_remove_channels: del channels[channel.id] channel.remove() - del to_remove + del to_remove_channels, to_remove_topics if self.wanted_games: self.change_state(State.CHANNELS_FETCH) else: @@ -791,8 +796,10 @@ class Twitch: self.change_state(State.IDLE) elif self._state is State.CHANNELS_FETCH: self.gui.status.update(_("gui", "status", "gathering")) - # start with all current channels - new_channels: OrderedSet[Channel] = OrderedSet(self.channels.values()) + # start with all current channels, clear the memory and GUI + new_channels: OrderedSet[Channel] = OrderedSet(channels.values()) + channels.clear() + self.gui.channels.clear() # gather and add ACL channels from campaigns # NOTE: we consider only campaigns that can be progressed # NOTE: we use another set so that we can set them online separately @@ -812,7 +819,9 @@ class Twitch: acl_channels.difference_update(new_channels) # use the other set to set them online if possible if acl_channels: - await asyncio.gather(*(channel.check_online() for channel in acl_channels)) + await asyncio.gather( + *(channel.update_stream(trigger_events=False) for channel in acl_channels) + ) # finally, add them as new channels new_channels.update(acl_channels) for game in no_acl: @@ -827,40 +836,47 @@ class Twitch: ordered_channels.sort(key=lambda ch: ch.acl_based, reverse=True) ordered_channels.sort(key=self.get_priority, reverse=True) # ensure that we won't end up with more channels than we can handle - # NOTE: we substract 2 due to the two base topics always being added: - # channel points gain and drop update subscriptions # NOTE: we trim from the end because that's where the non-priority, # offline (or online but low viewers) channels end up - max_channels = MAX_WEBSOCKETS * WS_TOPICS_LIMIT - 2 - to_remove = ordered_channels[max_channels:] - ordered_channels = ordered_channels[:max_channels] - if to_remove: - # tracked channels and gui are cleared below, so no need to do it here + to_remove_channels = ordered_channels[MAX_CHANNELS:] + ordered_channels = ordered_channels[:MAX_CHANNELS] + if to_remove_channels: + # tracked channels and gui were cleared earlier, so no need to do it here # just make sure to unsubscribe from their topics - self.websocket.remove_topics( - WebsocketTopic.as_str("Channel", "StreamState", channel.id) - for channel in to_remove - ) - del to_remove + to_remove_topics = [] + for channel in to_remove_channels: + to_remove_topics.append( + WebsocketTopic.as_str("Channel", "StreamState", channel.id) + ) + to_remove_topics.append( + WebsocketTopic.as_str("Channel", "StreamUpdate", channel.id) + ) + self.websocket.remove_topics(to_remove_topics) + del to_remove_channels, to_remove_topics # set our new channel list - channels.clear() - self.gui.channels.clear() for channel in ordered_channels: channels[channel.id] = channel channel.display(add=True) # subscribe to these channel's state updates - self.websocket.add_topics([ - WebsocketTopic( - "Channel", "StreamState", channel_id, self.process_stream_state + to_add_topics: list[WebsocketTopic] = [] + for channel_id in channels: + to_add_topics.append( + WebsocketTopic( + "Channel", "StreamState", channel_id, self.process_stream_state + ) ) - for channel_id in channels - ]) + to_add_topics.append( + WebsocketTopic( + "Channel", "StreamUpdate", channel_id, self.process_stream_update + ) + ) + self.websocket.add_topics(to_add_topics) # relink watching channel after cleanup, # or stop watching it if it no longer qualifies # NOTE: this replaces 'self.watching_channel's internal value with the new object watching_channel = self.watching_channel.get_with_default(None) if watching_channel is not None: - new_watching = channels.get(watching_channel.id) + new_watching: Channel | None = channels.get(watching_channel.id) if new_watching is not None and self.can_watch(new_watching): self.watch(new_watching) else: @@ -874,6 +890,15 @@ class Twitch: active_drop.display(countdown=False, subone=True) break self.change_state(State.CHANNEL_SWITCH) + del ( + no_acl, + acl_channels, + new_channels, + new_watching, + to_add_topics, + ordered_channels, + watching_channel, + ) elif self._state is State.CHANNEL_SWITCH: self.gui.status.update(_("gui", "status", "switching")) # Change into the selected channel, stay in the watching channel, @@ -892,7 +917,6 @@ class Twitch: if self.can_watch(channel) and self.should_switch(channel): new_watching = channel break - watching_channel = self.watching_channel.get_with_default(None) if new_watching is not None: # if we have a better switch target - do so self.watch(new_watching) @@ -901,17 +925,21 @@ class Twitch: ) # break the state change chain by clearing the flag self._state_change.clear() - elif watching_channel is not None: + elif ( + (watching_channel := self.watching_channel.get_with_default(None)) is not None + ): # otherwise, continue watching what we had before self.gui.status.update( _("gui", "status", "watching").format(channel=watching_channel.name) ) # break the state change chain by clearing the flag self._state_change.clear() + del watching_channel else: # not watching anything and there isn't anything to watch either self.print(_("status", "no_channel")) self.change_state(State.IDLE) + del new_watching, selected_channel elif self._state is State.EXIT: self.gui.status.update(_("gui", "status", "exiting")) # we've been requested to exit the application @@ -1087,13 +1115,63 @@ class Twitch: else: logger.warning(f"Unknown stream state: {msg_type}") - def on_online(self, channel: Channel): + @task_wrapper + async def process_stream_update(self, channel_id: int, message: JsonType): + # message = { + # "channel_id": "12345678", + # "type": "broadcast_settings_update", + # "channel": "channel._login", + # "old_status": "Old title", + # "status": "New title", + # "old_game": "Old game name", + # "game": "New game name", + # "old_game_id": 123456, + # "game_id": 123456 + # } + channel = self.channels.get(channel_id) + if channel is None: + logger.error(f"Broadcast settings update for a non-existing channel: {channel_id}") + return + # There's no information about channel tags here, but this event is triggered + # when the tags change. We can use this to just update the stream data after the change. + # Use 'set_online' to introduce a delay, allowing for multiple title and tags + # changes before we update. This eventually calls 'on_channel_update' below. + channel.set_online() + + def on_channel_update( + self, channel: Channel, stream_before: Stream | None, stream_after: Stream | None + ): """ - Called by a Channel when it goes online (after pending). + Called by a Channel when it's status is updated (ONLINE, OFFLINE, title/tags change). + + NOTE: 'stream_before' gets dealocated once this function finishes. """ - logger.info(f"{channel.name} goes ONLINE") + if stream_before is None and stream_after is not None: + # Channel going ONLINE + logger.info(f"{channel.name} goes ONLINE") + # continue to below + elif stream_before is not None and stream_after is None: + # Channel going OFFLINE + # Change the channel if we're currently watching it + watching_channel = self.watching_channel.get_with_default(None) + if watching_channel is not None and watching_channel == channel: + self.print(_("status", "goes_offline").format(channel=channel.name)) + self.change_state(State.CHANNEL_SWITCH) + else: + logger.info(f"{channel.name} goes OFFLINE") + channel.display() + return + elif stream_after is not None and stream_after is not None: + # Channel is and stays ONLINE, but has been updated + logger.info(f"{channel.name} status has been updated") + # continue to below + else: + # Channel was OFFLINE and stays that way + # Nothing to do here for now + return + if ( - self.can_watch(channel) # we can watch the channel that just got ONLINE + self.can_watch(channel) # we can watch the channel and self.should_switch(channel) # and we should! ): self.watch(channel) @@ -1101,18 +1179,7 @@ class Twitch: self.gui.status.update( _("gui", "status", "watching").format(channel=channel.name) ) - - def on_offline(self, channel: Channel): - """ - Called by a Channel when it goes offline. - """ - # change the channel if we're currently watching it - watching_channel = self.watching_channel.get_with_default(None) - if watching_channel is not None and watching_channel == channel: - self.print(_("status", "goes_offline").format(channel=channel.name)) - self.change_state(State.CHANNEL_SWITCH) - else: - logger.info(f"{channel.name} goes OFFLINE") + channel.display() @task_wrapper async def process_drops(self, user_id: int, message: JsonType):