Handle maximum channels limit;

Proper handle of a case where the app ends up with more channels from ACLs than is supported
This commit is contained in:
DevilXD
2022-02-13 20:13:11 +01:00
parent ce9af7b4ba
commit 423e51bc27
7 changed files with 111 additions and 72 deletions

6
.vscode/launch.json vendored
View File

@@ -16,7 +16,7 @@
"type": "python",
"request": "launch",
"program": "main.py",
"args": ["--no-run-check", "-vv"],
"args": ["--no-run-check", "--idle", "-vv"],
"console": "integratedTerminal",
"justMyCode": false
},
@@ -25,7 +25,7 @@
"type": "python",
"request": "launch",
"program": "main.py",
"args": ["--no-run-check", "-vvv"],
"args": ["--no-run-check", "--idle", "-vvv"],
"console": "integratedTerminal",
"justMyCode": false
},
@@ -34,7 +34,7 @@
"type": "python",
"request": "launch",
"program": "main.py",
"args": ["--no-run-check", "-vvv", "--debug-ws"],
"args": ["--no-run-check", "--idle", "-vvv", "--debug-ws"],
"console": "integratedTerminal",
"justMyCode": false
},

View File

@@ -312,7 +312,7 @@ class Channel:
self.display()
@cached_property
def _payload(self):
def _payload(self) -> JsonType:
assert self._stream is not None
payload = [
{

8
gui.py
View File

@@ -663,7 +663,9 @@ class ChannelList:
def set_watching(self, channel: Channel):
self.clear_watching()
self._table.item(channel.iid, tags="watching")
iid = channel.iid
self._table.item(iid, tags="watching")
self._table.see(iid)
def get_selection(self) -> Optional[Channel]:
if not self._channel_map:
@@ -676,6 +678,10 @@ class ChannelList:
def clear_selection(self):
self._table.selection_set('')
def clear(self):
iids = self._table.get_children()
self._table.delete(*iids)
def display(self, channel: Channel, *, add: bool = False):
# priority
priority = "" if channel.priority else ""

View File

@@ -1,3 +1,3 @@
aiohttp>2.0,<4.0
pystray
websockets
websockets>10.0

136
twitch.py
View File

@@ -17,8 +17,8 @@ from typing import (
Set,
OrderedDict,
Callable,
Iterable,
AsyncIterator,
Final,
cast,
TYPE_CHECKING,
)
@@ -41,7 +41,9 @@ from constants import (
USER_AGENT,
COOKIES_PATH,
GQL_OPERATIONS,
MAX_WEBSOCKETS,
WATCH_INTERVAL,
WS_TOPICS_LIMIT,
DROPS_ENABLED_TAG,
JsonType,
State,
@@ -49,8 +51,6 @@ from constants import (
WebsocketTopic,
)
from exceptions import MinerException
if TYPE_CHECKING:
from gui import LoginForm
from main import ParsedArgs
@@ -60,6 +60,13 @@ logger = logging.getLogger("TwitchDrops")
gql_logger = logging.getLogger("TwitchDrops.gql")
def viewers_key(channel: Channel) -> int:
viewers = channel.viewers
if viewers is not None:
return viewers
return -1
class Twitch:
def __init__(self, options: ParsedArgs):
self.options = options
@@ -174,6 +181,7 @@ class Twitch:
])
games: List[Game] = []
full_cleanup: bool = False
channels: Final[OrderedDict[int, Channel]] = self.channels
self.change_state(State.INVENTORY_FETCH)
while True:
if self._state is State.IDLE:
@@ -214,10 +222,6 @@ class Twitch:
self.change_state(State.GAME_SELECT)
elif self._state is State.GAME_SELECT:
self.game = self.gui.games.get_selection()
# pre-display the active drop without a countdown
active_drop = self.get_active_drop()
if active_drop is not None:
active_drop.display(countdown=False)
# restart the watch loop if needed
self.restart_watching()
# signal channel cleanup that we're removing everything
@@ -226,12 +230,12 @@ class Twitch:
elif self._state is State.CHANNELS_CLEANUP:
if self.game is None or full_cleanup:
# no game selected or we're doing full cleanup: remove everything
to_remove: List[Channel] = list(self.channels.values())
to_remove: List[Channel] = list(channels.values())
else:
# remove all channels that:
to_remove = [
channel
for channel in self.channels.values()
for channel in channels.values()
if (
not channel.priority # aren't prioritized
and (
@@ -242,14 +246,14 @@ class Twitch:
)
]
full_cleanup = False
self.websocket.remove_topics(
WebsocketTopic.as_str("Channel", "VideoPlayback", channel.id)
for channel in to_remove
)
for channel in to_remove:
del self.channels[channel.id]
channel.remove()
self.gui.channels.shrink()
if to_remove:
self.websocket.remove_topics(
WebsocketTopic.as_str("Channel", "StreamState", channel.id)
for channel in to_remove
)
for channel in to_remove:
del channels[channel.id]
channel.remove()
self.change_state(State.CHANNELS_FETCH)
elif self._state is State.CHANNELS_FETCH:
if self.game is None:
@@ -259,61 +263,77 @@ class Twitch:
if (active_drop := self.get_active_drop()) is not None:
active_drop.display(countdown=False)
# gather ACLs from campaigns
# NOTE: we consider only campaigns that can be progressed
no_acl = False
new_channels: OrderedSet[Channel] = OrderedSet()
for campaign in self.inventory[self.game]:
acls = campaign.allowed_channels
if acls:
for channel in acls:
new_channels.add(channel)
else:
no_acl = True
if any(drop.can_earn() for drop in campaign.drops):
acl = campaign.allowed_channels
if acl:
new_channels.update(acl)
else:
no_acl = True
# set them online if possible
await asyncio.gather(*(channel.check_online() for channel in new_channels))
if no_acl:
# if there's at least one game without an ACL,
# get a list of all live channels with drops enabled
live_streams: List[Channel] = await self.get_live_streams()
for channel in live_streams:
new_channels.add(channel)
for channel in new_channels:
if self.can_watch(channel):
# there are streams we can watch, so let's pre-display the active drop
# again, but this time with a substracted minute
if (active_drop := self.get_active_drop(channel)) is not None:
active_drop.display(countdown=False, subone=True)
break
# add them, filtering out ones we already have
for channel in new_channels:
if (channel_id := channel.id) not in self.channels:
self.channels[channel_id] = channel
channel.display(add=True)
# Subscribe to these channel's state updates
topics: List[WebsocketTopic] = [
WebsocketTopic(
"Channel", "VideoPlayback", channel_id, self.process_stream_state
# if there's at least one campaign without an ACL,
# add a list of live channels with drops enabled
new_channels.update(await self.get_live_streams())
# merge current channels into new ones
new_channels.update(self.channels.values())
# sort them descending by viewers,
# then by priority so that prioritized ones are first
# NOTE: We can drop OrderedSet now because there's no more channels being added
ordered_channels: List[Channel] = sorted(
new_channels, key=viewers_key, reverse=True
)
ordered_channels.sort(key=lambda ch: ch.priority, reverse=True)
# ensure that we won't end up with more channels than we can handle
# NOTE: we substract 2 due to the two base topics always being added:
# channel points gain and drop update subscriptions
# NOTE: we trim from the end because that's where the non-priority,
# offline (or online but low viewers) channels end up
max_channels = MAX_WEBSOCKETS * WS_TOPICS_LIMIT - 2
to_remove = ordered_channels[max_channels:]
ordered_channels = ordered_channels[:max_channels]
if to_remove:
# tracked channels and gui are cleared below, so no need to do it here
# just make sure to unsubscribe from their topics
self.websocket.remove_topics(
WebsocketTopic.as_str("Channel", "StreamState", channel.id)
for channel in to_remove
)
for channel_id in self.channels
]
try:
self.websocket.add_topics(topics)
except MinerException as err:
# don't panic if we have more topics than available sockets
# but still log an error
logger.error(err)
pass
# set our new channel list
channels.clear()
self.gui.channels.clear()
self.gui.channels.shrink()
for channel in ordered_channels:
channels[channel.id] = channel
channel.display(add=True)
# subscribe to these channel's state updates
self.websocket.add_topics([
WebsocketTopic(
"Channel", "StreamState", channel_id, self.process_stream_state
)
for channel_id in channels
])
# relink watching channel after cleanup,
# or stop watching it if it no longer qualifies
watching_channel = self.watching_channel.get_with_default(None)
if watching_channel is not None:
new_watching = self.channels.get(watching_channel.id)
new_watching = channels.get(watching_channel.id)
if new_watching is not None and self.can_watch(new_watching):
self.watch(new_watching)
else:
# we're removing a channel we're watching
self.stop_watching()
# pre-display the active drop again with a substracted minute
for channel in channels.values():
# check if there's any channels we can watch first
if self.can_watch(channel):
if (active_drop := self.get_active_drop(channel)) is not None:
active_drop.display(countdown=False, subone=True)
break
self.change_state(State.CHANNEL_SWITCH)
elif self._state is State.CHANNEL_SWITCH:
if self.game is None:
@@ -321,7 +341,6 @@ class Twitch:
else:
# Change into the selected channel, stay in the watching channel,
# or select a new channel that meets the required conditions
channels: Iterable[Channel]
priority_channels: List[Channel] = []
selected_channel = self.gui.channels.get_selection()
if selected_channel is not None:
@@ -330,9 +349,8 @@ class Twitch:
watching_channel = self.watching_channel.get_with_default(None)
if watching_channel is not None:
priority_channels.append(watching_channel)
channels = chain(priority_channels, self.channels.values())
# If there's no selected channel, change into a channel we can watch
for channel in channels:
for channel in chain(priority_channels, channels.values()):
if self.can_watch(channel):
self.watch(channel)
# break the state change chain by clearing the flag

View File

@@ -56,14 +56,14 @@ class OrderedSet(MutableSet[_V]):
Implementation of a set that preserves insertion order,
based on OrderedDict with values set to None.
"""
def __init__(self, iterable: Iterable[_V] = []):
def __init__(self, /, iterable: Iterable[_V] = []):
self._items: OrderedDict[_V, None] = OrderedDict((item, None) for item in iterable)
def __repr__(self) -> str:
return f"{self.__class__.__name__}([{', '.join(map(repr, self._items))}])"
def __contains__(self, x: object) -> bool:
return x in self._items
def __contains__(self, /, item: object) -> bool:
return item in self._items
def __iter__(self) -> Iterator[_V]:
return iter(self._items)
@@ -71,13 +71,25 @@ class OrderedSet(MutableSet[_V]):
def __len__(self) -> int:
return len(self._items)
def add(self, item: _V) -> None:
def add(self, /, item: _V) -> None:
self._items[item] = None
def discard(self, item: _V) -> None:
def discard(self, /, item: _V) -> None:
with suppress(KeyError):
del self._items[item]
def update(self, /, *others: Iterable[_V]) -> None:
for it in others:
for item in it:
if item not in self._items:
self._items[item] = None
def difference_update(self, *others: Iterable[_V]) -> None:
for it in others:
for item in it:
if item in self._items:
del self._items[item]
class AwaitableValue(Generic[_V]):
def __init__(self):

View File

@@ -7,8 +7,11 @@ from time import time
from contextlib import suppress
from typing import Optional, List, Dict, Set, Iterable, TYPE_CHECKING
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK
from websockets.client import WebSocketClientProtocol, connect as websocket_connect
try:
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK
from websockets.client import WebSocketClientProtocol, connect as websocket_connect
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install websockets' first") from exc
from exceptions import MinerException
from utils import task_wrapper, create_nonce