From c8d3b992d70ef54ce284ab14d4171648c498d40d Mon Sep 17 00:00:00 2001 From: DevilXD Date: Wed, 29 Dec 2021 13:55:05 +0100 Subject: [PATCH] Huge GUI update --- .gitignore | 2 +- .vscode/launch.json | 8 + channel.py | 122 ++++++- constants.py | 20 +- gui.py | 827 ++++++++++++++++++++++++++++++++++++++++++++ inventory.py | 18 +- main.py | 99 +----- twitch.py | 698 +++++++++++++++++++++++-------------- version.py | 1 + websocket.py | 118 +++---- 10 files changed, 1460 insertions(+), 453 deletions(-) create mode 100644 gui.py create mode 100644 version.py diff --git a/.gitignore b/.gitignore index c757a33..91f64dc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,6 @@ __pycache__ .mypy_cache /build /dist -/cookies.pickle +/cookies.jar /settings.json /*.spec diff --git a/.vscode/launch.json b/.vscode/launch.json index ceeb788..c236f32 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -38,5 +38,13 @@ "console": "integratedTerminal", "justMyCode": false }, + { + "name": "Debug GUI", + "type": "python", + "request": "launch", + "program": "gui.py", + "console": "integratedTerminal", + "justMyCode": false + }, ] } diff --git a/channel.py b/channel.py index 11de9bd..239ff65 100644 --- a/channel.py +++ b/channel.py @@ -4,13 +4,16 @@ import re import json import asyncio import logging +from time import time from base64 import b64encode from datetime import datetime, timezone from typing import Any, Optional, TYPE_CHECKING from inventory import Game from exceptions import MinerException -from constants import JsonType, BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, DROPS_ENABLED_TAG +from constants import ( + JsonType, BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, WATCH_INTERVAL, DROPS_ENABLED_TAG +) if TYPE_CHECKING: from twitch import Twitch @@ -21,17 +24,17 @@ logger = logging.getLogger("TwitchDrops") class Stream: def __init__(self, channel: Channel, data: JsonType): - self._twitch = channel._twitch - self.channel = channel + self._twitch: Twitch = channel._twitch + self.channel: Channel = channel stream = data["stream"] self.broadcast_id = int(stream["id"]) - self.viewer_count = stream["viewersCount"] - self.drops_enabled = any(tag["id"] == DROPS_ENABLED_TAG for tag in stream["tags"]) + self.viewers: int = stream["viewersCount"] + self.drops_enabled: bool = any(tag["id"] == DROPS_ENABLED_TAG for tag in stream["tags"]) settings = data["broadcastSettings"] self.game: Optional[Game] = None if settings["game"] is not None: self.game = Game(settings["game"]) - self.title = settings["title"] + self.title: str = settings["title"] self._timestamp = datetime.now(timezone.utc) @classmethod @@ -40,7 +43,7 @@ class Stream: self._twitch = channel._twitch self.channel = channel self.broadcast_id = data["id"] - self.viewer_count = data["viewersCount"] + self.viewers = data["viewersCount"] self.drops_enabled = any(tag["id"] == DROPS_ENABLED_TAG for tag in data["tags"]) self.game = Game(data["game"]) self.title = data["title"] @@ -64,7 +67,8 @@ class Channel: self.name: str = channel_name self.url: str = f"{BASE_URL}/{channel_name}" self._spade_url: Optional[str] = None - self.stream: Optional[Stream] = None + self.points: Optional[int] = None + self._stream: Optional[Stream] = None self._pending_stream_up: Optional[asyncio.Task[Any]] = None await self.get_stream() @@ -77,10 +81,14 @@ class Channel: self.name = channel["displayName"] self.url = f"{BASE_URL}/{self.name}" self._spade_url = None - self.stream = Stream.from_directory(self, data) + self.points = None + self._stream = Stream.from_directory(self, data) self._pending_stream_up = None return self + def __repr__(self) -> str: + return f"Channel({self.name}, {self.id})" + def __eq__(self, other: object): if isinstance(other, self.__class__): return self.id == other.id @@ -89,12 +97,17 @@ class Channel: def __hash__(self) -> int: return hash((self.__class__.__name__, self.id)) + @property + def iid(self) -> str: + # this is responsible for the ID/Key of the columns inside channel list + return str(self.id) + @property def online(self) -> bool: """ Returns True if the streamer is online and is currently streaming, False otherwise. """ - return self.stream is not None + return self._stream is not None @property def pending_online(self) -> bool: @@ -105,6 +118,39 @@ class Channel: """ return self._pending_stream_up is not None + @property + def game(self) -> Optional[Game]: + if self._stream is not None and self._stream.game is not None: + return self._stream.game + return None + + @property + def viewers(self) -> Optional[int]: + if self._stream is not None: + return self._stream.viewers + return None + + @viewers.setter + def viewers(self, value: int): + if self._stream is not None: + self._stream.viewers = value + self.display() + + @property + def drops_enabled(self) -> Optional[bool]: + if self._stream is not None: + return self._stream.drops_enabled + return None + + def display(self): + self._twitch.gui.channels.display(self) + + def remove(self): + if self._pending_stream_up is not None: + self._pending_stream_up.cancel() + self._pending_stream_up = None + self._twitch.gui.channels.remove(self) + async def get_spade_url(self) -> str: """ To get this monstrous thing, you have to walk a chain of requests. @@ -137,10 +183,10 @@ class Channel: stream_data = response["data"]["user"] self.id = int(stream_data["id"]) # fill channel_id if stream_data["stream"]: - self.stream = Stream(self, stream_data) + self._stream = Stream(self, stream_data) else: - self.stream = None - return self.stream + self._stream = None + return self._stream async def check_online(self) -> bool: stream = await self.get_stream() @@ -149,9 +195,11 @@ class Channel: return True async def _online_delay(self): + self.display() await asyncio.sleep(ONLINE_DELAY.total_seconds()) - await self.get_stream() + await self.check_online() self._pending_stream_up = None + self.display() def set_online(self): if self.online or self.pending_online: @@ -160,22 +208,42 @@ class Channel: # stream-up is sent before the stream actually goes online, so just wait a bit # and check if it's actually online by then self._pending_stream_up = asyncio.create_task(self._online_delay()) + # display is called from within the task def set_offline(self): # to be called externally, if we receive an event about this happening if self._pending_stream_up is not None: self._pending_stream_up.cancel() self._pending_stream_up = None - self.stream = None + self._stream = None + self.display() + + async def claim_bonus(self): + # this also fills out the 'points' attribute + response = await self._twitch.gql_request( + GQL_OPERATIONS["ChannelPointsContext"].with_variables({"channelLogin": self.name}) + ) + channel_data: JsonType = response["data"]["community"]["channel"] + self.points = channel_data["self"]["communityPoints"]["balance"] + claim_available: JsonType = ( + channel_data["self"]["communityPoints"]["availableClaim"] + ) + if claim_available: + await self._twitch.claim_points(channel_data["id"], claim_available["id"]) + logger.info("Claimed bonus points") + else: + # calling claim_points is going to refresh the display, so if we're not calling it, + # we need to do it ourselves + self.display() def _encode_payload(self): - assert self.stream is not None + assert self._stream is not None payload = [ { "event": "minute-watched", "properties": { "channel_id": self.id, - "broadcast_id": self.stream.broadcast_id, + "broadcast_id": self._stream.broadcast_id, "player": "site", "user_id": self._twitch._user_id, } @@ -184,17 +252,33 @@ class Channel: json_event = json.dumps(payload, separators=(",", ":")) return {"data": (b64encode(json_event.encode("utf8"))).decode("utf8")} - async def _send_watch(self): + async def _send_watch(self) -> bool: """ This uses the encoded payload on spade url to simulate watching the stream. Optimally, send every 60 seconds to advance drops. """ if not self.online: - return + return False if self._spade_url is None: self._spade_url = await self.get_spade_url() logger.debug(f"Sending minute-watched to {self.name}") + self._twitch._last_watch = time() async with self._twitch._session.post( self._spade_url, data=self._encode_payload() ) as response: return response.status == 204 + + async def watch_loop(self): + # last_watch is a timestamp of the last time we've sent a watch payload + # We need this because watch_loop can be cancelled and rescheduled multiple times + # in quick succession, and apparently Twitch doesn't like that very much + interval = WATCH_INTERVAL.total_seconds() + await asyncio.sleep(self._twitch._last_watch + interval - time()) + i = 0 + while True: + await self._send_watch() + if i == 0: + # ensure every 30 minutes that we don't have unclaimed points bonus + await self.claim_bonus() + i = (i + 1) % 30 + await asyncio.sleep(interval) diff --git a/constants.py b/constants.py index df1d0ac..fcf4095 100644 --- a/constants.py +++ b/constants.py @@ -1,6 +1,7 @@ from __future__ import annotations from copy import copy +from enum import Enum, auto from datetime import timedelta from typing import Any, Optional, Dict, Literal, Callable @@ -23,15 +24,22 @@ USER_AGENT = ( ) # Paths SETTINGS_PATH = "settings.json" -COOKIES_PATH = "cookies.pickle" +COOKIES_PATH = "cookies.jar" # Intervals and Delays PING_INTERVAL = timedelta(minutes=3) PING_TIMEOUT = timedelta(seconds=10) -ONLINE_DELAY = timedelta(seconds=30) +ONLINE_DELAY = timedelta(seconds=60) +WATCH_INTERVAL = timedelta(seconds=58.5) # Tags DROPS_ENABLED_TAG = "c2542d6d-cd10-4532-919b-3d19f30a768b" -# Strings -TERMINATED_STR = "Application Terminated.\nClose the console window to exit the application." + + +class State(Enum): + INVENTORY_FETCH = auto() + GAME_SELECT = auto() + CHANNEL_FETCH = auto() + CHANNEL_CLEANUP = auto() + CHANNEL_SWITCH = auto() class GQLOperation(JsonType): @@ -83,6 +91,10 @@ GQL_OPERATIONS: Dict[str, GQLOperation] = { "Inventory", "e0765ebaa8e8eeb4043cc6dfeab3eac7f682ef5f724b81367e6e55c7aef2be4c", ), + "CurrentDrop": GQLOperation( + "DropCurrentSessionContext", + "2e4b3630b91552eb05b76a94b6850eb25fe42263b7cf6d06bee6d156dd247c1c", + ), "ViewerDropsDashboard": GQLOperation( "ViewerDropsDashboard", "c4d61d7b71d03b324914d3cf8ca0bc23fe25dacf54120cc954321b9704a3f4e2", diff --git a/gui.py b/gui.py new file mode 100644 index 0000000..4527119 --- /dev/null +++ b/gui.py @@ -0,0 +1,827 @@ +from __future__ import annotations + +import asyncio +import logging +import tkinter as tk +from math import log10, ceil +from tkinter.font import Font +from collections import namedtuple, OrderedDict +from tkinter import Tk, ttk, StringVar, DoubleVar +from typing import Any, Optional, List, Dict, Set, TypedDict, Iterable, NoReturn, TYPE_CHECKING + +from version import __version__ +from constants import WS_TOPICS_LIMIT, MAX_WEBSOCKETS, State + +if TYPE_CHECKING: + from twitch import Twitch + from channel import Channel + from inventory import Game, TimedDrop + + +digits = ceil(log10(WS_TOPICS_LIMIT)) +WS_FONT = ("Courier New", 10) + + +class TKOutputHandler(logging.Handler): + def __init__(self, output: GUIManager): + super().__init__() + self._output = output + + def emit(self, record): + self._output.print(self.format(record)) + + +class PlaceholderEntry(ttk.Entry): + def __init__( + self, + master, + *args, + placeholder: str, + placeholdercolor: str = "grey60", + **kwargs, + ): + super().__init__(master, *args, **kwargs) + self._show: str = kwargs.get("show", '') + self._text_color: str = kwargs.get("foreground", '') + self._ph_color: str = placeholdercolor + self._ph_text: str = placeholder + self.bind("", self._focus_in) + self.bind("", self._focus_out) + self._ph: bool = False + self._focus_out(None) + + def _focus_in(self, event): + """ + On focus in, if we've had a placeholder, clear the box and set normal text colour and show. + """ + if self._ph: + self._ph = False + self.config(foreground=self._text_color, show=self._show) + self.delete(0, "end") + + def _focus_out(self, event): + """ + On focus out, if we're empty, insert a placeholder, + set placeholder text color and make sure it's shown. + If we're not empty, leave the box as is. + """ + if not super().get(): + self._ph = True + self.config(foreground=self._ph_color, show='') + self.insert(0, self._ph_text) + + def _store_option(self, options: Dict[str, Any], attr: str, name: str): + value = options.get(name) + if value is not None: + setattr(self, attr, value) + + def configure(self, *args, **kwargs): + if args: + options = args[0] + if kwargs: + options = kwargs + self._store_option(options, "_show", "show") + self._store_option(options, "_ph_text", "placeholder") + self._store_option(options, "_text_color", "foreground") + self._store_option(options, "_ph_color", "placeholdercolor") + super().configure(*args, *kwargs) + + def get(self): + if self._ph: + return '' + return super().get() + + def clear(self): + self.delete(0, "end") + self._ph = True + self.config(foreground=self._ph_color, show='') + self.insert(0, self._ph_text) + + +class _WSEntry(TypedDict): + status: str + topics: int + + +class WebsocketStatus: + def __init__(self, manager: GUIManager, master: tk.Misc): + self._status_var = StringVar() + self._topics_var = StringVar() + frame = ttk.LabelFrame(master, text="Websocket Status", padding=(4, 0, 4, 4)) + frame.grid(column=0, row=0, sticky="nsew", padx=2) + ttk.Label( + frame, + text='\n'.join(f"Websocket #{i}:" for i in range(1, MAX_WEBSOCKETS + 1)), + font=WS_FONT, + ).grid(column=0, row=0) + ttk.Label( + frame, + textvariable=self._status_var, + width=16, + justify="left", + font=WS_FONT, + ).grid(column=1, row=0) + ttk.Label( + frame, + textvariable=self._topics_var, + width=(digits * 2 + 1), + justify="right", + font=WS_FONT, + ).grid(column=2, row=0) + self._items: Dict[int, Optional[_WSEntry]] = {i: None for i in range(MAX_WEBSOCKETS)} + self._update() + + def update(self, idx: int, status: Optional[str] = None, topics: Optional[int] = None): + if status is None and topics is None: + raise TypeError("You need to provide at least one of: status, topics") + entry = self._items.get(idx) + if entry is None: + entry = self._items[idx] = _WSEntry(status="Disconnected", topics=0) + if status is not None: + entry["status"] = status + if topics is not None: + entry["topics"] = topics + self._update() + + def remove(self, idx: int): + if idx in self._items: + del self._items[idx] + + def _update(self): + status_lines: List[str] = [] + topic_lines: List[str] = [] + for idx in range(MAX_WEBSOCKETS): + item = self._items.get(idx) + if item is None: + status_lines.append('') + topic_lines.append('') + else: + status_lines.append(item["status"]) + topic_lines.append(f"{item['topics']:>{digits}}/{WS_TOPICS_LIMIT}") + self._status_var.set('\n'.join(status_lines)) + self._topics_var.set('\n'.join(topic_lines)) + + +LoginData = namedtuple("LoginData", ["username", "password", "token"]) + + +class LoginForm: + def __init__(self, manager: GUIManager, master: tk.Misc): + self._manager = manager + self._var = StringVar() + frame = ttk.LabelFrame(master, text="Login Status", padding=(4, 0, 4, 4)) + frame.grid(column=1, row=0, sticky="nsew", padx=2) + frame.columnconfigure(0, weight=2) + frame.columnconfigure(1, weight=1) + frame.rowconfigure(4, weight=1) + ttk.Label(frame, text=("Status:\nUser ID:")).grid(column=0, row=0) + ttk.Label(frame, textvariable=self._var, justify="center").grid(column=1, row=0) + self._login_entry = PlaceholderEntry(frame, placeholder="Login") + self._login_entry.grid(column=0, row=1, columnspan=2) + self._pass_entry = PlaceholderEntry(frame, placeholder="Password", show='•') + self._pass_entry.grid(column=0, row=2, columnspan=2) + self._token_entry = PlaceholderEntry(frame, placeholder="2FA Code") + self._token_entry.grid(column=0, row=3, columnspan=2) + self._confirm = asyncio.Event() + self._button = ttk.Button(frame, text="Login", command=self._confirm.set, state="disabled") + self._button.grid(column=0, row=4, columnspan=2) + self.update("Logged out", None) + + def clear(self, login: bool = False, password: bool = False, token: bool = False): + clear_all = not login and not password and not token + if login or clear_all: + self._login_entry.clear() + if password or clear_all: + self._pass_entry.clear() + if token or clear_all: + self._token_entry.clear() + + async def ask_login(self) -> LoginData: + self._manager.print("Please log in.") + self._confirm.clear() + self._button.config(state="normal") + await self._confirm.wait() + self._button.config(state="disabled") + data = LoginData(self._login_entry.get(), self._pass_entry.get(), self._token_entry.get()) + return data + + def update(self, status: str, user_id: Optional[int]): + if user_id is not None: + user_str = str(user_id) + else: + user_str = "-" + self._var.set(f"{status}\n{user_str}") + + +class GameSelector: + def __init__(self, manager: GUIManager, master: tk.Misc): + self._manager = manager + self._var = StringVar() + frame = ttk.LabelFrame(master, text="Game Selector", padding=(4, 0, 4, 4)) + frame.grid(column=1, row=1, sticky="nsew", padx=2) + frame.columnconfigure(0, weight=1) + self._list = tk.Listbox( + frame, + height=5, + selectmode="single", + activestyle="none", + exportselection=False, + highlightthickness=0, + ) + self._list.pack(fill="both", expand=True) + self._selection: Optional[str] = self._manager._twitch._options.game + self._games: OrderedDict[str, Game] = OrderedDict() + self._list.bind("<>", self._on_select) + + @property + def selected(self) -> Optional[str]: + return self._selection + + def set_games(self, games: Iterable[Game]): + self._games.clear() + self._games.update((str(g), g) for g in sorted(games, key=lambda g: g.name)) + self._list.delete(0, "end") + self._list.insert("end", *self._games.keys()) + self._list.config(width=0) # autoadjust listbox width + if self._selection is not None: + selected_index: Optional[int] = next( + ( + i + for i, str_game in enumerate(self._games.keys()) + if str_game == self._selection + ), + None, + ) + if selected_index is not None: + # reselect the currently selected item + self._list.selection_set(selected_index) + else: + # the game we've had selected isn't there anymore - clear selection + self._selection = None + + def _on_select(self, event): + current = self._list.curselection() + if not current: + # can happen when the user clicks on an empty list + self._selection = None + else: + self._selection = self._list.get(current[0]) + + def get_selection(self) -> Game: + if self._selection is None: + if not self._games: + raise RuntimeError("No games to select from") + # select and return the first game from the list + self._list.selection_set(0) + first_game = next(iter(self._games.values())) + self._selection = str(first_game) + return first_game + return self._games[self._selection] + + def get_next_selection(self) -> Optional[Game]: + current = self._list.curselection() + if not current: + return self.get_selection() + game_name = self._list.get(current[0]+1) + if game_name: + return self._games[game_name] + else: + # this was the last game on the list + return None + + +class _BaseVars(TypedDict): + progress: DoubleVar + percentage: StringVar + remaining: StringVar + minutes: int + + +class _CampaignVars(_BaseVars): + name: StringVar + + +class _DropVars(_BaseVars): + rewards: StringVar + + +class _ProgressVars(TypedDict): + campaign: _CampaignVars + drop: _DropVars + seconds: int + + +class CampaignProgress: + BAR_LENGTH = 240 + + def __init__(self, manager: GUIManager, master: tk.Misc): + self._vars: _ProgressVars = { + "campaign": { + "name": StringVar(), # campaign name + "progress": DoubleVar(), # controls the progress bar + "percentage": StringVar(), # percentage display string + "remaining": StringVar(), # time remaining string + "minutes": 0, # remaining minutes + }, + "drop": { + "rewards": StringVar(), # drop rewards + "progress": DoubleVar(), # as above + "percentage": StringVar(), # as above + "remaining": StringVar(), # as above + "minutes": 0, # as above + }, + "seconds": 1, # remaining seconds (common for both campaign and drop) + } + self._frame = frame = ttk.LabelFrame( + master, text="Campaign Progress", padding=(4, 0, 4, 4) + ) + frame.grid(column=0, row=1, sticky="nsew", padx=2) + frame.columnconfigure(0, weight=2) + frame.columnconfigure(1, weight=1) + ttk.Label(frame, text="Campaign:").grid(column=0, row=0, columnspan=2) + ttk.Label( + frame, textvariable=self._vars["campaign"]["name"] + ).grid(column=0, row=1, columnspan=2) + ttk.Label(frame, text="Progress:").grid(column=0, row=2, rowspan=2) + ttk.Label(frame, textvariable=self._vars["campaign"]["percentage"]).grid(column=1, row=2) + ttk.Label(frame, textvariable=self._vars["campaign"]["remaining"]).grid(column=1, row=3) + ttk.Progressbar( + frame, + mode="determinate", + length=self.BAR_LENGTH, + maximum=1, + variable=self._vars["campaign"]["progress"], + ).grid(column=0, row=4, columnspan=2) + ttk.Separator( + frame, orient="horizontal" + ).grid(row=5, columnspan=2, sticky="ew", pady=(4, 0)) + ttk.Label(frame, text="Drop:").grid(column=0, row=6, columnspan=2) + ttk.Label( + frame, textvariable=self._vars["drop"]["rewards"] + ).grid(column=0, row=7, columnspan=2) + ttk.Label(frame, text="Progress:").grid(column=0, row=8, rowspan=2) + ttk.Label(frame, textvariable=self._vars["drop"]["percentage"]).grid(column=1, row=8) + ttk.Label(frame, textvariable=self._vars["drop"]["remaining"]).grid(column=1, row=9) + ttk.Progressbar( + frame, + mode="determinate", + length=self.BAR_LENGTH, + maximum=1, + variable=self._vars["drop"]["progress"], + ).grid(column=0, row=10, columnspan=2) + self._timer_task: Optional[asyncio.Task[None]] = None + self._update_time() + + def _update_time(self) -> bool: + # read vars + minutes_changed: bool = False + seconds: int = self._vars["seconds"] + drop_vars: _DropVars = self._vars["drop"] + campaign_vars: _CampaignVars = self._vars["campaign"] + drop_minutes: int = drop_vars["minutes"] + campaign_minutes: int = campaign_vars["minutes"] + # handle seconds + if seconds <= 0: + if drop_minutes > 0: + drop_minutes -= 1 + minutes_changed = True + if campaign_minutes > 0: + campaign_minutes -= 1 + minutes_changed = True + if minutes_changed: + seconds = 60 + if seconds > 0: + seconds -= 1 + # display time + hours, minutes = divmod(drop_minutes, 60) + drop_vars["remaining"].set(f"{hours:>2}:{minutes:02}:{seconds:02} remaining") + hours, minutes = divmod(campaign_minutes, 60) + campaign_vars["remaining"].set(f"{hours:>2}:{minutes:02}:{seconds:02} remaining") + # store back + self._vars["seconds"] = seconds + if minutes_changed: + drop_vars["minutes"] = drop_minutes + campaign_vars["minutes"] = campaign_minutes + # if there's no time left, stop the loop + if campaign_minutes + drop_minutes + seconds > 0: + return True + return False + + async def _timer_loop(self): + run = self._update_time() + while run: + await asyncio.sleep(1) + run = self._update_time() + self._timer_task = None + + def start_timer(self): + if self._timer_task is None: + self._vars["seconds"] = 1 + self._timer_task = asyncio.create_task(self._timer_loop()) + + def stop_timer(self): + if self._timer_task is not None: + self._timer_task.cancel() + self._timer_task = None + + def restart_timer(self): + self.stop_timer() + self.start_timer() + + def update(self, drop: TimedDrop): + # campaign update + campaign = drop.campaign + vars_campaign = self._vars["campaign"] + vars_campaign["name"].set(campaign.name) + vars_campaign["progress"].set(campaign.progress) + vars_campaign["percentage"].set( + f"{campaign.progress:6.1%} ({campaign.claimed_drops}/{campaign.total_drops})" + ) + vars_campaign["minutes"] = campaign.remaining_minutes + # drop update + vars_drop = self._vars["drop"] + vars_drop["rewards"].set(drop.rewards_text()) + vars_drop["progress"].set(drop.progress) + vars_drop["percentage"].set(f"{drop.progress:6.1%}") + vars_drop["minutes"] = drop.remaining_minutes + # reschedule our seconds update timer + self.restart_timer() + + +class ConsoleOutput: + def __init__(self, manager: GUIManager, master: tk.Misc): + frame = ttk.LabelFrame(master, text="Output", padding=(4, 0, 4, 4)) + frame.grid(column=0, row=2, columnspan=2, sticky="nsew", padx=2) + frame.rowconfigure(0, weight=1) # let the frame expand + frame.columnconfigure(0, weight=1) + master.rowconfigure(2, weight=1) # tell master frame that the containing row can expand + xscroll = ttk.Scrollbar(frame, orient="horizontal") + yscroll = ttk.Scrollbar(frame, orient="vertical") + self._text = tk.Text( + frame, + exportselection=False, + height=10, + width=52, + wrap="none", + state="disabled", + xscrollcommand=xscroll.set, + yscrollcommand=yscroll.set, + ) + xscroll.config(command=self._text.xview) + yscroll.config(command=self._text.yview) + self._text.grid(column=0, row=0, sticky="nsew") + xscroll.grid(column=0, row=1, sticky="ew") + yscroll.grid(column=1, row=0, sticky="ns") + + def print(self, *values, sep: str = ' ', end: str = '\n'): + self._text.config(state="normal") + self._text.insert("end", f"{sep.join(values)}{end}") + self._text.see("end") # scroll to the newly added line + self._text.config(state="disabled") + + +class Buttons(TypedDict): + frame: ttk.Frame + cleanup: ttk.Button + switch: ttk.Button + load_points: ttk.Button + + +class ChannelList: + def __init__(self, manager: GUIManager, master: tk.Misc): + self._manager = manager + frame = ttk.LabelFrame(master, text="Channels", padding=(4, 0, 4, 4)) + frame.grid(column=2, row=0, rowspan=3, sticky="nsew", padx=2) + frame.rowconfigure(1, weight=1) + frame.columnconfigure(0, weight=1) + # tell master frame that the containing column can expand + master.columnconfigure(2, weight=1) + buttons_frame = ttk.Frame(frame) + self._buttons: Buttons = { + "frame": buttons_frame, + "cleanup": ttk.Button( + buttons_frame, + text="Cleanup", + command=manager._twitch.state_change(State.CHANNEL_CLEANUP), + ), + "switch": ttk.Button( + buttons_frame, + text="Switch", + state="disabled", + command=manager._twitch.state_change(State.CHANNEL_SWITCH), + ), + "load_points": ttk.Button( + buttons_frame, text="Load Points", command=self._load_points + ), + } + buttons_frame.grid(column=0, row=0, columnspan=2) + self._buttons["cleanup"].grid(column=0, row=0) + self._buttons["switch"].grid(column=1, row=0) + self._buttons["load_points"].grid(column=2, row=0) + scroll = ttk.Scrollbar(frame, orient="vertical") + self._table = table = ttk.Treeview( + frame, + columns=("channel", "status", "game", "viewers", "points"), + yscrollcommand=scroll.set, + ) + scroll.config(command=table.yview) + table.grid(column=0, row=1, sticky="nsew") + scroll.grid(column=1, row=1, sticky="ns") + self._font = Font(frame, manager._style.lookup("Treeview", "font")) + self._const_width: Set[str] = set() + table.tag_configure("watching", background="gray70") + table.bind("", self._disable_column_resize) + table.bind("<>", self._selected) + self._column("#0", '', width=0) + self._column("channel", "Channel", width=100, anchor='w') + self._column("status", "Status", width_template="OFFLINE ❌") + self._column("game", "Game", width=50) + self._column("viewers", "Viewers", width_template="0000000") + self._column("points", "Points", width_template="0000000") + self._channel_map: Dict[str, Channel] = {} + + def _column( + self, + cid: str, + name: str, + *, + anchor: str = "center", + width: Optional[int] = None, + width_template: Optional[str] = None, + ): + if width_template is not None: + width = self._measure(width_template) + self._const_width.add(cid) + assert width is not None + self._table.column(cid, width=width, stretch=False) + self._table.heading(cid, text=name, anchor=anchor) + + def _disable_column_resize(self, event): + if self._table.identify_region(event.x, event.y) == "separator": + return "break" + + def _selected(self, event): + selection = self._table.selection() + if selection: + self._buttons["switch"].config(state="normal") + else: + self._buttons["switch"].config(state="disabled") + + def _load_points(self): + # disable the button afterwards + self._buttons["load_points"].config(state="disabled") + asyncio.gather(*(ch.claim_bonus() for ch in self._manager._twitch.channels.values())) + + def _measure(self, text: str) -> int: + # we need this because columns have 9-10 pixels of padding that cuts text off + return self._font.measure(text) + 10 + + def _adjust_width(self, column: str, value: str): + # causes the column to expand if the value's width is greater than the current width + if column in self._const_width: + return + value_width = self._measure(value) + curr_width = self._table.column(column, "width") + if value_width > curr_width: + self._table.column(column, minwidth=value_width, width=value_width) + self._table.event_generate("<>") # force redraw + + def _set(self, iid: str, column: str, value: str): + self._table.set(iid, column, value) + self._adjust_width(column, value) + + def _insert(self, iid: str, *args: str): + self._table.insert(parent='', index="end", iid=iid, values=args) + for column, value in zip(self._table.cget("columns"), args): + self._adjust_width(column, value) + + def clear_watching(self): + for iid in self._table.tag_has("watching"): + self._table.item(iid, tags='') + + def set_watching(self, channel: Channel): + self.clear_watching() + self._table.item(channel.iid, tags="watching") + + def get_selection(self) -> Optional[Channel]: + if not self._channel_map: + return None + selection = self._table.selection() + if not selection: + return None + return self._channel_map[selection[0]] + + def clear_selection(self): + self._table.selection_set('') + + def display(self, channel: Channel): + # status + if channel.online: + status_str = "ONLINE ✅" + elif channel.pending_online: + status_str = "OFFLINE ⏰" + else: + status_str = "OFFLINE ❌" + # game + game_str = str(channel.game or '') + # viewers + viewers_str = '' + if channel.viewers is not None: + viewers_str = str(channel.viewers) + # points + points_str = '' + if channel.points is not None: + points_str = str(channel.points) + iid = channel.iid + if self._table.exists(iid): + self._set(iid, "status", status_str) + self._set(iid, "game", game_str) + self._set(iid, "viewers", viewers_str) + if points_str: + self._set(iid, "points", points_str) + else: + self._channel_map[iid] = channel + self._insert(iid, channel.name, status_str, game_str, viewers_str, points_str) + + def remove(self, channel: Channel): + iid = channel.iid + del self._channel_map[iid] + self._table.delete(iid) + + +class GUIManager: + def __init__(self, twitch: Twitch): + self._twitch: Twitch = twitch + self._poll_task: Optional[asyncio.Task[NoReturn]] = None + self._closed = asyncio.Event() + self._root = root = Tk() + root.resizable(False, True) + root.iconbitmap("pickaxe.ico") # window icon + root.title(f"Twitch Drops Miner v{__version__} (by DevilXD)") # window title + root.protocol("WM_DELETE_WINDOW", self._on_close) + root.bind_all("", self.unfocus) + self._style = ttk.Style(root) + self._style.map( + "Treeview", + foreground=self._fixed_map("foreground"), + background=self._fixed_map("background"), + ) + main_frame = ttk.Frame(root, padding=8) + main_frame.grid(sticky="nsew") + root.rowconfigure(0, weight=1) + root.columnconfigure(0, weight=1) + self.websockets = WebsocketStatus(self, main_frame) + self.login = LoginForm(self, main_frame) + self.progress = CampaignProgress(self, main_frame) + self.games = GameSelector(self, main_frame) + self.output = ConsoleOutput(self, main_frame) + self.channels = ChannelList(self, main_frame) + # clamp minimum window height (update first, so that geometry calculates the size) + root.update_idletasks() + root.minsize(width=0, height=root.winfo_reqheight()) + # register logging handler + handler = TKOutputHandler(self) + handler.setFormatter( + logging.Formatter("{asctime}: {levelname}: {message}", style='{', datefmt="%H:%M:%S") + ) + logging.getLogger("TwitchDrops").addHandler(handler) + + # https://stackoverflow.com/questions/56329342/tkinter-treeview-background-tag-not-working + def _fixed_map(self, option): + # Fix for setting text colour for Tkinter 8.6.9 + # From: https://core.tcl.tk/tk/info/509cafafae + # + # Returns the style map for 'option' with any styles starting with + # ('!disabled', '!selected', ...) filtered out. + + # style.map() returns an empty list for missing options, so this + # should be future-safe. + return [ + elm for elm in self._style.map("Treeview", query_opt=option) + if elm[:2] != ("!disabled", "!selected") + ] + + @property + def running(self) -> bool: + return self._poll_task is not None + + @property + def close_requested(self) -> bool: + return self._closed.is_set() + + def start(self): + if self._poll_task is None: + self._poll_task = asyncio.create_task(self._poll()) + # self.progress.start_timer() + + def stop(self): + self.progress.stop_timer() + if self._poll_task is not None: + self._poll_task.cancel() + self._poll_task = None + + async def _poll(self): + """ + This runs the Tkinter event loop via asyncio instead of calling mainloop. + 0.05s gives similar performance and CPU usage. + Not ideal, but the simplest way to avoid threads, thread safety, + loop.call_soon_threadsafe, futures and all of that. + """ + update = self._root.update + while True: + update() + await asyncio.sleep(0.05) + + def unfocus(self, event): + self._root.focus_set() + self.channels.clear_selection() + + def _on_close(self): + self._closed.set() + # notify client we're supposed to close + self._twitch.request_close() + + def prevent_close(self): + self._closed.clear() + + async def wait_until_closed(self): + # wait until the user closes the window + await self._closed.wait() + + def close(self): + self.stop() + if self._root is not None: + self._root.destroy() + self._closed.set() + + def print(self, *args, **kwargs): + # print to our custom output + self.output.print(*args, **kwargs) + + +if __name__ == "__main__": + # Everything below is for debug purposes only + from types import SimpleNamespace + + class StrNamespace(SimpleNamespace): + def __str__(self): + if hasattr(self, "_str__"): + return self._str__(self) + return super().__str__() + + def state_change(state: State): + def changer(state: State = state): + gui.print(f"State change: {state.value}") + return changer + + gui: GUIManager + mock = SimpleNamespace( + _options=SimpleNamespace(game=None), + state_change=state_change, + ) + gui = GUIManager(mock) # type: ignore + mock.request_close = gui._root.destroy + + def create_game(id: int, name: str): + return StrNamespace(name=name, id=id, _str__=lambda s: s.name) + + iid = 0 + + def create_channel(name: str, online: int, game: Optional[str], viewers: int, points: int): + if online == 1: + online = False + pending = True + else: + pending = False + if game is not None: + game_obj: Optional[StrNamespace] = create_game(0, game) + else: + game_obj = None + global iid + return SimpleNamespace( + name=name, + iid=(iid := iid + 1), + points=points, + online=bool(online), + pending_online=pending, + game=game_obj, + viewers=viewers, + ) + + # Game selctor + gui.games.set_games([ + create_game(491115, "Paladins"), + create_game(460630, "Tom Clancy's Rainbow Six Siege"), + ]) + game = gui.games.get_next_selection() + game = gui.games.get_next_selection() + game = gui.games.get_next_selection() + # Channel list + gui.channels.display(create_channel("PaladinsGame", 0, None, 0, 0)) + channel = create_channel("Traitus", 1, None, 0, 0) + gui.channels.display(channel) + gui.channels.display(create_channel("Testus", 2, "Paladins", 42, 1234567)) + gui.channels.set_watching(channel) + gui._root.update() + gui.channels.get_selection() + gui._root.mainloop() diff --git a/inventory.py b/inventory.py index 3a90c66..dc78ee8 100644 --- a/inventory.py +++ b/inventory.py @@ -17,6 +17,9 @@ class Game: def __str__(self) -> str: return self.name + def __repr__(self) -> str: + return f"Game({self.id}, {self.name})" + def __eq__(self, other: object): if isinstance(other, self.__class__): return self.id == other.id @@ -93,15 +96,14 @@ class TimedDrop(BaseDrop): @property def remaining_minutes(self) -> int: - return self.required_minutes - self.current_minutes + return self.required_minutes - self.current_minutes + 1 @property def progress(self) -> float: return self.current_minutes / self.required_minutes def update(self, message: JsonType): - # {"type": "drop-progress", data: {"current_progress_min": 3, "required_progress_min": 10}} - # {"type": "drop-claim", data: {"drop_instance_id": ...}} + # See Twitch.process_drop for message examples msg_type = message["type"] if msg_type == "drop-progress": self.current_minutes = message["data"]["current_progress_min"] @@ -109,6 +111,10 @@ class TimedDrop(BaseDrop): elif msg_type == "drop-claim": self.claim_id = message["data"]["drop_instance_id"] + def bump_minutes(self): + if self.current_minutes < self.required_minutes: + self.current_minutes += 1 + class DropsCampaign: def __init__(self, twitch: Twitch, data: JsonType): @@ -149,3 +155,9 @@ class DropsCampaign: def get_drop(self, drop_id: str) -> Optional[TimedDrop]: return self.timed_drops.get(drop_id) + + def get_active_drop(self) -> Optional[TimedDrop]: + for drop in self.timed_drops.values(): + if drop.can_earn: + return drop + return None diff --git a/main.py b/main.py index f969e11..8aa5cfa 100644 --- a/main.py +++ b/main.py @@ -1,51 +1,18 @@ from __future__ import annotations -__version__ = 4 - -import sys -import json -import ctypes import logging -import asyncio import argparse -import warnings -import traceback -import threading -from typing import NoReturn +from typing import Optional from twitch import Twitch -from constants import JsonType, SETTINGS_PATH, TERMINATED_STR - -try: - import win32api - from win32con import CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT -except (ImportError, ModuleNotFoundError): - raise ImportError("You have to run 'python -m pip install pywin32' first") - -# disable some warnings -warnings.simplefilter("ignore", RuntimeWarning) -warnings.simplefilter("ignore", DeprecationWarning) -# nice console title -try: - ctypes.windll.kernel32.SetConsoleTitleW(f"Twitch Drops Miner v{__version__} (by DevilXD)") -except AttributeError: - # ensure we're on windows and there was no import problems - print("Only Windows supported!") - quit() -assert sys.platform == "win32" - - -def terminate() -> NoReturn: - forever = threading.Event() - print(f"\n{TERMINATED_STR}") - forever.wait() - raise RuntimeError("Uh oh") # this will never run, solely for MyPy +from version import __version__ class ParsedArgs(argparse.Namespace): _verbose: int _debug_ws: bool _debug_gql: bool + game: Optional[str] @property def logging_level(self) -> int: @@ -87,6 +54,7 @@ parser.add_argument("-V", "--version", action="version", version=f"v{__version__ parser.add_argument("-v", dest="_verbose", action="count", default=0) parser.add_argument("--debug-ws", dest="_debug_ws", action="store_true") parser.add_argument("--debug-gql", dest="_debug_gql", action="store_true") +parser.add_argument("-g", "--game", default=None) options: ParsedArgs = parser.parse_args(namespace=ParsedArgs()) # handle logging stuff if options.logging_level > logging.DEBUG: @@ -94,62 +62,9 @@ if options.logging_level > logging.DEBUG: # that aren't ours. This always runs, unless the main logging level is DEBUG or below. logging.getLogger().addHandler(logging.NullHandler()) logger = logging.getLogger("TwitchDrops") -handler = logging.StreamHandler(sys.stdout) -handler.setFormatter(logging.Formatter("{levelname}: {message}", style='{')) -logger.addHandler(handler) logger.setLevel(options.logging_level) logging.getLogger("TwitchDrops.gql").setLevel(options.debug_gql) logging.getLogger("TwitchDrops.websocket").setLevel(options.debug_ws) -# handle settings -try: - with open(SETTINGS_PATH, 'r', encoding="utf8") as file: - settings: JsonType = json.load(file) -except json.JSONDecodeError as exc: - print(f"Error while reading the settings file:\n{str(exc)}") - terminate() -except FileNotFoundError: - settings = {} -# asyncio loop -loop = asyncio.get_event_loop() -# client init -client = Twitch(settings.get("username"), settings.get("password")) -# main task and it's close event -main_task = loop.create_task(client.run(settings.get("channels"))) -close_event = threading.Event() - - -def clean_exit(code: int): - if code not in (CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT): - # filter only events we want - return False - # cancel the main task - this triggers the cleanup - main_task.cancel() - # wait until cleanup completes - close_event.wait() - # tell OS that we're free to exit now - return True - - -# ensures clean exit upon closing the console -win32api.SetConsoleCtrlHandler(clean_exit, True) -try: - loop.run_until_complete(main_task) -except (asyncio.CancelledError, KeyboardInterrupt): - # KeyboardInterrupt causes run_until_complete to exit, but without cancelling the task. - # The loop stops and thus the task gets frozen, until the loop runs again. - # Because we don't want anything from there to actually run during cleanup, - # we need to explicitly cancel the task ourselves here. - main_task.cancel() - # main_task was cancelled due to program shutting down - do the cleanup - loop.run_until_complete(client.close()) - # notify we're free to exit - close_event.set() -except Exception: - # Remove the handler so it doesn't delay exit - win32api.SetConsoleCtrlHandler(clean_exit, False) - print("Fatal error encountered:\n") - traceback.print_exc() - terminate() -finally: - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.close() +# client run +client = Twitch(options) +client.start() diff --git a/twitch.py b/twitch.py index 08cc798..3829d43 100644 --- a/twitch.py +++ b/twitch.py @@ -1,12 +1,14 @@ from __future__ import annotations import os -import msvcrt import asyncio import logging +import traceback from yarl import URL +from time import time +from itertools import chain from functools import partial -from typing import Any, Optional, Union, List, Dict, Collection, cast +from typing import Any, Callable, Iterable, Optional, Union, List, Dict, Set, cast, TYPE_CHECKING try: import aiohttp @@ -14,10 +16,12 @@ except ImportError: raise ImportError("You have to run 'python -m pip install aiohttp' first") from channel import Channel -from websocket import WebsocketPool -from inventory import DropsCampaign, Game +from gui import GUIManager, LoginData +from websocket import WebsocketPool, task_wrapper +from inventory import DropsCampaign, Game, TimedDrop from exceptions import LoginException, CaptchaRequired from constants import ( + State, JsonType, WebsocketTopic, CLIENT_ID, @@ -27,66 +31,119 @@ from constants import ( GQL_URL, GQL_OPERATIONS, DROPS_ENABLED_TAG, - TERMINATED_STR, - MAX_WEBSOCKETS, - WS_TOPICS_LIMIT, GQLOperation, ) +if TYPE_CHECKING: + from main import ParsedArgs + logger = logging.getLogger("TwitchDrops") gql_logger = logging.getLogger("TwitchDrops.gql") class Twitch: - def __init__( - self, - username: Optional[str] = None, - password: Optional[str] = None, - *, - loop: Optional[asyncio.AbstractEventLoop] = None, - ): - self.username: Optional[str] = username - self.password: Optional[str] = password + def __init__(self, options: ParsedArgs): + self._options = options + # GUI + self.gui = GUIManager(self) # Cookies, session and auth cookie_jar = aiohttp.CookieJar() if os.path.isfile(COOKIES_PATH): cookie_jar.load(COOKIES_PATH) self._session = aiohttp.ClientSession( - cookie_jar=cookie_jar, headers={"User-Agent": USER_AGENT}, loop=loop + cookie_jar=cookie_jar, headers={"User-Agent": USER_AGENT} ) self._access_token: Optional[str] = None self._user_id: Optional[int] = None self._is_logged_in = asyncio.Event() - # Storing, watching and changing channels + # State management + self._state: State = State.INVENTORY_FETCH + self._state_change = asyncio.Event() + self.inventory: List[DropsCampaign] = [] # inventory + # Storing and watching channels self.channels: Dict[int, Channel] = {} self._watching_channel: Optional[Channel] = None self._watching_task: Optional[asyncio.Task[Any]] = None - self._channel_change = asyncio.Event() - # Inventory - self.inventory: List[DropsCampaign] = [] - self._campaign_change = asyncio.Event() + self._last_watch = time() - 60 # Websocket self.websocket = WebsocketPool(self) + # Runner task + self._main_task: Optional[asyncio.Task[None]] = None def wait_until_login(self): return self._is_logged_in.wait() - def reevaluate_campaigns(self): - self._campaign_change.set() + def change_state(self, state: State) -> None: + self._state = state + self._state_change.set() + + def state_change(self, state: State) -> Callable[[], None]: + # this is identical to change_state, but defers the call + # perfect for GUI usage + return partial(self.change_state, state) + + def request_close(self): + """ + Called when the application is requested to close, + usually by the console or application window being closed. + """ + self.stop() + + def start(self): + self._loop = loop = asyncio.get_event_loop() + self._main_task = loop.create_task(self._run()) + + try: + loop.run_until_complete(self._main_task) + except asyncio.CancelledError: + # happens when the user requests close + pass + except KeyboardInterrupt: + # KeyboardInterrupt causes run_until_complete to exit, but without cancelling the task. + # The loop stops and thus the task gets frozen, until the loop runs again. + # Because we don't want anything from there to actually run during cleanup, + # we need to explicitly cancel the task ourselves here. + self.stop() + except CaptchaRequired: + self.gui.prevent_close() + self.gui.print( + "Your login attempt was denied by CAPTCHA.\nPlease try again in +12 hours." + ) + except Exception: + self.gui.prevent_close() + self.gui.print("Fatal error encountered:\n") + self.gui.print(traceback.format_exc()) + finally: + loop.run_until_complete(self.close()) + loop.run_until_complete(loop.shutdown_asyncgens()) + if not self.gui.close_requested: + self.gui.print( + "\nApplication Terminated.\nClose the window to exit the application." + ) + loop.run_until_complete(self.gui.wait_until_closed()) + loop.close() + + def stop(self): + if self._main_task is not None: + self._main_task.cancel() + self._main_task = None async def close(self): - print("Exiting...") - self._session.cookie_jar.save(COOKIES_PATH) # type: ignore + start_time = time() + self.gui.print("Exiting...") self.stop_watching() + self._session.cookie_jar.save(COOKIES_PATH) # type: ignore await self._session.close() await self.websocket.stop() - await asyncio.sleep(1) # allows aiohttp to safely close the session + # wait at least one full second + whatever it takes to complete the closing + # this allows aiohttp to safely close the session + await asyncio.sleep(start_time + 1 - time()) - def is_currently_watching(self, channel: Channel) -> bool: + def is_watching(self, channel: Channel) -> bool: return self._watching_channel is not None and self._watching_channel == channel - async def run(self, channel_names: Optional[List[str]] = None): + async def _run(self): """ Main method that runs the whole client. @@ -95,129 +152,130 @@ class Twitch: • Selecting a stream to watch, and watching it • Changing the stream that's being watched if necessary """ + self.gui.start() + await self.check_login() + # Add default topics + assert self._user_id is not None + self.websocket.add_topics([ + WebsocketTopic("User", "Drops", self._user_id, self.process_drops), + WebsocketTopic("User", "CommunityPoints", self._user_id, self.process_points), + ]) + await self.websocket.start() + games: Set[Game] = set() + selected_game: Optional[Game] = None + self.change_state(State.INVENTORY_FETCH) while True: - # Claim the drops we can - self.inventory = await self.get_inventory() - games = set() - for campaign in self.inventory: - if campaign.status == "UPCOMING": - # we have no use in processing upcoming campaigns here - continue - for drop in campaign.timed_drops.values(): - if drop.can_earn: - games.add(campaign.game) - if drop.can_claim: - await drop.claim() - # 'games' now has all games we want to farm drops for - # if it's empty, there's no point in continuing - if not games: - print(f"No active campaigns to farm drops for.\n\n{TERMINATED_STR}") - await asyncio.Future() - # Start our websocket connection, only after we confirm that there are drops to mine - await self.websocket.start() - if not channel_names: - # get a list of all channels with drops enabled - print("Fetching suitable live channels to watch...") - live_streams: Dict[Game, List[Channel]] = await self.get_live_streams( - games, [DROPS_ENABLED_TAG] - ) - for game, channels in live_streams.items(): - for channel in channels: - if channel.id not in self.channels: - self.channels[channel.id] = channel - print(f"Added channel: {channel.name} for game: {game.name}") - else: - # Fetch information about all channels we're supposed to handle - for channel_name in channel_names: - channel: Channel = await Channel(self, channel_name) # type: ignore - self.channels[channel.id] = channel - # Sub to these channel updates - topics: List[WebsocketTopic] = [ - WebsocketTopic("Channel", "VideoPlayback", channel_id, self.process_stream_state) - for channel_id in self.channels - ] - self.websocket.add_topics(topics) - self._campaign_change.clear() - # Repeat: Change into a channel we can watch, then reset the flag - self._channel_change.set() - refresh_channels = False # we're entering having fresh channel data already - while True: - # wait for either the change channel signal, or campaign change signal - await asyncio.wait( - ( - self._channel_change.wait(), - self._campaign_change.wait(), - ), - return_when=asyncio.FIRST_COMPLETED, - ) - if self._campaign_change.is_set(): - # we need to reevaluate all campaigns - # stop watching - self.stop_watching() - # close the websocket - await self.websocket.stop() - break # cycle the outer loop - # otherwise, it was the channel change one - for channel in self.channels.values(): - if ( - channel.stream is not None # steam online - and channel.stream.game is not None # there's game information - and channel.stream.drops_enabled # drops are enabled - and channel.stream.game in games # it's a game we can earn drops in - ): - self.watch(channel) - refresh_channels = True - self._channel_change.clear() - break - else: - # there's no available channel to watch - if refresh_channels: - # refresh the status of all channels, - # to make sure that our websocket didn't miss anything til this point - print("No suitable channel to watch, refreshing...") - for channel in self.channels.values(): - await channel.get_stream() - await asyncio.sleep(0.5) - refresh_channels = False + if self._state is State.INVENTORY_FETCH: + # Claim the drops we can + await self.fetch_inventory() + games.clear() + for campaign in self.inventory: + if campaign.status == "UPCOMING": + # we have no use in processing upcoming campaigns here continue - print("No suitable channel to watch, retrying in 120 seconds") - await asyncio.sleep(120) + for drop in campaign.timed_drops.values(): + if drop.can_earn: + games.add(campaign.game) + if drop.can_claim: + await drop.claim() + self.change_state(State.GAME_SELECT) + elif self._state is State.GAME_SELECT: + # 'games' has all games we want to farm drops for + # if it's empty, there's no point in continuing + if not games: + self.gui.print("No active campaigns to farm drops for.") + return + self.gui.games.set_games(games) + selected_game = self.gui.games.get_selection() + self.change_state(State.CHANNEL_FETCH) + elif self._state is State.CHANNEL_FETCH: + if selected_game is None: + self.change_state(State.GAME_SELECT) + else: + # get a list of all live channels with drops enabled + live_streams: List[Channel] = await self.get_live_streams( + selected_game, [DROPS_ENABLED_TAG] + ) + # filter out ones we already have + live_streams = [ch for ch in live_streams if ch.id not in self.channels] + for channel in live_streams: + self.channels[channel.id] = channel + channel.display() + # load points + # asyncio.gather(*(channel.claim_bonus() for channel in live_streams)) + # Sub to these channel updates + topics: List[WebsocketTopic] = [ + WebsocketTopic( + "Channel", "VideoPlayback", channel_id, self.process_stream_state + ) + for channel_id in self.channels + ] + self.websocket.add_topics(topics) + self.change_state(State.CHANNEL_SWITCH) + elif self._state is State.CHANNEL_CLEANUP: + # remove all channels that are offline, + # or aren't streaming the game we want anymore + to_remove = [ + channel for channel in self.channels.values() + if not (channel.online or channel.pending_online) + or channel.game is None or channel.game != selected_game + ] + self.websocket.remove_topics( + WebsocketTopic.as_str("Channel", "VideoPlayback", channel.id) + for channel in to_remove + ) + for channel in to_remove: + del self.channels[channel.id] + channel.remove() + self.change_state(State.CHANNEL_FETCH) + elif self._state is State.CHANNEL_SWITCH: + if selected_game is None: + self.change_state(State.GAME_SELECT) + else: + # Change into the selected channel + channels: Iterable[Channel] + selected_channel = self.gui.channels.get_selection() + if selected_channel is not None: + self.gui.channels.clear_selection() + channels = chain([selected_channel], self.channels.values()) + else: + channels = self.channels.values() + # If there's no selected channel, change into a channel we can watch + for channel in channels: + if ( + channel.online # steam online + and channel.drops_enabled # drops are enabled + and channel.game == selected_game # it's a game we've selected + ): + self.watch(channel) + # break the state change chain by clearing the flag + self._state_change.clear() + break + else: + self.stop_watching() + selected_game = self.gui.games.get_next_selection() + if selected_game is None: + self.gui.print("No suitable channel to watch.") + # TODO: Figure out what to do here. + return + self.change_state(State.CHANNEL_CLEANUP) + await self._state_change.wait() def watch(self, channel: Channel): + if self.is_watching(channel): + # we're already watching the same channel, so there's no point switching + return if self._watching_task is not None: self._watching_task.cancel() - - async def watcher(channel: Channel): - op = GQL_OPERATIONS["ChannelPointsContext"].with_variables( - {"channelLogin": channel.name} - ) - i = 0 - while True: - await channel._send_watch() - if i == 0: - # ensure every 30 minutes that we don't have unclaimed points bonus - response = await self.gql_request(op) - channel_data: JsonType = response["data"]["community"]["channel"] - claim_available: JsonType = ( - channel_data["self"]["communityPoints"]["availableClaim"] - ) - if claim_available: - await self.claim_points(channel_data["id"], claim_available["id"]) - logger.info("Claimed bonus points") - i = (i + 1) % 30 - await asyncio.sleep(58.5) - - if channel.stream is not None and channel.stream.game is not None: - game_name = channel.stream.game.name - else: - game_name = "" - print(f"Watching: {channel.name}, game: {game_name}") + self.gui.channels.set_watching(channel) self._watching_channel = channel - self._watching_task = asyncio.create_task(watcher(channel)) + self._watching_task = asyncio.create_task(channel.watch_loop()) + self.gui.progress.start_timer() def stop_watching(self): + self.gui.progress.stop_timer() + self.gui.channels.clear_watching() if self._watching_task is not None: - logger.info("Watching stopped.") self._watching_task.cancel() self._watching_task = None self._watching_channel = None @@ -231,10 +289,10 @@ class Twitch: if msg_type == "stream-down": logger.info(f"{channel.name} goes OFFLINE") channel.set_offline() - if self.is_currently_watching(channel): - print(f"{channel.name} goes OFFLINE, switching...") + if self.is_watching(channel): + self.gui.print(f"{channel.name} goes OFFLINE, switching...") # change the channel if we're currently watching it - self._channel_change.set() + self.change_state(State.CHANNEL_SWITCH) elif msg_type == "stream-up": logger.info(f"{channel.name} goes ONLINE") channel.set_online() @@ -243,17 +301,140 @@ class Twitch: # if it's not online for some reason, set it so channel.set_online() else: - assert channel.stream is not None viewers = message["viewers"] - channel.stream.viewer_count = viewers + channel.viewers = viewers logger.debug(f"{channel.name} viewers: {viewers}") + @task_wrapper + async def process_drops(self, user_id: int, message: JsonType): + # Message examples: + # {"type": "drop-progress", data: {"current_progress_min": 3, "required_progress_min": 10}} + # {"type": "drop-claim", data: {"drop_instance_id": ...}} + msg_type: str = message["type"] + if msg_type not in ("drop-progress", "drop-claim"): + return + drop_id: str = message["data"]["drop_id"] + drop: Optional[TimedDrop] = self.get_drop(drop_id) + if msg_type == "drop-claim" and drop is None: + logger.error( + f"Received a drop claim ID for a non-existing drop: {drop_id}\n" + f"Drop claim ID: {message['data']['drop_instance_id']}" + ) + return + # Sometimes, the drop update we receive doesn't actually match what we're mining. + # This is a Twitch bug workaround: use GQL to get the current drop progress. + if msg_type == "drop-progress" and (drop is None or not drop.campaign.active): + logger.debug( + "Received a drop update for an inactive campaign, using drop context instead" + ) + context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"]) + drop_data = context["data"]["currentUser"]["dropCurrentSession"] + drop_id = drop_data["dropID"] + drop = self.get_drop(drop_id) + if drop is None: + logger.warning(f"Received an update for a non-existing drop: {drop_id}") + return + if not drop.campaign.active: + # Sometimes, even GQL fails to give us the correct drop. + # In that case, we can use the locally cached inventory to try and put together + # the drop that we're actually mining right now. + # TODO: Find a better way of figuring out the correct game + game = drop.campaign.game + drop = None + for campaign in self.inventory: + if campaign.game == game: + drop = campaign.get_active_drop() + break + if drop is None or not drop.campaign.active: + logger.error("Active drop search failed") + return + drop.bump_minutes() + self.gui.progress.update(drop) + return + else: + # TODO: Use a cleaner solution than modifying the raw payload + message["data"]["current_progress_min"] = drop_data["currentMinutesWatched"] + message["data"]["required_progress_min"] = drop_data["requiredMinutesWatched"] + assert drop is not None + drop.update(message) + if msg_type == "drop-claim": + campaign = drop.campaign + await drop.claim() + self.gui.print( + f"Claimed drop: {drop.rewards_text()} " + f"({campaign.claimed_drops}/{campaign.total_drops})" + ) + if campaign.remaining_drops == 0: + self.change_state(State.INVENTORY_FETCH) + self.gui.progress.update(drop) + + @task_wrapper + async def process_points(self, user_id: int, message: JsonType): + # Example payloads: + # { + # "type": "points-earned", + # "data": { + # "timestamp": "YYYY-MM-DDTHH:MM:SS.123456789Z", + # "channel_id": "123456789", + # "point_gain": { + # "user_id": "12345678", + # "channel_id": "123456789", + # "total_points": 10, + # "baseline_points": 10, + # "reason_code": "WATCH", + # "multipliers": [] + # }, + # "balance": { + # "user_id": "12345678", + # "channel_id": "123456789", + # "balance": 12345 + # } + # } + # } + # { + # "type": "claim-available", + # "data": { + # "timestamp":"2021-12-23T21:41:35.784041064Z", + # "claim": { + # "id": "4ae6fefd-3658-40ae-ad3d-92254c576a91", + # "user_id": "94275183", + # "channel_id": "218893986", + # "point_gain": { + # "user_id": "94275183", + # "channel_id": "218893986", + # "total_points": 50, + # "baseline_points": 50, + # "reason_code": "CLAIM", + # "multipliers": [] + # }, + # "created_at": "2021-12-23T21:41:31Z" + # } + # } + # } + msg_type = message["type"] + if msg_type == "points-earned": + data = message["data"] + channel = self.channels.get(int(data["channel_id"])) + points = data["point_gain"]["total_points"] + balance = data["balance"]["balance"] + if channel is not None: + channel.points = balance + self.gui.channels.display(channel) + self.gui.print(f"Earned points for watching: {points:3}, total: {balance}") + elif msg_type == "claim-available": + claim_data = message["data"]["claim"] + points = claim_data["point_gain"]["total_points"] + await self.claim_points(claim_data["channel_id"], claim_data["id"]) + self.gui.print(f"Claimed bonus points: {points}") + async def _validate_password(self, password: str) -> bool: """ Use Twitch's password validator to validate the password length, characters required, etc. Helps avoid running into the CAPTCHA if you mistype your password by mistake. Valid length: 8-71 """ + if not 8 <= len(password) <= 71: + return False payload = {"password": password} async with self._session.post( f"{AUTH_URL}/api/v1/password_strength", json=payload @@ -261,120 +442,97 @@ class Twitch: strength_response = await response.json() return strength_response["isValid"] - async def get_password(self, prompt: str = "Password: ") -> str: - """ - A loop that'll keep asking for password, until it's considered valid. - Use own implementation rather than `getpass.getpass`, to add some user feedback - on how many characters have been typed in. - """ + async def ask_login(self) -> LoginData: while True: - for c in prompt: - msvcrt.putwch(c) - pass_chars: List[str] = [] - try: - while True: - c = msvcrt.getwch() - if c in "\r\n": - break - elif c == '\003': - raise KeyboardInterrupt - elif c == '\b': - # backspace - if not pass_chars: - # we have nothing to remove - continue - pass_chars.pop() - # move one character back - msvcrt.putwch('\b') - # overwrite the • with a space - msvcrt.putwch(' ') - # move back again - msvcrt.putwch('\b') - else: - pass_chars.append(c) - msvcrt.putwch('•') - finally: - msvcrt.putwch('\r') - msvcrt.putwch('\n') - password = ''.join(pass_chars) - if await self._validate_password(password): - return password + data = await self.gui.login.ask_login() + if await self._validate_password(data.password): + return data async def _login(self) -> str: logger.debug("Login flow started") - if self.username is None: - self.username = input("Username: ") - if self.password is None: - print("\nNote: Password can be pasted in by pressing right click inside the window.\n") - self.password = await self.get_password() payload: JsonType = { - "username": self.username, - "password": self.password, "client_id": CLIENT_ID, "undelete_user": False, "remember_me": True, } - for attempt in range(10): - async with self._session.post(f"{AUTH_URL}/login", json=payload) as response: - login_response = await response.json() + while True: + username, password, token = await self.ask_login() + payload["username"] = username + payload["password"] = password + # remove stale 2FA tokens, if present + payload.pop("authy_token", None) + payload.pop("twitchguard_code", None) + for attempt in range(2): + async with self._session.post(f"{AUTH_URL}/login", json=payload) as response: + login_response: JsonType = await response.json() - # Feed this back in to avoid running into CAPTCHA if possible - if "captcha_proof" in login_response: - payload["captcha"] = {"proof": login_response["captcha_proof"]} + # Feed this back in to avoid running into CAPTCHA if possible + if "captcha_proof" in login_response: + payload["captcha"] = {"proof": login_response["captcha_proof"]} - # Error handling - if "error_code" in login_response: - error_code = login_response["error_code"] - logger.debug(f"Login error code: {error_code}") - if error_code == 1000: - # we've failed bois - logger.debug("Login failed due to CAPTCHA") - raise CaptchaRequired() - elif error_code == 3001: - # wrong password you dummy - logger.debug("Login failed due to incorrect login or pass") - print(f"Incorrect username or password.\nUsername: {self.username}") - self.password = await self.get_password() - elif error_code in ( - 3011, # Authy token needed - 3012, # Invalid authy token - 3022, # Email code needed - 3023, # Invalid email code - ): - # 2FA handling - email = error_code in (3022, 3023) - logger.debug("2FA token required") - token = input("2FA token: ") - if email: - # email code - payload["twitchguard_code"] = token + # Error handling + if "error_code" in login_response: + error_code: int = login_response["error_code"] + logger.debug(f"Login error code: {error_code}") + if error_code == 1000: + # we've failed bois + logger.debug("Login failed due to CAPTCHA") + raise CaptchaRequired() + elif error_code == 3001: + # wrong password you dummy + logger.debug("Login failed due to incorrect login or pass") + self.gui.print("Incorrect username or password.") + self.gui.login.clear(password=True) + break + elif error_code in ( + 3012, # Invalid authy token + 3023, # Invalid email code + ): + logger.debug("Login failed due to incorrect 2FA code") + if error_code == 3023: + self.gui.print("Incorrect email code.") + else: + self.gui.print("Incorrect 2FA code.") + self.gui.login.clear(token=True) + break + elif error_code in ( + 3011, # Authy token needed + 3022, # Email code needed + ): + # 2FA handling + email = error_code == 3022 + if not token: + logger.debug("2FA token required") + # user didn't provide a token, so ask them for it + if email: + self.gui.print("Email code required. Check your email.") + else: + self.gui.print("2FA token required.") + break + if email: + payload["twitchguard_code"] = token + else: + payload["authy_token"] = token + continue else: - # authy token - payload["authy_token"] = token - continue - else: - raise LoginException(login_response["error"]) - - # Success handling - if "access_token" in login_response: - # we're in bois - self._access_token = login_response["access_token"] - logger.debug(f"Access token: {self._access_token}") - break - - if self._access_token is None: - # this means we've ran out of retries - raise LoginException("Ran out of login retries") - return self._access_token + raise LoginException(login_response["error"]) + # Success handling + if "access_token" in login_response: + # we're in bois + self._access_token = cast(str, login_response["access_token"]) + logger.debug("Access token granted") + self.gui.login.clear() + return self._access_token async def check_login(self) -> None: if self._access_token is not None and self._user_id is not None: # we're all good return # looks like we're missing something - print("Logging in") + logger.debug("Checking login") + self.gui.login.update("Logging in...", None) jar = cast(aiohttp.CookieJar, self._session.cookie_jar) while True: cookie = jar.filter_cookies("https://twitch.tv") # type: ignore @@ -386,7 +544,7 @@ class Twitch: elif self._access_token is None: # have cookie - get our access token self._access_token = cookie["auth-token"].value - logger.debug("Session restored from cookie") + logger.debug("Restoring session from cookie") # validate our access token, by obtaining user_id async with self._session.get( "https://id.twitch.tv/oauth2/validate", @@ -395,6 +553,7 @@ class Twitch: status = response.status if status == 401: # the access token we have is invalid - clear the cookie and reauth + logger.debug("Restored session is invalid") jar.clear_domain("twitch.tv") continue elif status == 200: @@ -403,7 +562,8 @@ class Twitch: self._user_id = int(validate_response["user_id"]) cookie["persistent"] = str(self._user_id) self._is_logged_in.set() - print(f"Login successful, User ID: {self._user_id}") + logger.debug(f"Login successful, user ID: {self._user_id}") + self.gui.login.update("Logged in", self._user_id) # update our cookie and save it jar.update_cookies(cookie, URL("https://twitch.tv")) jar.save(COOKIES_PATH) @@ -420,34 +580,38 @@ class Twitch: gql_logger.debug(f"GQL Response: {response_json}") return response_json - async def get_inventory(self) -> List[DropsCampaign]: + async def fetch_inventory(self) -> None: response = await self.gql_request(GQL_OPERATIONS["Inventory"]) inventory = response["data"]["currentUser"]["inventory"] - return [DropsCampaign(self, data) for data in inventory["dropCampaignsInProgress"]] + self.inventory = [ + DropsCampaign(self, data) for data in inventory["dropCampaignsInProgress"] + ] - async def get_live_streams( - self, games: Collection[Game], tag_ids: List[str] - ) -> Dict[Game, List[Channel]]: - limit = min(int((MAX_WEBSOCKETS * WS_TOPICS_LIMIT) // len(games)), 100) - live_streams = {} - for game in games: - response = await self.gql_request( - GQL_OPERATIONS["GameDirectory"].with_variables({ - "limit": limit, - "name": game.name, - "options": { - "includeRestricted": ["SUB_ONLY_LIVE"], - "tags": tag_ids, - }, - }) - ) - live_streams[game] = [ - Channel.from_directory(self, stream_channel_data["node"]) - for stream_channel_data in response["data"]["game"]["streams"]["edges"] - ] - return live_streams + def get_drop(self, drop_id: str) -> Optional[TimedDrop]: + for campaign in self.inventory: + drop = campaign.get_drop(drop_id) + if drop is not None: + return drop + return None - async def claim_points(self, channel_id: Union[str, int], claim_id: str): + async def get_live_streams(self, game: Game, tag_ids: List[str]) -> List[Channel]: + limit = 45 + response = await self.gql_request( + GQL_OPERATIONS["GameDirectory"].with_variables({ + "limit": limit, + "name": game.name, + "options": { + "includeRestricted": ["SUB_ONLY_LIVE"], + "tags": tag_ids, + }, + }) + ) + return [ + Channel.from_directory(self, stream_channel_data["node"]) + for stream_channel_data in response["data"]["game"]["streams"]["edges"] + ] + + async def claim_points(self, channel_id: Union[str, int], claim_id: str) -> None: variables = {"input": {"channelID": str(channel_id), "claimID": claim_id}} await self.gql_request( GQL_OPERATIONS["ClaimCommunityPoints"].with_variables(variables) diff --git a/version.py b/version.py new file mode 100644 index 0000000..b350a5d --- /dev/null +++ b/version.py @@ -0,0 +1 @@ +__version__ = 4 diff --git a/websocket.py b/websocket.py index 6cb6b0e..b109236 100644 --- a/websocket.py +++ b/websocket.py @@ -13,7 +13,6 @@ from typing import Any, Optional, List, Dict, Set, Iterable, TYPE_CHECKING from websockets.exceptions import ConnectionClosed, ConnectionClosedOK from websockets.client import WebSocketClientProtocol, connect as websocket_connect -from inventory import TimedDrop from exceptions import MinerException from constants import ( JsonType, @@ -29,6 +28,7 @@ if TYPE_CHECKING: from twitch import Twitch +logger = logging.getLogger("TwitchDrops") ws_logger = logging.getLogger("TwitchDrops.websocket") NONCE_CHARS = string.ascii_letters + string.digits @@ -39,11 +39,11 @@ def create_nonce(length: int = 30) -> str: def task_wrapper(afunc): @wraps(afunc) - async def wrapper(self: Websocket, *args, **kwargs): + async def wrapper(self, *args, **kwargs): try: await afunc(self, *args, **kwargs) except Exception: - ws_logger.exception("Exception in websocket task") + logger.exception("Exception in task") raise # raise up to the wrapping task return wrapper @@ -55,7 +55,7 @@ class Websocket: # websocket index self._idx: int = index # current websocket connection - self._ws: WebSocketClientProtocol + self._ws: Optional[WebSocketClientProtocol] = None # set when there's an active websocket connection self._connected_flag = asyncio.Event() # set when the websocket needs to reconnect @@ -70,6 +70,8 @@ class Websocket: # topics stuff self.topics: Dict[str, WebsocketTopic] = {} self._submitted: Set[WebsocketTopic] = set() + # notify GUI + self.set_status("Disconnected") @property def connected(self) -> bool: @@ -78,12 +80,25 @@ class Websocket: def wait_until_connected(self): return self._connected_flag.wait() + def set_status(self, status: Optional[str] = None, refresh_topics: bool = False): + kwargs: Dict[str, Any] = {} + if status is not None: + kwargs["status"] = status + if refresh_topics: + kwargs["topics"] = len(self.topics) + self._twitch.gui.websockets.update(self._idx, **kwargs) + def request_reconnect(self): ws_logger.warning(f"Websocket[{self._idx}] requested reconnect.") # reset our ping interval, so we send a PING after reconnect right away self._next_ping = time() self._reconnect_requested.set() + async def close(self): + self.set_status("Disconnecting...") + if self._ws is not None: + await self._ws.close() + async def start(self): if self.connected: return @@ -98,27 +113,35 @@ class Websocket: self._handle_task = asyncio.create_task(self._handle()) async def stop(self): - if self._ws is not None: - await self._ws.close() + await self.close() if self._handle_task is not None: + # this raises back any stray exceptions await self._handle_task self._handle_task = None def stop_nowait(self): - if self._ws is not None: - asyncio.create_task(self._ws.close()) + asyncio.create_task(self.close()) # note: this detaches the handle task, so we have to assume it closes properly self._handle_task = None + def remove(self): + # this stops the websocket, and then removes it from the gui list + async def remover(): + await self.stop() + self._twitch.gui.websockets.remove(self._idx) + asyncio.create_task(remover()) + @task_wrapper async def _handle(self): # ensure we're logged in before connecting await self._twitch.wait_until_login() + self.set_status("Connecting...") ws_logger.info(f"Websocket[{self._idx}] connecting...") # Connect/Reconnect loop async for websocket in websocket_connect(WEBSOCKET_URL, ssl=True, ping_interval=None): websocket.BACKOFF_MAX = 3 * 60 # type: ignore # 3 minutes self._ws = websocket + self.set_status("Connected") try: try: self._reconnect_requested.clear() @@ -135,20 +158,24 @@ class Websocket: if isinstance(exc, ConnectionClosedOK): if exc.rcvd_then_sent: # server closed the connection, not us - reconnect - ws_logger.warning(f"Websocket[{self._idx}] disconnected.") + ws_logger.warning(f"Websocket[{self._idx}] got disconnected.") else: # we closed it - exit + self._ws = None ws_logger.info(f"Websocket[{self._idx}] stopped.") + self.set_status("Disconnected") return - if exc.rcvd is not None: - code = exc.rcvd.code - elif exc.sent is not None: - code = exc.sent.code else: - code = -1 - ws_logger.warning(f"Websocket[{self._idx}] closed unexpectedly: {code}") + if exc.rcvd is not None: + code = exc.rcvd.code + elif exc.sent is not None: + code = exc.sent.code + else: + code = -1 + ws_logger.warning(f"Websocket[{self._idx}] closed unexpectedly: {code}") except Exception: ws_logger.exception(f"Exception in Websocket[{self._idx}]") + self.set_status("Reconnecting...") ws_logger.warning(f"Websocket[{self._idx}] reconnecting...") async def _handle_ping(self): @@ -203,6 +230,7 @@ class Websocket: Gather incoming messages over the timeout specified. Note that there's no return value - this modifies `messages` in-place. """ + assert self._ws is not None while True: raw_message = await self._ws.recv() message = json.loads(raw_message) @@ -211,10 +239,10 @@ class Websocket: def _handle_message(self, message): # request the assigned topic to process the response - topic_process = self.topics.get(message["data"]["topic"]) - if topic_process is not None: + topic = self.topics.get(message["data"]["topic"]) + if topic is not None: # use a task to not block the websocket - asyncio.create_task(topic_process(json.loads(message["data"]["message"]))) + asyncio.create_task(topic(json.loads(message["data"]["message"]))) async def _handle_recv(self): """ @@ -246,6 +274,7 @@ class Websocket: topic = topics_set.pop() self.topics[str(topic)] = topic self._topics_changed.set() + self.set_status(refresh_topics=True) def remove_topics(self, topics_set: Set[str]): existing = topics_set.intersection(self.topics.keys()) @@ -256,6 +285,7 @@ class Websocket: for topic in existing: del self.topics[topic] self._topics_changed.set() + self.set_status(refresh_topics=True) async def send(self, message: JsonType): if self._ws is None: @@ -283,17 +313,12 @@ class WebsocketPool: await self._twitch.wait_until_login() if self.running: return - # Add default topics - assert self._twitch._user_id is not None - user_id = self._twitch._user_id - self.add_topics([ - WebsocketTopic("User", "Drops", user_id, self.process_drops), - WebsocketTopic("User", "CommunityPoints", user_id, self.process_points), - ]) self._running.set() await asyncio.gather(*(ws.start() for ws in self.websockets)) async def stop(self): + if not self.running: + return self._running.clear() await asyncio.gather(*(ws.stop() for ws in self.websockets)) @@ -340,49 +365,8 @@ class WebsocketPool: if count <= (len(self.websockets) - 1) * WS_TOPICS_LIMIT: ws = self.websockets.pop() recycled_topics.extend(ws.topics.values()) - ws.stop_nowait() + ws.remove() else: break if recycled_topics: self.add_topics(recycled_topics) - - @task_wrapper - async def process_drops(self, user_id: int, message: JsonType): - drop_id = message["data"]["drop_id"] - drop: Optional[TimedDrop] = None - for campaign in self._twitch.inventory: - drop = campaign.get_drop(drop_id) - if drop is not None: - break - else: - ws_logger.warning(f"Drop with ID of {drop_id} not found!") - return - drop.update(message) - msg_type = message["type"] - campaign = drop.campaign - if msg_type == "drop-progress": - print( - f"Drop: {drop.rewards_text()} ({campaign.claimed_drops}/{campaign.total_drops}): " - f"{drop.progress:6.1%} ({drop.remaining_minutes} minutes remaining)" - ) - elif msg_type == "drop-claim": - await drop.claim() - print( - f"Claimed drop: {drop.rewards_text()} " - f"({campaign.claimed_drops}/{campaign.total_drops})" - ) - if campaign.remaining_drops == 0: - self._twitch.reevaluate_campaigns() - - @task_wrapper - async def process_points(self, user_id: int, message: JsonType): - msg_type = message["type"] - if msg_type == "points-earned": - points = message["data"]["point_gain"]["total_points"] - balance = message["data"]["balance"]["balance"] - print(f"Earned points for watching: {points:3}, total: {balance}") - elif msg_type == "claim-available": - claim_data = message["data"]["claim"] - points = claim_data["point_gain"]["total_points"] - await self._twitch.claim_points(claim_data["channel_id"], claim_data["id"]) - print(f"Claimed bonus points: {points}")