mirror of
https://github.com/rangermix/TwitchDropsMiner.git
synced 2026-05-26 07:08:04 +00:00
Implement handling for broadcast-settings-update
This commit is contained in:
@@ -10,7 +10,7 @@ Every ~60 seconds, the application sends a "minute watched" event to the channel
|
||||
|
||||
- Stream-less drop mining - save on bandwidth.
|
||||
- Game priority and exclusion lists, allowing you to focus on mining what you want, in the order you want, and ignore what you don't want.
|
||||
- Sharded websocket connection, allowing for tracking up to `8*50-2=398` channels at the same time.
|
||||
- Sharded websocket connection, allowing for tracking up to `8*25-2=199` channels at the same time.
|
||||
- Automatic drop campaigns discovery based on linked accounts (requires you to do [account linking](https://www.twitch.tv/drops/campaigns) yourself though)
|
||||
- Stream tags and drop campaign validation, to ensure you won't end up mining a stream that can't earn you the drop.
|
||||
- Automatic channel stream switching, when the one you were currently watching goes offline, as well as when a channel streaming a higher priority game goes online.
|
||||
|
||||
34
channel.py
34
channel.py
@@ -178,7 +178,7 @@ class Channel:
|
||||
This is because 'stream-up' event is received way before
|
||||
stream information becomes available.
|
||||
"""
|
||||
return self._pending_stream_up is not None
|
||||
return self._stream is None and self._pending_stream_up is not None
|
||||
|
||||
@property
|
||||
def game(self) -> Game | None:
|
||||
@@ -250,12 +250,20 @@ class Channel:
|
||||
return None
|
||||
return Stream.from_get_stream(self, stream_data)
|
||||
|
||||
async def check_online(self) -> bool:
|
||||
async def update_stream(self, *, trigger_events: bool) -> bool:
|
||||
"""
|
||||
Fetches the current channel stream, and if one exists,
|
||||
updates it's game, title, tags and viewers. Updates channel status in general.
|
||||
|
||||
Setting 'trigger_events' to True will trigger on_online and on_offline events,
|
||||
if the new status differs from the one set before the call.
|
||||
"""
|
||||
old_stream = self._stream
|
||||
self._stream = await self.get_stream()
|
||||
invalidate_cache(self, "_payload")
|
||||
if self._stream is None:
|
||||
return False
|
||||
return True
|
||||
if trigger_events:
|
||||
self._twitch.on_channel_update(self, old_stream, self._stream)
|
||||
return self._stream is not None
|
||||
|
||||
async def _online_delay(self):
|
||||
"""
|
||||
@@ -263,11 +271,8 @@ class Channel:
|
||||
so just wait a bit and check if it's actually online by then.
|
||||
"""
|
||||
await asyncio.sleep(ONLINE_DELAY.total_seconds())
|
||||
online = await self.check_online()
|
||||
self._pending_stream_up = None # for 'display' to work properly
|
||||
self.display()
|
||||
if online:
|
||||
self._twitch.on_online(self)
|
||||
await self.update_stream(trigger_events=True)
|
||||
|
||||
def set_online(self):
|
||||
"""
|
||||
@@ -275,8 +280,9 @@ class Channel:
|
||||
it's going to be set to ONLINE.
|
||||
|
||||
This is called externally, if we receive an event about this happening.
|
||||
This is also called when an already online channel needs a delayed status update.
|
||||
"""
|
||||
if self.offline:
|
||||
if self._pending_stream_up is None:
|
||||
self._pending_stream_up = asyncio.create_task(self._online_delay())
|
||||
self.display()
|
||||
|
||||
@@ -286,15 +292,19 @@ class Channel:
|
||||
|
||||
This is called externally, if we receive an event about this happening.
|
||||
"""
|
||||
needs_display: bool = False
|
||||
if self._pending_stream_up is not None:
|
||||
self._pending_stream_up.cancel()
|
||||
self._pending_stream_up = None
|
||||
self.display()
|
||||
needs_display = True
|
||||
if self.online:
|
||||
old_stream = self._stream
|
||||
self._stream = None
|
||||
invalidate_cache(self, "_payload")
|
||||
self._twitch.on_channel_update(self, old_stream, self._stream)
|
||||
needs_display = False
|
||||
if needs_display:
|
||||
self.display()
|
||||
self._twitch.on_offline(self)
|
||||
|
||||
async def claim_bonus(self):
|
||||
"""
|
||||
|
||||
@@ -19,6 +19,9 @@ if TYPE_CHECKING:
|
||||
|
||||
# True if we're running from a built EXE, False inside a dev build
|
||||
IS_PACKAGED = hasattr(sys, "_MEIPASS")
|
||||
# logging special levels
|
||||
CALL = logging.INFO - 1
|
||||
logging.addLevelName(CALL, "CALL")
|
||||
|
||||
|
||||
def _resource_path(relative_path: Path | str) -> Path:
|
||||
@@ -57,8 +60,12 @@ JsonType = Dict[str, Any]
|
||||
URLType = NewType("URLType", str)
|
||||
TopicProcess: TypeAlias = "abc.Callable[[int, JsonType], Any]"
|
||||
# Values
|
||||
BASE_TOPICS = 2
|
||||
MAX_WEBSOCKETS = 8
|
||||
WS_TOPICS_LIMIT = 50
|
||||
TOPICS_PER_CHANNEL = 2
|
||||
MAX_TOPICS = (MAX_WEBSOCKETS * WS_TOPICS_LIMIT) - BASE_TOPICS
|
||||
MAX_CHANNELS = MAX_TOPICS // TOPICS_PER_CHANNEL
|
||||
# Misc
|
||||
DEFAULT_LANG = "English"
|
||||
BASE_URL = URL("https://twitch.tv")
|
||||
@@ -262,7 +269,6 @@ WEBSOCKET_TOPICS: dict[str, dict[str, str]] = {
|
||||
"Drops": "channel-drop-events", # unused
|
||||
"CommunityPoints": "community-points-channel-v1", # unused
|
||||
"StreamState": "video-playback-by-id",
|
||||
# currently unused, can be used to receive updates regarding stream's title and tag changes
|
||||
"StreamUpdate": "broadcast-settings-update",
|
||||
},
|
||||
}
|
||||
|
||||
11
gui.py
11
gui.py
@@ -45,7 +45,7 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
TK_PADDING = Union[int, Tuple[int, int], Tuple[int, int, int], Tuple[int, int, int, int]]
|
||||
digits = ceil(log10(WS_TOPICS_LIMIT))
|
||||
DIGITS = ceil(log10(WS_TOPICS_LIMIT))
|
||||
|
||||
|
||||
######################
|
||||
@@ -372,7 +372,7 @@ class WebsocketStatus:
|
||||
ttk.Label(
|
||||
frame,
|
||||
textvariable=self._topics_var,
|
||||
width=(digits * 2 + 1),
|
||||
width=(DIGITS * 2 + 1),
|
||||
justify="right",
|
||||
style="MS.TLabel",
|
||||
).grid(column=2, row=0)
|
||||
@@ -407,7 +407,7 @@ class WebsocketStatus:
|
||||
topic_lines.append('')
|
||||
else:
|
||||
status_lines.append(item["status"])
|
||||
topic_lines.append(f"{item['topics']:>{digits}}/{WS_TOPICS_LIMIT}")
|
||||
topic_lines.append(f"{item['topics']:>{DIGITS}}/{WS_TOPICS_LIMIT}")
|
||||
self._status_var.set('\n'.join(status_lines))
|
||||
self._topics_var.set('\n'.join(topic_lines))
|
||||
|
||||
@@ -912,6 +912,10 @@ class ChannelList:
|
||||
self.shrink()
|
||||
|
||||
def display(self, channel: Channel, *, add: bool = False):
|
||||
iid = channel.iid
|
||||
if not add and iid not in self._channel_map:
|
||||
# the channel isn't on the list and we're not supposed to add it
|
||||
return
|
||||
# ACL-based
|
||||
acl_based = "✔" if channel.acl_based else "❌"
|
||||
# status
|
||||
@@ -933,7 +937,6 @@ class ChannelList:
|
||||
points = ''
|
||||
if channel.points is not None:
|
||||
points = str(channel.points)
|
||||
iid = channel.iid
|
||||
if iid in self._channel_map:
|
||||
self._set(iid, "game", game)
|
||||
self._set(iid, "drops", drops)
|
||||
|
||||
171
twitch.py
171
twitch.py
@@ -58,9 +58,8 @@ from constants import (
|
||||
CLIENT_ID,
|
||||
COOKIES_PATH,
|
||||
GQL_OPERATIONS,
|
||||
MAX_WEBSOCKETS,
|
||||
MAX_CHANNELS,
|
||||
WATCH_INTERVAL,
|
||||
WS_TOPICS_LIMIT,
|
||||
DROPS_ENABLED_TAG,
|
||||
ANDROID_USER_AGENT,
|
||||
State,
|
||||
@@ -70,6 +69,7 @@ from constants import (
|
||||
if TYPE_CHECKING:
|
||||
from utils import Game
|
||||
from gui import LoginForm
|
||||
from channel import Stream
|
||||
from settings import Settings
|
||||
from inventory import TimedDrop
|
||||
from constants import JsonType, GQLOperation
|
||||
@@ -758,10 +758,10 @@ class Twitch:
|
||||
self.gui.status.update(_("gui", "status", "cleanup"))
|
||||
if not self.wanted_games or full_cleanup:
|
||||
# no games selected or we're doing full cleanup: remove everything
|
||||
to_remove: list[Channel] = list(channels.values())
|
||||
to_remove_channels: list[Channel] = list(channels.values())
|
||||
else:
|
||||
# remove all channels that:
|
||||
to_remove = [
|
||||
to_remove_channels = [
|
||||
channel
|
||||
for channel in channels.values()
|
||||
if (
|
||||
@@ -774,15 +774,20 @@ class Twitch:
|
||||
)
|
||||
]
|
||||
full_cleanup = False
|
||||
if to_remove:
|
||||
self.websocket.remove_topics(
|
||||
WebsocketTopic.as_str("Channel", "StreamState", channel.id)
|
||||
for channel in to_remove
|
||||
)
|
||||
for channel in to_remove:
|
||||
if to_remove_channels:
|
||||
to_remove_topics: list[str] = []
|
||||
for channel in to_remove_channels:
|
||||
to_remove_topics.append(
|
||||
WebsocketTopic.as_str("Channel", "StreamState", channel.id)
|
||||
)
|
||||
to_remove_topics.append(
|
||||
WebsocketTopic.as_str("Channel", "StreamUpdate", channel.id)
|
||||
)
|
||||
self.websocket.remove_topics(to_remove_topics)
|
||||
for channel in to_remove_channels:
|
||||
del channels[channel.id]
|
||||
channel.remove()
|
||||
del to_remove
|
||||
del to_remove_channels, to_remove_topics
|
||||
if self.wanted_games:
|
||||
self.change_state(State.CHANNELS_FETCH)
|
||||
else:
|
||||
@@ -791,8 +796,10 @@ class Twitch:
|
||||
self.change_state(State.IDLE)
|
||||
elif self._state is State.CHANNELS_FETCH:
|
||||
self.gui.status.update(_("gui", "status", "gathering"))
|
||||
# start with all current channels
|
||||
new_channels: OrderedSet[Channel] = OrderedSet(self.channels.values())
|
||||
# start with all current channels, clear the memory and GUI
|
||||
new_channels: OrderedSet[Channel] = OrderedSet(channels.values())
|
||||
channels.clear()
|
||||
self.gui.channels.clear()
|
||||
# gather and add ACL channels from campaigns
|
||||
# NOTE: we consider only campaigns that can be progressed
|
||||
# NOTE: we use another set so that we can set them online separately
|
||||
@@ -812,7 +819,9 @@ class Twitch:
|
||||
acl_channels.difference_update(new_channels)
|
||||
# use the other set to set them online if possible
|
||||
if acl_channels:
|
||||
await asyncio.gather(*(channel.check_online() for channel in acl_channels))
|
||||
await asyncio.gather(
|
||||
*(channel.update_stream(trigger_events=False) for channel in acl_channels)
|
||||
)
|
||||
# finally, add them as new channels
|
||||
new_channels.update(acl_channels)
|
||||
for game in no_acl:
|
||||
@@ -827,40 +836,47 @@ class Twitch:
|
||||
ordered_channels.sort(key=lambda ch: ch.acl_based, reverse=True)
|
||||
ordered_channels.sort(key=self.get_priority, reverse=True)
|
||||
# ensure that we won't end up with more channels than we can handle
|
||||
# NOTE: we substract 2 due to the two base topics always being added:
|
||||
# channel points gain and drop update subscriptions
|
||||
# NOTE: we trim from the end because that's where the non-priority,
|
||||
# offline (or online but low viewers) channels end up
|
||||
max_channels = MAX_WEBSOCKETS * WS_TOPICS_LIMIT - 2
|
||||
to_remove = ordered_channels[max_channels:]
|
||||
ordered_channels = ordered_channels[:max_channels]
|
||||
if to_remove:
|
||||
# tracked channels and gui are cleared below, so no need to do it here
|
||||
to_remove_channels = ordered_channels[MAX_CHANNELS:]
|
||||
ordered_channels = ordered_channels[:MAX_CHANNELS]
|
||||
if to_remove_channels:
|
||||
# tracked channels and gui were cleared earlier, so no need to do it here
|
||||
# just make sure to unsubscribe from their topics
|
||||
self.websocket.remove_topics(
|
||||
WebsocketTopic.as_str("Channel", "StreamState", channel.id)
|
||||
for channel in to_remove
|
||||
)
|
||||
del to_remove
|
||||
to_remove_topics = []
|
||||
for channel in to_remove_channels:
|
||||
to_remove_topics.append(
|
||||
WebsocketTopic.as_str("Channel", "StreamState", channel.id)
|
||||
)
|
||||
to_remove_topics.append(
|
||||
WebsocketTopic.as_str("Channel", "StreamUpdate", channel.id)
|
||||
)
|
||||
self.websocket.remove_topics(to_remove_topics)
|
||||
del to_remove_channels, to_remove_topics
|
||||
# set our new channel list
|
||||
channels.clear()
|
||||
self.gui.channels.clear()
|
||||
for channel in ordered_channels:
|
||||
channels[channel.id] = channel
|
||||
channel.display(add=True)
|
||||
# subscribe to these channel's state updates
|
||||
self.websocket.add_topics([
|
||||
WebsocketTopic(
|
||||
"Channel", "StreamState", channel_id, self.process_stream_state
|
||||
to_add_topics: list[WebsocketTopic] = []
|
||||
for channel_id in channels:
|
||||
to_add_topics.append(
|
||||
WebsocketTopic(
|
||||
"Channel", "StreamState", channel_id, self.process_stream_state
|
||||
)
|
||||
)
|
||||
for channel_id in channels
|
||||
])
|
||||
to_add_topics.append(
|
||||
WebsocketTopic(
|
||||
"Channel", "StreamUpdate", channel_id, self.process_stream_update
|
||||
)
|
||||
)
|
||||
self.websocket.add_topics(to_add_topics)
|
||||
# relink watching channel after cleanup,
|
||||
# or stop watching it if it no longer qualifies
|
||||
# NOTE: this replaces 'self.watching_channel's internal value with the new object
|
||||
watching_channel = self.watching_channel.get_with_default(None)
|
||||
if watching_channel is not None:
|
||||
new_watching = channels.get(watching_channel.id)
|
||||
new_watching: Channel | None = channels.get(watching_channel.id)
|
||||
if new_watching is not None and self.can_watch(new_watching):
|
||||
self.watch(new_watching)
|
||||
else:
|
||||
@@ -874,6 +890,15 @@ class Twitch:
|
||||
active_drop.display(countdown=False, subone=True)
|
||||
break
|
||||
self.change_state(State.CHANNEL_SWITCH)
|
||||
del (
|
||||
no_acl,
|
||||
acl_channels,
|
||||
new_channels,
|
||||
new_watching,
|
||||
to_add_topics,
|
||||
ordered_channels,
|
||||
watching_channel,
|
||||
)
|
||||
elif self._state is State.CHANNEL_SWITCH:
|
||||
self.gui.status.update(_("gui", "status", "switching"))
|
||||
# Change into the selected channel, stay in the watching channel,
|
||||
@@ -892,7 +917,6 @@ class Twitch:
|
||||
if self.can_watch(channel) and self.should_switch(channel):
|
||||
new_watching = channel
|
||||
break
|
||||
watching_channel = self.watching_channel.get_with_default(None)
|
||||
if new_watching is not None:
|
||||
# if we have a better switch target - do so
|
||||
self.watch(new_watching)
|
||||
@@ -901,17 +925,21 @@ class Twitch:
|
||||
)
|
||||
# break the state change chain by clearing the flag
|
||||
self._state_change.clear()
|
||||
elif watching_channel is not None:
|
||||
elif (
|
||||
(watching_channel := self.watching_channel.get_with_default(None)) is not None
|
||||
):
|
||||
# otherwise, continue watching what we had before
|
||||
self.gui.status.update(
|
||||
_("gui", "status", "watching").format(channel=watching_channel.name)
|
||||
)
|
||||
# break the state change chain by clearing the flag
|
||||
self._state_change.clear()
|
||||
del watching_channel
|
||||
else:
|
||||
# not watching anything and there isn't anything to watch either
|
||||
self.print(_("status", "no_channel"))
|
||||
self.change_state(State.IDLE)
|
||||
del new_watching, selected_channel
|
||||
elif self._state is State.EXIT:
|
||||
self.gui.status.update(_("gui", "status", "exiting"))
|
||||
# we've been requested to exit the application
|
||||
@@ -1087,13 +1115,63 @@ class Twitch:
|
||||
else:
|
||||
logger.warning(f"Unknown stream state: {msg_type}")
|
||||
|
||||
def on_online(self, channel: Channel):
|
||||
@task_wrapper
|
||||
async def process_stream_update(self, channel_id: int, message: JsonType):
|
||||
# message = {
|
||||
# "channel_id": "12345678",
|
||||
# "type": "broadcast_settings_update",
|
||||
# "channel": "channel._login",
|
||||
# "old_status": "Old title",
|
||||
# "status": "New title",
|
||||
# "old_game": "Old game name",
|
||||
# "game": "New game name",
|
||||
# "old_game_id": 123456,
|
||||
# "game_id": 123456
|
||||
# }
|
||||
channel = self.channels.get(channel_id)
|
||||
if channel is None:
|
||||
logger.error(f"Broadcast settings update for a non-existing channel: {channel_id}")
|
||||
return
|
||||
# There's no information about channel tags here, but this event is triggered
|
||||
# when the tags change. We can use this to just update the stream data after the change.
|
||||
# Use 'set_online' to introduce a delay, allowing for multiple title and tags
|
||||
# changes before we update. This eventually calls 'on_channel_update' below.
|
||||
channel.set_online()
|
||||
|
||||
def on_channel_update(
|
||||
self, channel: Channel, stream_before: Stream | None, stream_after: Stream | None
|
||||
):
|
||||
"""
|
||||
Called by a Channel when it goes online (after pending).
|
||||
Called by a Channel when it's status is updated (ONLINE, OFFLINE, title/tags change).
|
||||
|
||||
NOTE: 'stream_before' gets dealocated once this function finishes.
|
||||
"""
|
||||
logger.info(f"{channel.name} goes ONLINE")
|
||||
if stream_before is None and stream_after is not None:
|
||||
# Channel going ONLINE
|
||||
logger.info(f"{channel.name} goes ONLINE")
|
||||
# continue to below
|
||||
elif stream_before is not None and stream_after is None:
|
||||
# Channel going OFFLINE
|
||||
# Change the channel if we're currently watching it
|
||||
watching_channel = self.watching_channel.get_with_default(None)
|
||||
if watching_channel is not None and watching_channel == channel:
|
||||
self.print(_("status", "goes_offline").format(channel=channel.name))
|
||||
self.change_state(State.CHANNEL_SWITCH)
|
||||
else:
|
||||
logger.info(f"{channel.name} goes OFFLINE")
|
||||
channel.display()
|
||||
return
|
||||
elif stream_after is not None and stream_after is not None:
|
||||
# Channel is and stays ONLINE, but has been updated
|
||||
logger.info(f"{channel.name} status has been updated")
|
||||
# continue to below
|
||||
else:
|
||||
# Channel was OFFLINE and stays that way
|
||||
# Nothing to do here for now
|
||||
return
|
||||
|
||||
if (
|
||||
self.can_watch(channel) # we can watch the channel that just got ONLINE
|
||||
self.can_watch(channel) # we can watch the channel
|
||||
and self.should_switch(channel) # and we should!
|
||||
):
|
||||
self.watch(channel)
|
||||
@@ -1101,18 +1179,7 @@ class Twitch:
|
||||
self.gui.status.update(
|
||||
_("gui", "status", "watching").format(channel=channel.name)
|
||||
)
|
||||
|
||||
def on_offline(self, channel: Channel):
|
||||
"""
|
||||
Called by a Channel when it goes offline.
|
||||
"""
|
||||
# change the channel if we're currently watching it
|
||||
watching_channel = self.watching_channel.get_with_default(None)
|
||||
if watching_channel is not None and watching_channel == channel:
|
||||
self.print(_("status", "goes_offline").format(channel=channel.name))
|
||||
self.change_state(State.CHANNEL_SWITCH)
|
||||
else:
|
||||
logger.info(f"{channel.name} goes OFFLINE")
|
||||
channel.display()
|
||||
|
||||
@task_wrapper
|
||||
async def process_drops(self, user_id: int, message: JsonType):
|
||||
|
||||
Reference in New Issue
Block a user