diff --git a/.gitignore b/.gitignore index d84fdf3..021524e 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,5 @@ settings.json /lang/English.json logs/ -.claude/ \ No newline at end of file +.claude/ +data/ \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py index 4440e06..312b8c2 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,4 +1,3 @@ """TwitchDropsMiner - Modular source package.""" __version__ = "1.0.0" - diff --git a/src/__main__.py b/src/__main__.py index 7d977e5..00e9a6b 100644 --- a/src/__main__.py +++ b/src/__main__.py @@ -1,26 +1,28 @@ from __future__ import annotations -if __name__ == "__main__": - import argparse - import asyncio - import logging - from logging.handlers import TimedRotatingFileHandler - import signal - import sys - import traceback - import warnings +import argparse +import asyncio +import logging +import signal +import sys +import traceback +import warnings +from logging.handlers import TimedRotatingFileHandler + +import truststore + + +if __name__ == "__main__": - import truststore truststore.inject_into_ssl() - from src.config import FILE_FORMATTER, LOG_PATH, LOGGING_LEVELS, SELF_PATH + from src.config import FILE_FORMATTER, LOGGING_LEVELS from src.config.settings import Settings from src.core.client import Twitch from src.exceptions import CaptchaRequired from src.i18n import _ from src.version import __version__ - logger = logging.getLogger("TwitchDrops") # Force INFO level logging by default for better visibility logger.setLevel(logging.INFO) @@ -73,16 +75,13 @@ if __name__ == "__main__": # handle input parameters logger.debug("Parsing command line arguments") parser = argparse.ArgumentParser( - SELF_PATH.name, description="A program that allows you to mine timed drops on Twitch.", ) parser.add_argument("--version", action="version", version=f"v{__version__}") parser.add_argument("-v", dest="_verbose", action="count", default=0) parser.add_argument("--dump", action="store_true") # undocumented debug args - parser.add_argument( - "--debug-ws", dest="_debug_ws", action="store_true", help=argparse.SUPPRESS - ) + parser.add_argument("--debug-ws", dest="_debug_ws", action="store_true", help=argparse.SUPPRESS) parser.add_argument( "--debug-gql", dest="_debug_gql", action="store_true", help=argparse.SUPPRESS ) @@ -101,21 +100,18 @@ if __name__ == "__main__": async def main(): # set language from contextlib import suppress + with suppress(ValueError): # this language doesn't exist - stick to English _.set_language(settings.language) # Always log to file with timestamped filename in ./logs/ directory - from datetime import datetime from pathlib import Path # Create logs directory if it doesn't exist logs_dir = Path("logs") logs_dir.mkdir(exist_ok=True) - - # Generate timestamped log filename: TDM.YYYY-MM-DDTHH-MM-SS.log - timestamp = datetime.now().isoformat(timespec='seconds').replace(':', '-') - log_file = logs_dir / f"TDM.log" + log_file = logs_dir / "TDM.log" # Add file handler for timestamped log file_handler = TimedRotatingFileHandler(log_file, when="midnight", backupCount=5) @@ -139,6 +135,7 @@ if __name__ == "__main__": logger.info("Initializing web GUI mode") from src.web import app as webapp from src.web.gui_manager import WebGUIManager + # Set up web GUI logger.debug("Creating WebGUIManager") client.gui = WebGUIManager(client) @@ -147,16 +144,14 @@ if __name__ == "__main__": webapp.set_managers(client.gui, client) # Start web server in background logger.info("Starting web server on http://0.0.0.0:8080") - web_server_task = asyncio.create_task( - webapp.run_server(host="0.0.0.0", port=8080) - ) + web_server_task = asyncio.create_task(webapp.run_server(host="0.0.0.0", port=8080)) logger.info("Web server task created") loop = asyncio.get_running_loop() if sys.platform == "linux": logger.debug("Setting up signal handlers for SIGINT and SIGTERM") - loop.add_signal_handler(signal.SIGINT, lambda *_: client.gui.close()) - loop.add_signal_handler(signal.SIGTERM, lambda *_: client.gui.close()) + loop.add_signal_handler(signal.SIGINT, lambda *_: client.close()) + loop.add_signal_handler(signal.SIGTERM, lambda *_: client.close()) logger.info("Starting main client run loop") try: @@ -165,12 +160,10 @@ if __name__ == "__main__": except CaptchaRequired: logger.error("Captcha required - cannot continue") exit_status = 1 - client.prevent_close() client.print(_("error", "captcha")) except Exception: logger.exception("Fatal error encountered during client run") exit_status = 1 - client.prevent_close() client.print("Fatal error encountered:\n") client.print(traceback.format_exc()) finally: @@ -200,14 +193,16 @@ if __name__ == "__main__": except Exception as e: logger.error(f"Error while shutting down web server: {e}") else: - logger.debug(f"Web server task status: task={web_server_task is not None}, done={web_server_task.done() if web_server_task else 'N/A'}") + logger.debug( + f"Web server task status: task={web_server_task is not None}, done={web_server_task.done() if web_server_task else 'N/A'}" + ) logger.info("Shutting down Twitch client") await client.shutdown() logger.info("Twitch client shutdown completed") - logger.info(f"Shutdown complete - close_requested={client.gui.close_requested}, exit_status={exit_status}") - if not client.gui.close_requested: - logger.warning("User didn't request closure - showing error state") - # user didn't request the closure + logger.info(f"Shutdown complete - exit_status={exit_status}") + if exit_status != 0: + logger.warning("Application terminated with error - showing error state") + # Application terminated with error client.gui.tray.change_icon("error") client.print(_("status", "terminated")) client.gui.status.update(_("gui", "status", "terminated")) @@ -216,16 +211,11 @@ if __name__ == "__main__": # Web GUI doesn't need to wait - browser clients can stay connected logger.info("Web GUI - no need to wait for user to close browser") else: - logger.info("Close already requested - proceeding with shutdown") + logger.info("Normal shutdown - proceeding") # save the application state logger.info("Saving application state") client.save(force=True) logger.info("Application state saved") - logger.info("Stopping GUI") - client.gui.stop() - logger.info("GUI stopped") - logger.info("Closing GUI window") - client.gui.close_window() logger.info(f"=== Exiting with status code: {exit_status} ===") sys.exit(exit_status) diff --git a/src/api/gql_client.py b/src/api/gql_client.py index 6a317d6..a0fa446 100644 --- a/src/api/gql_client.py +++ b/src/api/gql_client.py @@ -63,16 +63,12 @@ class GQLClient: self._qgl_limiter = RateLimiter(capacity=5, window=1) @overload - async def request(self, ops: GQLOperation) -> JsonType: - ... + async def request(self, ops: GQLOperation) -> JsonType: ... @overload - async def request(self, ops: list[GQLOperation]) -> list[JsonType]: - ... + async def request(self, ops: list[GQLOperation]) -> list[JsonType]: ... - async def request( - self, ops: GQLOperation | list[GQLOperation] - ) -> JsonType | list[JsonType]: + async def request(self, ops: GQLOperation | list[GQLOperation]) -> JsonType | list[JsonType]: """ Execute one or more GraphQL operations. @@ -105,9 +101,7 @@ class GQLClient: "POST", "https://gql.twitch.tv/gql", json=ops, - headers=auth_state.headers( - user_agent=self._client_type.USER_AGENT, gql=True - ), + headers=auth_state.headers(user_agent=self._client_type.USER_AGENT, gql=True), ) as response: response_json: JsonType | list[JsonType] = await response.json() @@ -123,13 +117,9 @@ class GQLClient: if "errors" in response_json: for error_dict in response_json["errors"]: if "message" in error_dict: - if ( - single_retry - and error_dict["message"] - in ( - "service error", - "PersistedQueryNotFound", - ) + if single_retry and error_dict["message"] in ( + "service error", + "PersistedQueryNotFound", ): logger.error( f"Retrying a {error_dict['message']} for " @@ -160,9 +150,7 @@ class GQLClient: raise GQLException(response_json["errors"]) # Other error handling elif "error" in response_json: - raise GQLException( - f"{response_json['error']}: {response_json['message']}" - ) + raise GQLException(f"{response_json['error']}: {response_json['message']}") if force_retry: break diff --git a/src/api/http_client.py b/src/api/http_client.py index 5f01c46..97f282e 100644 --- a/src/api/http_client.py +++ b/src/api/http_client.py @@ -25,6 +25,7 @@ from src.utils import ExponentialBackoff if TYPE_CHECKING: from src.config import ClientInfo from src.config.settings import Settings + from src.core.client import Twitch from src.web.gui_manager import WebGUIManager @@ -46,6 +47,7 @@ class HTTPClient: self, settings: Settings, gui: WebGUIManager, + twitch: Twitch, client_type: ClientInfo, ): """ @@ -56,12 +58,15 @@ class HTTPClient: settings : Settings Application settings for connection quality and proxy configuration gui : WebGUIManager - GUI manager for user notifications and close detection + GUI manager for user notifications + twitch : Twitch + Twitch client for state checking client_type : ClientInfo Client type information (User-Agent, Client-ID, etc.) """ self.settings = settings self.gui = gui + self._twitch = twitch self._client_type = client_type self._session: aiohttp.ClientSession | None = None @@ -163,7 +168,9 @@ class HTTPClient: backoff = ExponentialBackoff(maximum=3 * 60) for delay in backoff: - if self.gui.close_requested: + from src.config import State + + if self._twitch._state == State.EXIT: raise ExitRequest() elif ( invalidate_after is not None @@ -174,9 +181,7 @@ class HTTPClient: try: response: aiohttp.ClientResponse | None = None - response = await self.gui.coro_unless_closed( - session.request(method, url, **kwargs) - ) + response = await session.request(method, url, **kwargs) assert response is not None logger.debug(f"Response: {response.status}: {response}") @@ -203,9 +208,8 @@ class HTTPClient: if response is not None: response.release() - # Wait for the backoff delay or until the GUI closes - with asyncio.suppress(asyncio.TimeoutError): - await asyncio.wait_for(self.gui.wait_until_closed(), timeout=delay) + # Wait for the backoff delay + await asyncio.sleep(delay) async def close(self) -> None: """ diff --git a/src/auth/auth_state.py b/src/auth/auth_state.py index 6b24729..aa4e459 100644 --- a/src/auth/auth_state.py +++ b/src/auth/auth_state.py @@ -155,8 +155,7 @@ class _AuthState: # the device_code has expired, request a new code continue - - def headers(self, *, user_agent: str = '', gql: bool = False) -> JsonType: + def headers(self, *, user_agent: str = "", gql: bool = False) -> JsonType: """ Build HTTP headers for Twitch API requests. @@ -181,7 +180,7 @@ class _AuthState: if hasattr(self, "session_id"): headers["Client-Session-Id"] = self.session_id # if hasattr(self, "client_version"): - # headers["Client-Version"] = self.client_version + # headers["Client-Version"] = self.client_version if hasattr(self, "device_id"): headers["X-Device-Id"] = self.device_id if gql: @@ -247,7 +246,7 @@ class _AuthState: async with self._twitch.request( "GET", "https://id.twitch.tv/oauth2/validate", - headers={"Authorization": f"OAuth {self.access_token}"} + headers={"Authorization": f"OAuth {self.access_token}"}, ) as response: if response.status == 401: # the access token we have is invalid - clear the cookie and reauth diff --git a/src/config/__init__.py b/src/config/__init__.py index 9bb8a3d..7338fd1 100644 --- a/src/config/__init__.py +++ b/src/config/__init__.py @@ -34,26 +34,11 @@ from .constants import ( ) from .operations import GQL_OPERATIONS from .paths import ( - CACHE_DB, - CACHE_PATH, COOKIES_PATH, DATA_DIR, - DUMP_PATH, - IS_DOCKER, - IS_PACKAGED, LANG_PATH, - LOCK_PATH, - LOG_PATH, - SCRIPTS_PATH, - SELF_PATH, SETTINGS_PATH, - SITE_PACKAGES_PATH, - SYS_SCRIPTS, - SYS_SITE_PACKAGES, - VENV_PATH, - WORKING_DIR, _merge_vars, - _resource_path, ) @@ -85,25 +70,10 @@ __all__ = [ "WATCH_INTERVAL", "WINDOW_TITLE", # paths.py - "IS_PACKAGED", - "IS_DOCKER", - "SYS_SITE_PACKAGES", - "SYS_SCRIPTS", - "SELF_PATH", - "WORKING_DIR", "DATA_DIR", - "VENV_PATH", - "SITE_PACKAGES_PATH", - "SCRIPTS_PATH", "LANG_PATH", - "LOG_PATH", - "DUMP_PATH", - "LOCK_PATH", - "CACHE_PATH", - "CACHE_DB", "COOKIES_PATH", "SETTINGS_PATH", - "_resource_path", "_merge_vars", # client_info.py "ClientInfo", diff --git a/src/config/client_info.py b/src/config/client_info.py index 5571e4f..78e10af 100644 --- a/src/config/client_info.py +++ b/src/config/client_info.py @@ -68,7 +68,7 @@ class ClientType: "Mozilla/5.0 (Linux; Android 16; LM-X420) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/138.0.7204.158 Mobile Safari/537.36" ), - ] + ], ) ANDROID_APP = ClientInfo( URL("https://www.twitch.tv"), @@ -102,7 +102,7 @@ class ClientType: "Dalvik/2.1.0 (Linux; U; Android 14; SM-X306B Build/UP1A.231005.007) " "tv.twitch.android.app/25.3.0/2503006" ), - ] + ], ) SMARTBOX = ClientInfo( URL("https://android.tv.twitch.tv"), diff --git a/src/config/constants.py b/src/config/constants.py index 0b04b7f..163bc97 100644 --- a/src/config/constants.py +++ b/src/config/constants.py @@ -31,10 +31,10 @@ LOGGING_LEVELS = { } FILE_FORMATTER = logging.Formatter( "{asctime}.{msecs:03.0f}:\t{levelname:>7}:\t{filename}:{lineno}:\t{message}", - style='{', + style="{", datefmt="%Y-%m-%d %H:%M:%S", ) -OUTPUT_FORMATTER = logging.Formatter("{levelname}: {message}", style='{', datefmt="%H:%M:%S") +OUTPUT_FORMATTER = logging.Formatter("{levelname}: {message}", style="{", datefmt="%H:%M:%S") # Type aliases JsonType = dict[str, Any] @@ -66,6 +66,7 @@ WINDOW_TITLE = f"Twitch Drops Miner v{__version__} (by DevilXD)" class State(Enum): """Application state machine states.""" + IDLE = auto() INVENTORY_FETCH = auto() GAMES_UPDATE = auto() @@ -86,7 +87,7 @@ class GQLOperation(JsonType): "version": 1, "sha256Hash": sha256, } - } + }, ) if variables is not None: self.__setitem__("variables", variables) @@ -120,9 +121,7 @@ class WebsocketTopic: self._process: TopicProcess = process @classmethod - def as_str( - cls, category: Literal["User", "Channel"], topic_name: str, target_id: int - ) -> str: + def as_str(cls, category: Literal["User", "Channel"], topic_name: str, target_id: int) -> str: return f"{WEBSOCKET_TOPICS[category][topic_name]}.{target_id}" def __call__(self, message: JsonType): diff --git a/src/config/operations.py b/src/config/operations.py index 0e2168f..05a7538 100644 --- a/src/config/operations.py +++ b/src/config/operations.py @@ -49,7 +49,7 @@ GQL_OPERATIONS: dict[str, GQLOperation] = { "d86775d0ef16a63a33ad52e80eaff963b2d5b72fada7c991504a57496e1d8e4b", variables={ "fetchRewardCampaigns": False, - } + }, ), # returns current state of drops (current drop progress) "CurrentDrop": GQLOperation( @@ -66,7 +66,7 @@ GQL_OPERATIONS: dict[str, GQLOperation] = { "5a4da2ab3d5b47c9f9ce864e727b2cb346af1e3ea8b897fe8f704a97ff017619", variables={ "fetchRewardCampaigns": False, - } + }, ), # returns extended information about a particular campaign "CampaignDetails": GQLOperation( diff --git a/src/config/paths.py b/src/config/paths.py index 99d65af..af9f82f 100644 --- a/src/config/paths.py +++ b/src/config/paths.py @@ -2,44 +2,11 @@ from __future__ import annotations -import os -import sys from pathlib import Path from typing import Any -# Type alias for path operations -JsonType = dict[str, Any] - - -# Environment detection -IS_DOCKER = os.getenv("DOCKER_ENV") == "1" or os.path.exists("/.dockerenv") - -# Site-packages venv path changes depending on the system platform -if sys.platform == "win32": - SYS_SITE_PACKAGES = "Lib/site-packages" -else: - # On Linux, the site-packages path includes a versioned 'pythonX.Y' folder part - # The Lib folder is also spelled in lowercase: 'lib' - version_info = sys.version_info - SYS_SITE_PACKAGES = f"lib/python{version_info.major}.{version_info.minor}/site-packages" - -# Scripts venv path changes depending on the system platform -if sys.platform == "win32": - SYS_SCRIPTS = "Scripts" -else: - SYS_SCRIPTS = "bin" - - -def _resource_path(relative_path: Path | str) -> Path: - """ - Get an absolute path to a bundled resource. - """ - base_path = WORKING_DIR - return base_path.joinpath(relative_path) - - -def _merge_vars(base_vars: JsonType, vars: JsonType) -> None: +def _merge_vars(base_vars: dict[str, Any], vars: dict[str, Any]) -> None: """ Merge variables recursively. @@ -68,30 +35,17 @@ def _merge_vars(base_vars: JsonType, vars: JsonType) -> None: # Base Paths - environment-specific resolution -if IS_DOCKER: - # Docker environment: use fixed paths - SELF_PATH = Path("/app/main.py") - WORKING_DIR = Path("/app") - DATA_DIR = Path("/app/data") +PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent +DATA_DIR = PROJECT_ROOT / "data" # Ensure data directory exists if not DATA_DIR.exists(): DATA_DIR.mkdir(parents=True, exist_ok=True) -# Development paths -VENV_PATH = Path(WORKING_DIR, "env") -SITE_PACKAGES_PATH = Path(VENV_PATH, SYS_SITE_PACKAGES) -SCRIPTS_PATH = Path(VENV_PATH, SYS_SCRIPTS) - # Translations path # NOTE: These don't have to be available to the end-user, so the path points to the internal dir -LANG_PATH = _resource_path("lang") +LANG_PATH = PROJECT_ROOT / "lang" # Persistent storage paths - use DATA_DIR for Docker compatibility -LOG_PATH = Path(DATA_DIR, "log.txt") -DUMP_PATH = Path(DATA_DIR, "dump.dat") -LOCK_PATH = Path(DATA_DIR, "lock.file") -CACHE_PATH = Path(DATA_DIR, "cache") -CACHE_DB = Path(CACHE_PATH, "mapping.json") -COOKIES_PATH = Path(DATA_DIR, "cookies.jar") -SETTINGS_PATH = Path(DATA_DIR, "settings.json") +COOKIES_PATH = DATA_DIR / "cookies.jar" +SETTINGS_PATH = DATA_DIR / "settings.json" diff --git a/src/core/client.py b/src/core/client.py index aa6d0ea..d9720b2 100644 --- a/src/core/client.py +++ b/src/core/client.py @@ -13,7 +13,6 @@ import aiohttp from src.api import GQLClient, HTTPClient from src.auth import _AuthState from src.config import ( - DUMP_PATH, MAX_CHANNELS, ClientType, State, @@ -67,7 +66,7 @@ class Twitch: self._client_type: ClientInfo = ClientType.ANDROID_APP self._auth_state: _AuthState = _AuthState(self) # GUI (will be set by main.py) - self.gui: WebGUIManager = None # type: ignore[assignment] + self.gui: WebGUIManager = None # type: ignore[assignment] # API clients (will be initialized after GUI is set) self._http_client: HTTPClient | None = None self._gql_client: GQLClient | None = None @@ -93,7 +92,7 @@ class Twitch: def _ensure_api_clients(self) -> None: """Ensure API clients are initialized (called after GUI is set).""" if self._http_client is None: - self._http_client = HTTPClient(self.settings, self.gui, self._client_type) + self._http_client = HTTPClient(self.settings, self.gui, self, self._client_type) if self._gql_client is None: self._gql_client = GQLClient(self._http_client, self._auth_state, self._client_type) @@ -162,13 +161,6 @@ class Twitch: """ self.change_state(State.EXIT) - def prevent_close(self) -> None: - """ - Called when the application window has to be prevented from closing, even after the user - closes it with X. Usually used solely to display tracebacks from the closing sequence. - """ - self.gui.prevent_close() - def print(self, message: str) -> None: """Print a message in the GUI.""" self.gui.print(message) @@ -191,21 +183,13 @@ class Twitch: """Remove websocket topics for a list of channels.""" topics_to_remove: list[str] = [] for channel in channels: - topics_to_remove.append( - WebsocketTopic.as_str("Channel", "StreamState", channel.id) - ) - topics_to_remove.append( - WebsocketTopic.as_str("Channel", "StreamUpdate", channel.id) - ) + topics_to_remove.append(WebsocketTopic.as_str("Channel", "StreamState", channel.id)) + topics_to_remove.append(WebsocketTopic.as_str("Channel", "StreamUpdate", channel.id)) if topics_to_remove: self.websocket.remove_topics(topics_to_remove) async def run(self) -> None: """Main entry point for the miner - handles reload and exit requests.""" - if self.settings.dump: - with open(DUMP_PATH, 'w', encoding="utf8"): - # replace the existing file with an empty one - pass while True: try: await self._run() @@ -226,7 +210,6 @@ class Twitch: • Selecting a stream to watch, and watching it • Changing the stream that's being watched if necessary """ - self.gui.start() # Initialize API clients now that GUI is available self._ensure_api_clients() auth_state = await self.get_auth() @@ -236,19 +219,21 @@ class Twitch: self._watching_task.cancel() self._watching_task = asyncio.create_task(self._watch_loop()) # Add default topics - self.websocket.add_topics([ - WebsocketTopic("User", "Drops", auth_state.user_id, self.process_drops), - WebsocketTopic( - "User", "Notifications", auth_state.user_id, self.process_notifications - ), - ]) + self.websocket.add_topics( + [ + WebsocketTopic("User", "Drops", auth_state.user_id, self.process_drops), + WebsocketTopic( + "User", "Notifications", auth_state.user_id, self.process_notifications + ), + ] + ) full_cleanup: bool = False channels: Final[OrderedDict[int, Channel]] = self.channels self.change_state(State.INVENTORY_FETCH) while True: if self._state is State.IDLE: if self.settings.dump: - self.gui.close() + self.close() continue self.gui.tray.change_icon("idle") self.gui.status.update(_("gui", "status", "idle")) @@ -278,17 +263,25 @@ class Twitch: games_to_watch: list[str] = self.settings.games_to_watch next_hour: datetime = datetime.now(timezone.utc) + timedelta(hours=1) logger.info("games_to_watch: %s", games_to_watch) - logger.info("inventory has %d eligible campaigns", sum(1 for c in self.inventory if c.eligible)) + logger.info( + "inventory has %d eligible campaigns", + sum(1 for c in self.inventory if c.eligible), + ) logger.debug("inventories: %s", self.inventory) # Log detailed game -> campaigns -> channels mapping if logger.isEnabledFor(logging.DEBUG): logger.info("=== Active Campaigns Mapping ===") from collections import defaultdict - game_campaign_map: dict[str, list[tuple[DropsCampaign, list[str]]]] = defaultdict(list) + + game_campaign_map: dict[str, list[tuple[DropsCampaign, list[str]]]] = ( + defaultdict(list) + ) for campaign in self.inventory: if campaign.eligible and not campaign.finished: - logger.info("eligible Campaign: %s - %s", campaign.name, campaign.game.name) + logger.info( + "eligible Campaign: %s - %s", campaign.name, campaign.game.name + ) if campaign.can_earn_within(next_hour): channel_names = [] if campaign.allowed_channels: @@ -300,14 +293,22 @@ class Twitch: logger.debug(f"Game: {game_name}") for campaign, channel_list in game_campaign_map[game_name]: status_info = f"{'ACTIVE' if campaign.active else 'UPCOMING'}" - ends_info = campaign.ends_at.astimezone().strftime('%Y-%m-%d %H:%M') - channel_info = f"{len(channel_list)} channels" if channel_list[0] != "" else "directory" - logger.debug(f" └─ Campaign: {campaign.name} [{status_info}] (ends: {ends_info})") + ends_info = campaign.ends_at.astimezone().strftime("%Y-%m-%d %H:%M") + channel_info = ( + f"{len(channel_list)} channels" + if channel_list[0] != "" + else "directory" + ) + logger.debug( + f" └─ Campaign: {campaign.name} [{status_info}] (ends: {ends_info})" + ) logger.debug(f" Channels: {channel_info}") if channel_list[0] != "" and len(channel_list) <= 10: logger.debug(f" └─ {', '.join(channel_list)}") elif channel_list[0] != "": - logger.debug(f" └─ {', '.join(channel_list[:10])} ... (+{len(channel_list)-10} more)") + logger.debug( + f" └─ {', '.join(channel_list[:10])} ... (+{len(channel_list) - 10} more)" + ) logger.info("=== End Campaigns Mapping ===") # Build wanted_games list preserving the order from games_to_watch @@ -319,27 +320,31 @@ class Twitch: if ( game.name.lower() == game_name_lower and game not in self.wanted_games # isn't already there - and campaign.can_earn_within(next_hour) # can be progressed within the next hour + and campaign.can_earn_within( + next_hour + ) # can be progressed within the next hour ): self.wanted_games.append(game) break # Only add each game once if self.wanted_games: logger.info( - "Wanted games: %s", - ", ".join(game.name for game in self.wanted_games) + "Wanted games: %s", ", ".join(game.name for game in self.wanted_games) ) else: logger.warning( "No wanted games found! games_to_watch=%s, eligible_campaigns=%d", games_to_watch, - sum(1 for c in self.inventory if c.eligible and c.can_earn_within(next_hour)) + sum( + 1 for c in self.inventory if c.eligible and c.can_earn_within(next_hour) + ), ) # Handle manual mode: check if manual game still has drops if self.is_manual_mode(): manual_has_drops = any( - campaign.can_earn_within(next_hour) and campaign.game == self._manual_target_game + campaign.can_earn_within(next_hour) + and campaign.game == self._manual_target_game for campaign in self.inventory ) if not manual_has_drops: @@ -348,7 +353,9 @@ class Twitch: # Move manual game to front of wanted_games for priority self.wanted_games.remove(self._manual_target_game) self.wanted_games.insert(0, self._manual_target_game) - logger.info(f"Manual mode: prioritizing game {self._manual_target_game.name}") + logger.info( + f"Manual mode: prioritizing game {self._manual_target_game.name}" + ) full_cleanup = True self.restart_watching() @@ -397,10 +404,7 @@ class Twitch: acl_channels: set[Channel] = set() next_hour = datetime.now(timezone.utc) + timedelta(hours=1) for campaign in self.inventory: - if ( - campaign.game in self.wanted_games - and campaign.can_earn_within(next_hour) - ): + if campaign.game in self.wanted_games and campaign.can_earn_within(next_hour): if campaign.allowed_channels: acl_channels.update(campaign.allowed_channels) else: @@ -466,10 +470,9 @@ class Twitch: for channel in channels.values(): # check if there's any channels we can watch first if self.can_watch(channel): - if ( - (active_campaign := self.get_active_campaign(channel)) is not None - and (active_drop := active_campaign.first_drop) is not None - ): + if (active_campaign := self.get_active_campaign(channel)) is not None and ( + active_drop := active_campaign.first_drop + ) is not None: active_drop.display(countdown=False, subone=True) break self.change_state(State.CHANNEL_SWITCH) @@ -483,7 +486,7 @@ class Twitch: ) elif self._state is State.CHANNEL_SWITCH: if self.settings.dump: - self.gui.close() + self.close() continue self.gui.status.update(_("gui", "status", "switching")) @@ -509,8 +512,14 @@ class Twitch: if channel.game == self._manual_target_game and self.can_watch(channel): new_watching = channel self._manual_target_channel = channel - game_name = self._manual_target_game.name if self._manual_target_game else "Unknown" - logger.info(f"Manual mode: switching to {channel.name} (same game: {game_name})") + game_name = ( + self._manual_target_game.name + if self._manual_target_game + else "Unknown" + ) + logger.info( + f"Manual mode: switching to {channel.name} (same game: {game_name})" + ) break # No channels available for manual game -> exit manual mode if new_watching is None: @@ -526,10 +535,9 @@ class Twitch: # Switch to new channel self.watch(new_watching) # Display the active drop for the new channel - if ( - (active_campaign := self.get_active_campaign(new_watching)) is not None - and (active_drop := active_campaign.first_drop) is not None - ): + if (active_campaign := self.get_active_campaign(new_watching)) is not None and ( + active_drop := active_campaign.first_drop + ) is not None: active_drop.display(countdown=False, subone=True) self._state_change.clear() elif watching_channel is not None and self.can_watch(watching_channel): @@ -618,7 +626,9 @@ class Twitch: return game_name = self._manual_target_game.name if self._manual_target_game else "Unknown" - logger.info(f"Exiting manual mode for game: {game_name}. Reason: {reason or 'User requested'}") + logger.info( + f"Exiting manual mode for game: {game_name}. Reason: {reason or 'User requested'}" + ) self._manual_target_channel = None self._manual_target_game = None @@ -640,7 +650,9 @@ class Twitch: return { "active": True, "game_name": self._manual_target_game.name if self._manual_target_game else "", - "channel_name": self._manual_target_channel.name if self._manual_target_channel else "" + "channel_name": self._manual_target_channel.name + if self._manual_target_channel + else "", } return {"active": False} @@ -705,7 +717,9 @@ class Twitch: self, game: Game, *, limit: int = 20, drops_enabled: bool = True ) -> list[Channel]: """Delegate to ChannelService.""" - return await self._channel_service.get_live_streams(game, limit=limit, drops_enabled=drops_enabled) + return await self._channel_service.get_live_streams( + game, limit=limit, drops_enabled=drops_enabled + ) async def bulk_check_online(self, channels: abc.Iterable[Channel]): """Delegate to ChannelService.""" diff --git a/src/exceptions.py b/src/exceptions.py index 8c26d56..94a3140 100644 --- a/src/exceptions.py +++ b/src/exceptions.py @@ -2,6 +2,7 @@ class MinerException(Exception): """ Base exception class for this application. """ + def __init__(self, *args: object): if args: super().__init__(*args) @@ -15,6 +16,7 @@ class ExitRequest(MinerException): Intended for internal use only. """ + def __init__(self): super().__init__("Application was requested to exit") @@ -25,6 +27,7 @@ class ReloadRequest(MinerException): Intended for internal use only. """ + def __init__(self): super().__init__("Application was requested to reload entirely") @@ -33,6 +36,7 @@ class RequestException(MinerException): """ Raised for cases where a web request doesn't return what we wanted it to. """ + def __init__(self, *args: object): if args: super().__init__(*args) @@ -46,6 +50,7 @@ class RequestInvalid(RequestException): Intended for internal use only. """ + def __init__(self): super().__init__("Request became invalid during its retry loop") @@ -59,6 +64,7 @@ class WebsocketClosed(RequestException): received: bool `True` if the closing was caused by our side receiving a close frame, `False` otherwise. """ + def __init__(self, *args: object, received: bool = False): if args: super().__init__(*args) @@ -71,6 +77,7 @@ class LoginException(RequestException): """ Raised when an exception occurs during login phase. """ + def __init__(self, *args: object): if args: super().__init__(*args) @@ -82,6 +89,7 @@ class CaptchaRequired(LoginException): """ The most dreaded thing about automated scripts... """ + def __init__(self): super().__init__("Captcha is required") @@ -90,5 +98,6 @@ class GQLException(RequestException): """ Raised when a GQL request returns an error response. """ + def __init__(self, message: str): super().__init__(message) diff --git a/src/i18n/translator.py b/src/i18n/translator.py index cca7c63..5803cb1 100644 --- a/src/i18n/translator.py +++ b/src/i18n/translator.py @@ -3,7 +3,7 @@ from __future__ import annotations from collections import abc from typing import TYPE_CHECKING, Any, TypedDict -from src.config import DEFAULT_LANG, IS_PACKAGED, LANG_PATH +from src.config import DEFAULT_LANG, LANG_PATH from src.exceptions import MinerException from src.utils.json_utils import json_load, json_save @@ -402,19 +402,19 @@ default_translation: Translation = { "2. Ensure your Twitch account is linked to all campaigns " "you're interested in mining.\n" "3. If you're interested in mining everything possible, " - "change the Priority Mode to anything other than \"Priority list only\" " - "and press on \"Reload\".\n" - "4. If you want to mine specific games first, use the \"Priority\" list " + 'change the Priority Mode to anything other than "Priority list only" ' + 'and press on "Reload".\n' + '4. If you want to mine specific games first, use the "Priority" list ' "to set up an ordered list of games of your choice. " "Games from the top of the list will be attempted to be mined first, " "before the ones lower down the list.\n" - "5. Keep the \"Priority mode\" selected as \"Priority list only\", " + '5. Keep the "Priority mode" selected as "Priority list only", ' "to avoid mining games that are not on the priority list. " "Or not - it's up to you.\n" - "6. Use the \"Exclude\" list to tell the application " + '6. Use the "Exclude" list to tell the application ' "which games should never be mined.\n" "7. Changing the contents of either of the lists, or changing " - "the \"Priority mode\", requires you to press on \"Reload\" " + 'the "Priority mode", requires you to press on "Reload" ' "for the changes to take an effect." ), }, @@ -428,9 +428,8 @@ class Translator: # start with (and always copy) the default translation self._translation: Translation = default_translation.copy() # if we're in dev, update the template English.json file - if not IS_PACKAGED: - default_langpath = LANG_PATH.joinpath(f"{DEFAULT_LANG}.json") - json_save(default_langpath, default_translation) + default_langpath = LANG_PATH.joinpath(f"{DEFAULT_LANG}.json") + json_save(default_langpath, default_translation) self._translation["language_name"] = DEFAULT_LANG # load available translation names for filepath in LANG_PATH.glob("*.json"): diff --git a/src/models/benefit.py b/src/models/benefit.py index 7a328a3..02c3485 100644 --- a/src/models/benefit.py +++ b/src/models/benefit.py @@ -10,6 +10,7 @@ if TYPE_CHECKING: class BenefitType(Enum): """Type of drop benefit (reward).""" + UNKNOWN = "UNKNOWN" BADGE = "BADGE" EMOTE = "EMOTE" @@ -21,6 +22,7 @@ class BenefitType(Enum): class Benefit: """Represents a reward/benefit from a completed drop.""" + __slots__ = ("id", "name", "type", "image_url") def __init__(self, data: JsonType): diff --git a/src/models/campaign.py b/src/models/campaign.py index 7f3d207..4d4dcb4 100644 --- a/src/models/campaign.py +++ b/src/models/campaign.py @@ -41,7 +41,8 @@ class DropsCampaign: allowed: JsonType = data["allow"] self.allowed_channels: list[Channel] = ( [Channel.from_acl(twitch, channel_data) for channel_data in allowed["channels"]] - if allowed["channels"] and allowed.get("isEnabled", True) else [] + if allowed["channels"] and allowed.get("isEnabled", True) + else [] ) self.timed_drops: dict[str, TimedDrop] = { drop_data["id"]: TimedDrop(self, drop_data, claimed_benefits) @@ -139,13 +140,15 @@ class DropsCampaign: self.eligible # account is eligible and self.active # campaign is active (and valid) and ( - channel is None or ( # channel isn't specified, + channel is None + or ( # channel isn't specified, # or there's no ACL, or the channel is in the ACL (not self.allowed_channels or channel in self.allowed_channels) # and the channel is live and playing the campaign's game and ( ignore_channel_status - or channel.game is not None and channel.game == self.game + or channel.game is not None + and channel.game == self.game ) ) ) @@ -163,13 +166,10 @@ class DropsCampaign: ) ) - def can_earn( - self, channel: Channel | None = None, ignore_channel_status: bool = False - ) -> bool: + def can_earn(self, channel: Channel | None = None, ignore_channel_status: bool = False) -> bool: # True if any of the containing drops can be earned - return ( - self._base_can_earn(channel, ignore_channel_status) - and any(drop._base_can_earn() for drop in self.drops) + return self._base_can_earn(channel, ignore_channel_status) and any( + drop._base_can_earn() for drop in self.drops ) def can_earn_within(self, stamp: datetime) -> bool: @@ -193,7 +193,7 @@ class DropsCampaign: # Executes if any drop's extra_current_minutes reach MAX_ESTIMATED_MINUTES # TODO: Figure out a better way to handle this case logger.warning( - f"At least one of the drops in campaign \"{self.name}({self.game.name})\" " + f'At least one of the drops in campaign "{self.name}({self.game.name})" ' "has reached the maximum extra minutes limit!" ) self._twitch.change_state(State.CHANNEL_SWITCH) diff --git a/src/models/channel.py b/src/models/channel.py index d45a570..ee120cf 100644 --- a/src/models/channel.py +++ b/src/models/channel.py @@ -60,7 +60,7 @@ class Stream: "muted": False, "player": "site", "user_id": self.channel._twitch._auth_state.user_id, - } + }, } ] return {"data": (b64encode(json_minify(payload).encode("utf8"))).decode("utf8")} @@ -107,7 +107,7 @@ class Stream: token_value = token_data["value"] token_signature = token_data["signature"] # using the token, query Twitch for a list of all available stream qualities - available_qualities: str = '' + available_qualities: str = "" try: async with self.channel._twitch.request( "GET", @@ -128,7 +128,7 @@ class Stream: if isinstance(available_json, list): available_json = available_json[0] if "error" in available_json: - logger.error(f"Stream URL get error: \"{available_json['error']}\"") + logger.error(f'Stream URL get error: "{available_json["error"]}"') self.channel.set_offline() return None # pick the last URL from the list, usually with the lowest quality stream @@ -141,8 +141,15 @@ class Stream: class Channel: __slots__ = ( - "_twitch", "_gui_channels", "id", "_login", "_display_name", "_spade_url", - "_stream", "_pending_stream_up", "acl_based" + "_twitch", + "_gui_channels", + "id", + "_login", + "_display_name", + "_spade_url", + "_stream", + "_pending_stream_up", + "acl_based", ) def __init__( @@ -289,9 +296,7 @@ class Channel: For mobile view, spade_url is available immediately from the page, skipping step #2. """ - SETTINGS_PATTERN: str = ( - r'src="(https://[\w.]+/config/settings\.[0-9a-f]{32}\.js)"' - ) + SETTINGS_PATTERN: str = r'src="(https://[\w.]+/config/settings\.[0-9a-f]{32}\.js)"' SPADE_PATTERN: str = ( r'"spade_?url": ?"(https://video-edge-[.\w\-/]+\.ts(?:\?allow_stream=true)?)"' ) @@ -441,7 +446,7 @@ class Channel: # the response may contain some invalid JSON with duplicate double quotes # in the value strings: we need to get rid of them by removing the "url" key entirely # if no JSON can be found within the response, this is a NOOP - available_chunks = re.sub(r'"url": ?".+}",', '', available_chunks) + available_chunks = re.sub(r'"url": ?".+}",', "", available_chunks) # try to decode the suspected JSON try: available_json: JsonType = json.loads(available_chunks) @@ -453,7 +458,7 @@ class Channel: if isinstance(available_json, list): available_json = available_json[0] if "error" in available_json: - logger.error(f"Send watch error: \"{available_json['error']}\"") + logger.error(f'Send watch error: "{available_json["error"]}"') return False # the list contains ~10-13 chunks of the stream at 2s intervals, # pick the last chunk URL available. Ensure it's not the end-of-stream tag, diff --git a/src/models/drop.py b/src/models/drop.py index 2c03ff9..94ddaa0 100644 --- a/src/models/drop.py +++ b/src/models/drop.py @@ -22,12 +22,12 @@ if TYPE_CHECKING: logger = logging.getLogger("TwitchDrops") -DIMS_PATTERN = re.compile(r'-\d+x\d+(?=\.(?:jpg|png|gif)$)', re.I) +DIMS_PATTERN = re.compile(r"-\d+x\d+(?=\.(?:jpg|png|gif)$)", re.I) def remove_dimensions(url: str) -> str: """Remove dimension suffix from Twitch image URLs (e.g., -285x380.jpg).""" - return DIMS_PATTERN.sub('', url) + return DIMS_PATTERN.sub("", url) class BaseDrop: @@ -71,7 +71,7 @@ class BaseDrop: elif self.can_earn(): additional = ", can_earn=True" else: - additional = '' + additional = "" return f"Drop({self.rewards_text()}{additional})" @property @@ -107,11 +107,9 @@ class BaseDrop: and self.starts_at < stamp ) - def can_earn( - self, channel: Channel | None = None, ignore_channel_status: bool = False - ) -> bool: - return ( - self._base_can_earn() and self.campaign._base_can_earn(channel, ignore_channel_status) + def can_earn(self, channel: Channel | None = None, ignore_channel_status: bool = False) -> bool: + return self._base_can_earn() and self.campaign._base_can_earn( + channel, ignore_channel_status ) @property @@ -152,7 +150,7 @@ class BaseDrop: # two different claim texts, becase a new line after the game name # looks ugly in the output window - replace it with a space self._twitch.print( - _("status", "claimed_drop").format(drop=claim_text.replace('\n', ' ')) + _("status", "claimed_drop").format(drop=claim_text.replace("\n", " ")) ) self._twitch.gui.tray.notify(claim_text, _("gui", "tray", "notification_title")) else: @@ -183,9 +181,9 @@ class BaseDrop: elif "claimDropRewards" in data: if not data["claimDropRewards"]: return False - elif ( - data["claimDropRewards"]["status"] - in ("ELIGIBLE_FOR_ALL", "DROP_INSTANCE_ALREADY_CLAIMED") + elif data["claimDropRewards"]["status"] in ( + "ELIGIBLE_FOR_ALL", + "DROP_INSTANCE_ALREADY_CLAIMED", ): return True return False @@ -211,11 +209,11 @@ class TimedDrop(BaseDrop): elif self.can_earn(): additional = ", can_earn=True" else: - additional = '' + additional = "" if 0 < self.current_minutes < self.required_minutes: minutes = f", {self.current_minutes}/{self.required_minutes}" else: - minutes = '' + minutes = "" return f"Drop({self.rewards_text()}{minutes}{additional})" @property @@ -257,6 +255,7 @@ class TimedDrop(BaseDrop): @property def availability(self) -> float: import math + now = datetime.now(timezone.utc) if self.required_minutes > 0 and self.total_remaining_minutes > 0 and now < self.ends_at: return ((self.ends_at - now).total_seconds() / 60) / self.total_remaining_minutes diff --git a/src/models/game.py b/src/models/game.py index 3a9f312..4108739 100644 --- a/src/models/game.py +++ b/src/models/game.py @@ -40,9 +40,9 @@ class Game: Converts the game name into a slug, useable for the GQL API. """ # remove specific characters - slug_text = re.sub(r'\'', '', self.name.lower()) + slug_text = re.sub(r"\'", "", self.name.lower()) # remove non alpha-numeric characters - slug_text = re.sub(r'\W+', '-', slug_text) + slug_text = re.sub(r"\W+", "-", slug_text) # strip and collapse dashes - slug_text = re.sub(r'-{2,}', '-', slug_text.strip('-')) + slug_text = re.sub(r"-{2,}", "-", slug_text.strip("-")) return slug_text diff --git a/src/services/channel_service.py b/src/services/channel_service.py index dac4c7e..74d79ef 100644 --- a/src/services/channel_service.py +++ b/src/services/channel_service.py @@ -108,14 +108,16 @@ class ChannelService: try: response = await self._twitch.gql_request( - GQL_OPERATIONS["GameDirectory"].with_variables({ - "limit": limit, - "slug": game.slug, - "options": { - "includeRestricted": ["SUB_ONLY_LIVE"], - "systemFilters": filters, - }, - }) + GQL_OPERATIONS["GameDirectory"].with_variables( + { + "limit": limit, + "slug": game.slug, + "options": { + "includeRestricted": ["SUB_ONLY_LIVE"], + "systemFilters": filters, + }, + } + ) ) except GQLException as exc: raise MinerException(f"Game: {game.slug}") from exc diff --git a/src/services/inventory_service.py b/src/services/inventory_service.py index 90d99ba..53a7dd1 100644 --- a/src/services/inventory_service.py +++ b/src/services/inventory_service.py @@ -8,16 +8,14 @@ managing the inventory state, and determining active campaigns. from __future__ import annotations import asyncio -import json import logging -from copy import deepcopy from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any from dateutil.parser import isoparse from src.api import GQLClient -from src.config import DUMP_PATH, GQL_OPERATIONS +from src.config import GQL_OPERATIONS from src.exceptions import ExitRequest from src.i18n import _ from src.models import DropsCampaign @@ -111,8 +109,7 @@ class InventoryService: # this contains claimed benefit edge IDs, not drop IDs claimed_benefits: dict[str, datetime] = { - b["id"]: isoparse(b["lastAwardedAt"]) - for b in inventory["gameEventDrops"] + b["id"]: isoparse(b["lastAwardedAt"]) for b in inventory["gameEventDrops"] } inventory_data: dict[str, JsonType] = {c["id"]: c for c in ongoing_campaigns} @@ -150,30 +147,6 @@ class InventoryService: if inventory_data[campaign_id]["game"] is None: del inventory_data[campaign_id] - if self._twitch.settings.dump: - # dump the campaigns data to the dump file - with open(DUMP_PATH, 'a', encoding="utf8") as file: - # we need to pre-process the inventory dump a little - dump_data: JsonType = deepcopy(inventory_data) - for campaign_data in dump_data.values(): - # replace ACL lists with a simple text description - if ( - campaign_data["allow"] - and campaign_data["allow"].get("isEnabled", True) - and campaign_data["allow"]["channels"] - ): - # simply count the channels included in the ACL - campaign_data["allow"]["channels"] = ( - f"{len(campaign_data['allow']['channels'])} channels" - ) - # replace drop instance IDs, so they don't include user IDs - for drop_data in campaign_data["timeBasedDrops"]: - if "self" in drop_data and drop_data["self"]["dropInstanceID"]: - drop_data["self"]["dropInstanceID"] = "..." - json.dump(dump_data, file, indent=4, sort_keys=True) - file.write("\n\n") # add 2x new line spacer - json.dump(claimed_benefits, file, indent=4, sort_keys=True, default=str) - # use the merged data to create campaign objects campaigns: list[DropsCampaign] = [ DropsCampaign(self._twitch, campaign_data, claimed_benefits) @@ -212,12 +185,12 @@ class InventoryService: for i, coro in enumerate(asyncio.as_completed(add_campaign_tasks), start=1): await coro status_update( - _("gui", "status", "adding_campaigns").format( - counter=f"({i}/{len(campaigns)})" - ) + _("gui", "status", "adding_campaigns").format(counter=f"({i}/{len(campaigns)})") ) # this is needed here explicitly, because cache reads from disk don't raise this - if self._twitch.gui.close_requested: + from src.config import State + + if self._twitch._state == State.EXIT: raise ExitRequest() except Exception: # asyncio.as_completed doesn't cancel tasks on errors diff --git a/src/services/maintenance.py b/src/services/maintenance.py index 6731023..b632caf 100644 --- a/src/services/maintenance.py +++ b/src/services/maintenance.py @@ -57,7 +57,9 @@ class MaintenanceService: 3. After reaching the next hour boundary, request inventory reload """ now = datetime.now(timezone.utc) - next_period = now + timedelta(minutes=self._twitch.settings.minimum_refresh_interval_minutes) + next_period = now + timedelta( + minutes=self._twitch.settings.minimum_refresh_interval_minutes + ) while True: # exit if there's no need to repeat the loop @@ -75,7 +77,7 @@ class MaintenanceService: ( "Maintenance task waiting until: " f"{next_trigger.astimezone().strftime('%X')} ({trigger_type})" - ) + ), ) await asyncio.sleep((next_trigger - now).total_seconds()) diff --git a/src/services/message_handlers.py b/src/services/message_handlers.py index ee64512..05dcb7d 100644 --- a/src/services/message_handlers.py +++ b/src/services/message_handlers.py @@ -110,7 +110,7 @@ class MessageHandlerService: if message["old_game"] != message["game"]: game_change = f", game changed: {message['old_game']} -> {message['game']}" else: - game_change = '' + game_change = "" logger.log(CALL, f"Channel update from websocket: {channel.name}{game_change}") @@ -223,9 +223,9 @@ class MessageHandlerService: {"channelID": str(watching_channel.id)} ) ) - drop_data: JsonType | None = ( - context["data"]["currentUser"]["dropCurrentSession"] - ) + drop_data: JsonType | None = context["data"]["currentUser"][ + "dropCurrentSession" + ] if drop_data is None or drop_data["dropID"] != drop.id: break await asyncio.sleep(2) diff --git a/src/services/watch_service.py b/src/services/watch_service.py index 13df7b2..87b2116 100644 --- a/src/services/watch_service.py +++ b/src/services/watch_service.py @@ -214,13 +214,11 @@ class WatchService: # Solution 1: use GQL to query for the currently mined drop status try: context = await self._twitch.gql_request( - GQL_OPERATIONS["CurrentDrop"].with_variables( - {"channelID": str(channel.id)} - ) - ) - drop_data: JsonType | None = ( - context["data"]["currentUser"]["dropCurrentSession"] + GQL_OPERATIONS["CurrentDrop"].with_variables({"channelID": str(channel.id)}) ) + drop_data: JsonType | None = context["data"]["currentUser"][ + "dropCurrentSession" + ] except GQLException: drop_data = None diff --git a/src/utils/async_helpers.py b/src/utils/async_helpers.py index ee97a6e..2eb28a5 100644 --- a/src/utils/async_helpers.py +++ b/src/utils/async_helpers.py @@ -37,7 +37,7 @@ def format_traceback(exc: BaseException, **kwargs: Any) -> str: Like `traceback.print_exc` but returns a string. Uses the passed-in exception. Any additional `**kwargs` are passed to the underlaying `traceback.format_exception`. """ - return ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__, **kwargs)) + return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__, **kwargs)) def task_wrapper( @@ -53,8 +53,9 @@ def task_wrapper( Handles ExitRequest and ReloadRequest silently, logs other exceptions. Critical tasks will attempt to find and close the Twitch instance on failure. """ + def decorator( - afunc: abc.Callable[_P, abc.Coroutine[Any, Any, _T]] + afunc: abc.Callable[_P, abc.Coroutine[Any, Any, _T]], ) -> abc.Callable[_P, abc.Coroutine[Any, Any, _T]]: @wraps(afunc) async def wrapper(*args: _P.args, **kwargs: _P.kwargs): @@ -69,6 +70,7 @@ def task_wrapper( # there isn't an easy and sure way to obtain the Twitch instance here, # but we can improvise finding it from src.core.client import Twitch # cyclic import + probe = args and args[0] or None # extract from 'self' arg if isinstance(probe, Twitch): probe.close() @@ -77,7 +79,9 @@ def task_wrapper( if isinstance(probe, Twitch): probe.close() raise # raise up to the wrapping task + return wrapper + if afunc is None: return decorator return decorator(afunc) diff --git a/src/utils/backoff.py b/src/utils/backoff.py index aae211c..636b1e9 100644 --- a/src/utils/backoff.py +++ b/src/utils/backoff.py @@ -69,8 +69,7 @@ class ExponentialBackoff: def __next__(self) -> float: """Generate the next delay value.""" value: float = ( - pow(self.base, self.steps) - * random.uniform(self.variance_min, self.variance_max) + pow(self.base, self.steps) * random.uniform(self.variance_min, self.variance_max) + self.shift ) if value > self.maximum: diff --git a/src/utils/json_utils.py b/src/utils/json_utils.py index e4dee5a..6f72581 100644 --- a/src/utils/json_utils.py +++ b/src/utils/json_utils.py @@ -28,7 +28,7 @@ SERIALIZE_ENV: dict[str, Callable[[Any], object]] = { def json_minify(data: JsonType | list[JsonType]) -> str: """Return minified JSON string (no whitespace) for payload usage.""" - return json.dumps(data, separators=(',', ':')) + return json.dumps(data, separators=(",", ":")) def _serialize(obj: Any) -> Any: @@ -155,5 +155,5 @@ def json_save(path: Path, contents: Mapping[Any, Any], *, sort: bool = False) -> contents: Data to serialize sort: If True, sort keys alphabetically """ - with open(path, 'w', encoding="utf8") as file: + with open(path, "w", encoding="utf8") as file: json.dump(contents, file, default=_serialize, sort_keys=sort, indent=4) diff --git a/src/utils/string_utils.py b/src/utils/string_utils.py index 8ccc274..8359df0 100644 --- a/src/utils/string_utils.py +++ b/src/utils/string_utils.py @@ -18,14 +18,14 @@ _T = TypeVar("_T") def create_nonce(chars: str, length: int) -> str: """Generate a random nonce string of specified length from given characters.""" - return ''.join(random.choices(chars, k=length)) + return "".join(random.choices(chars, k=length)) def chunk(to_chunk: abc.Iterable[_T], chunk_length: int) -> abc.Generator[list[_T], None, None]: """Split an iterable into chunks of a specified length.""" list_to_chunk: list[_T] = list(to_chunk) for i in range(0, len(list_to_chunk), chunk_length): - yield list_to_chunk[i:i + chunk_length] + yield list_to_chunk[i : i + chunk_length] def deduplicate(iterable: abc.Iterable[_T]) -> list[_T]: diff --git a/src/web/app.py b/src/web/app.py index dc51064..2fed696 100644 --- a/src/web/app.py +++ b/src/web/app.py @@ -36,10 +36,7 @@ app.add_middleware( # Create Socket.IO server sio = socketio.AsyncServer( - async_mode='asgi', - cors_allowed_origins='*', - logger=False, - engineio_logger=False + async_mode="asgi", cors_allowed_origins="*", logger=False, engineio_logger=False ) # Wrap with ASGI app @@ -63,7 +60,7 @@ def set_managers(gui: WebGUIManager, twitch: Twitch): class LoginRequest(BaseModel): username: str password: str - token: str = '' + token: str = "" class ChannelSelectRequest(BaseModel): @@ -81,18 +78,21 @@ class SettingsUpdate(BaseModel): # ==================== REST API Endpoints ==================== + @app.get("/", response_class=HTMLResponse) async def serve_index(): """Serve the main web interface""" # Web files are in project_root/web/, we're in project_root/src/web/ web_dir = Path(__file__).parent.parent.parent / "web" index_file = web_dir / "index.html" - logger.debug(f"Looking for web files: __file__={__file__}, web_dir={web_dir}, index_file={index_file}, exists={index_file.exists()}") + logger.debug( + f"Looking for web files: __file__={__file__}, web_dir={web_dir}, index_file={index_file}, exists={index_file.exists()}" + ) if index_file.exists(): return FileResponse(index_file) return HTMLResponse( content=f"

Twitch Drops Miner

Web interface files not found. Please check installation.

Debug: Looking for {index_file}

", - status_code=500 + status_code=500, ) @@ -105,8 +105,7 @@ async def get_status(): return { "status": gui_manager.status.get(), "login": gui_manager.login.get_status(), - "close_requested": gui_manager.close_requested, - "manual_mode": twitch_client.get_manual_mode_info() + "manual_mode": twitch_client.get_manual_mode_info(), } @@ -116,9 +115,7 @@ async def get_channels(): if not gui_manager: raise HTTPException(status_code=503, detail="GUI not initialized") - return { - "channels": gui_manager.channels.get_channels() - } + return {"channels": gui_manager.channels.get_channels()} @app.post("/api/channels/select") @@ -144,6 +141,7 @@ async def select_channel(request: ChannelSelectRequest): # Trigger channel switch to apply the selection from src.config import State + twitch_client.change_state(State.CHANNEL_SWITCH) return {"success": True} @@ -155,9 +153,7 @@ async def get_campaigns(): if not gui_manager: raise HTTPException(status_code=503, detail="GUI not initialized") - return { - "campaigns": gui_manager.inv.get_campaigns() - } + return {"campaigns": gui_manager.inv.get_campaigns()} @app.get("/api/console") @@ -166,9 +162,7 @@ async def get_console_history(): if not gui_manager: raise HTTPException(status_code=503, detail="GUI not initialized") - return { - "lines": gui_manager.output.get_history() - } + return {"lines": gui_manager.output.get_history()} @app.get("/api/settings") @@ -197,11 +191,7 @@ async def submit_login(login_data: LoginRequest): if not gui_manager: raise HTTPException(status_code=503, detail="GUI not initialized") - gui_manager.login.submit_login( - login_data.username, - login_data.password, - login_data.token - ) + gui_manager.login.submit_login(login_data.username, login_data.password, login_data.token) return {"success": True} @@ -223,6 +213,7 @@ async def trigger_reload(): raise HTTPException(status_code=503, detail="Twitch client not initialized") from src.config import State + twitch_client.change_state(State.INVENTORY_FETCH) return {"success": True} @@ -230,10 +221,10 @@ async def trigger_reload(): @app.post("/api/close") async def trigger_close(): """Trigger application shutdown""" - if not gui_manager: - raise HTTPException(status_code=503, detail="GUI not initialized") + if not twitch_client: + raise HTTPException(status_code=503, detail="Twitch client not initialized") - gui_manager.close() + twitch_client.close() return {"success": True} @@ -252,6 +243,7 @@ async def exit_manual_mode(): # ==================== Socket.IO Events ==================== + @sio.event async def connect(sid, environ): """Client connected""" @@ -259,16 +251,20 @@ async def connect(sid, environ): # Send initial state to new client if gui_manager and twitch_client: - await sio.emit("initial_state", { - "status": gui_manager.status.get(), - "channels": gui_manager.channels.get_channels(), - "campaigns": gui_manager.inv.get_campaigns(), - "console": gui_manager.output.get_history(), - "settings": gui_manager.settings.get_settings(), - "login": gui_manager.login.get_status(), - "manual_mode": twitch_client.get_manual_mode_info(), - "current_drop": gui_manager.progress.get_current_drop() - }, room=sid) + await sio.emit( + "initial_state", + { + "status": gui_manager.status.get(), + "channels": gui_manager.channels.get_channels(), + "campaigns": gui_manager.inv.get_campaigns(), + "console": gui_manager.output.get_history(), + "settings": gui_manager.settings.get_settings(), + "login": gui_manager.login.get_status(), + "manual_mode": twitch_client.get_manual_mode_info(), + "current_drop": gui_manager.progress.get_current_drop(), + }, + room=sid, + ) @sio.event @@ -289,6 +285,7 @@ async def request_reload(sid): """Client requested application reload""" if twitch_client: from src.config import State + twitch_client.change_state(State.INVENTORY_FETCH) @@ -306,13 +303,8 @@ async def run_server(host: str = "0.0.0.0", port: int = 8080): """Run the web server (used for development/testing)""" global _server_instance import uvicorn - config = uvicorn.Config( - socket_app, - host=host, - port=port, - log_level="info", - access_log=False - ) + + config = uvicorn.Config(socket_app, host=host, port=port, log_level="info", access_log=False) server = uvicorn.Server(config) _server_instance = server try: diff --git a/src/web/gui_manager.py b/src/web/gui_manager.py index f19f6c8..e42fb6b 100644 --- a/src/web/gui_manager.py +++ b/src/web/gui_manager.py @@ -6,7 +6,6 @@ import asyncio import logging from typing import TYPE_CHECKING -from src.i18n import _ from src.models.game import Game from src.web.managers.broadcaster import WebSocketBroadcaster from src.web.managers.cache import ImageCache @@ -45,7 +44,6 @@ class WebGUIManager: def __init__(self, twitch: Twitch): self._twitch: Twitch = twitch self._broadcaster = WebSocketBroadcaster() - self._close_requested = asyncio.Event() # Create component managers self.status = StatusManager(self._broadcaster) @@ -75,79 +73,6 @@ class WebGUIManager: """ self._broadcaster.set_socketio(sio) - @property - def close_requested(self) -> bool: - """Check if application closure has been requested. - - Returns: - True if close was requested via GUI - """ - return self._close_requested.is_set() - - def start(self): - """Start the GUI (logs ready message in web mode).""" - logger.info("Web GUI started - access via browser") - self.status.update(_("gui", "status", "ready")) - - def close(self, *args) -> int: - """Request application closure. - - Returns: - Exit code (0 for normal shutdown) - """ - self._close_requested.set() - # notify client we're supposed to close - self._twitch.close() - logger.info("Close requested via web GUI") - return 0 - - def prevent_close(self): - """Prevent window from closing (no-op in web mode). - - In web mode, users can still navigate away from the page. - """ - pass - - def stop(self): - """Stop the GUI.""" - logger.info("Web GUI stopped") - - def close_window(self): - """Close the GUI window (no-op in web mode).""" - pass - - async def wait_until_closed(self): - """Wait until the GUI is closed by the user.""" - await self._close_requested.wait() - - async def coro_unless_closed(self, coro): - """Run a coroutine unless the GUI is closed. - - Races the provided coroutine against the close event, canceling - the coroutine if close is requested and raising ExitRequest. - - Args: - coro: Coroutine to run - - Returns: - Result of the coroutine if it completes first - - Raises: - ExitRequest: If close is requested during execution - """ - # Race the coroutine against the close event - tasks = [asyncio.ensure_future(coro), asyncio.ensure_future(self._close_requested.wait())] - done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - # Cancel any pending tasks - for task in pending: - task.cancel() - # If close was requested, raise ExitRequest - if self._close_requested.is_set(): - from src.exceptions import ExitRequest - raise ExitRequest() - # Otherwise return the result - return await next(iter(done)) - def save(self, *, force: bool = False): """Save GUI state and settings. @@ -195,9 +120,7 @@ class WebGUIManager: Args: sound: Whether to play notification sound """ - asyncio.create_task( - self._broadcaster.emit("attention_required", {"sound": sound}) - ) + asyncio.create_task(self._broadcaster.emit("attention_required", {"sound": sound})) def select_channel(self, channel_id: int): """Select a channel (called by webapp when user clicks channel). @@ -223,9 +146,7 @@ class WebGUIManager: Args: dark_mode: Whether to use dark theme """ - asyncio.create_task( - self._broadcaster.emit("theme_change", {"dark_mode": dark_mode}) - ) + asyncio.create_task(self._broadcaster.emit("theme_change", {"dark_mode": dark_mode})) def broadcast_manual_mode_change(self, manual_mode_info: dict): """Broadcast manual mode status change to connected clients. @@ -233,9 +154,7 @@ class WebGUIManager: Args: manual_mode_info: Manual mode status from get_manual_mode_info() """ - asyncio.create_task( - self._broadcaster.emit("manual_mode_update", manual_mode_info) - ) + asyncio.create_task(self._broadcaster.emit("manual_mode_update", manual_mode_info)) # Type aliases for backwards compatibility with code that imports from gui diff --git a/src/web/managers/campaigns.py b/src/web/managers/campaigns.py index 47274f6..840ada4 100644 --- a/src/web/managers/campaigns.py +++ b/src/web/managers/campaigns.py @@ -34,25 +34,26 @@ class CampaignProgressManager: self._remaining_seconds = remaining_seconds if drop: asyncio.create_task( - self._broadcaster.emit("drop_progress", { - "drop_id": drop.id, - "drop_name": drop.name, - "campaign_name": drop.campaign.name, - "campaign_id": drop.campaign.id, - "game_name": drop.campaign.game.name, - "current_minutes": drop.current_minutes, - "required_minutes": drop.required_minutes, - "progress": drop.progress, - "remaining_seconds": remaining_seconds - }) + self._broadcaster.emit( + "drop_progress", + { + "drop_id": drop.id, + "drop_name": drop.name, + "campaign_name": drop.campaign.name, + "campaign_id": drop.campaign.id, + "game_name": drop.campaign.game.name, + "current_minutes": drop.current_minutes, + "required_minutes": drop.required_minutes, + "progress": drop.progress, + "remaining_seconds": remaining_seconds, + }, + ) ) def stop_timer(self): """Stop the progress timer and clear the current drop.""" self._current_drop = None - asyncio.create_task( - self._broadcaster.emit("drop_progress_stop", {}) - ) + asyncio.create_task(self._broadcaster.emit("drop_progress_stop", {})) def minute_almost_done(self) -> bool: """Check if the current progress minute is almost complete. @@ -81,5 +82,5 @@ class CampaignProgressManager: "current_minutes": drop.current_minutes, "required_minutes": drop.required_minutes, "progress": drop.progress, - "remaining_seconds": self._remaining_seconds + "remaining_seconds": self._remaining_seconds, } diff --git a/src/web/managers/channels.py b/src/web/managers/channels.py index 8d0a0aa..cc5f60c 100644 --- a/src/web/managers/channels.py +++ b/src/web/managers/channels.py @@ -43,7 +43,7 @@ class ChannelListManager: "online": channel.online, "drops_enabled": channel.drops_enabled, "acl_based": channel.acl_based, - "watching": channel.id == self._watching_id + "watching": channel.id == self._watching_id, } self._channels[channel.id] = channel_data asyncio.create_task( @@ -58,16 +58,12 @@ class ChannelListManager: """ if channel.id in self._channels: del self._channels[channel.id] - asyncio.create_task( - self._broadcaster.emit("channel_remove", {"id": channel.id}) - ) + asyncio.create_task(self._broadcaster.emit("channel_remove", {"id": channel.id})) def clear(self): """Clear all channels from the display list.""" self._channels.clear() - asyncio.create_task( - self._broadcaster.emit("channels_clear", {}) - ) + asyncio.create_task(self._broadcaster.emit("channels_clear", {})) def set_watching(self, channel: Channel): """Mark a channel as currently being watched. @@ -76,16 +72,12 @@ class ChannelListManager: channel: The channel now being watched """ self._watching_id = channel.id - asyncio.create_task( - self._broadcaster.emit("channel_watching", {"id": channel.id}) - ) + asyncio.create_task(self._broadcaster.emit("channel_watching", {"id": channel.id})) def clear_watching(self): """Clear the currently watched channel indicator.""" self._watching_id = None - asyncio.create_task( - self._broadcaster.emit("channel_watching_clear", {}) - ) + asyncio.create_task(self._broadcaster.emit("channel_watching_clear", {})) def get_selection(self) -> Channel | None: """Get user's channel selection from web GUI. @@ -103,7 +95,8 @@ class ChannelListManager: # Get the Channel object from the Twitch client from src.core.client import Twitch - if hasattr(self._gui_manager, '_twitch'): + + if hasattr(self._gui_manager, "_twitch"): twitch: Twitch = self._gui_manager._twitch return twitch.channels.get(selected_id) @@ -133,7 +126,7 @@ class ChannelListManager: "online": channel.online, "drops_enabled": channel.drops_enabled, "acl_based": channel.acl_based, - "watching": channel.id == self._watching_id + "watching": channel.id == self._watching_id, } new_channels[channel.id] = channel_data channels_data.append(channel_data) diff --git a/src/web/managers/console.py b/src/web/managers/console.py index 0f21823..9e2c7ce 100644 --- a/src/web/managers/console.py +++ b/src/web/managers/console.py @@ -32,9 +32,7 @@ class ConsoleOutputManager: timestamp = datetime.now().strftime("%H:%M:%S") line = f"[{timestamp}] {message}" self._buffer.append(line) - asyncio.create_task( - self._broadcaster.emit("console_output", {"message": line}) - ) + asyncio.create_task(self._broadcaster.emit("console_output", {"message": line})) def get_history(self) -> list[str]: """Get the current console history buffer. diff --git a/src/web/managers/inventory.py b/src/web/managers/inventory.py index ba74402..c736bef 100644 --- a/src/web/managers/inventory.py +++ b/src/web/managers/inventory.py @@ -28,9 +28,7 @@ class InventoryManager: def clear(self): """Clear all campaigns from inventory.""" self._campaigns.clear() - asyncio.create_task( - self._broadcaster.emit("inventory_clear", {}) - ) + asyncio.create_task(self._broadcaster.emit("inventory_clear", {})) async def add_campaign(self, campaign: DropsCampaign): """Add a campaign to the inventory display. @@ -43,18 +41,20 @@ class InventoryManager: drops_data = [] for drop in campaign.drops: - drops_data.append({ - "id": drop.id, - "name": drop.name, - "current_minutes": drop.current_minutes, - "required_minutes": drop.required_minutes, - "progress": drop.progress, - "is_claimed": drop.is_claimed, - "can_claim": drop.can_claim, - "rewards": drop.rewards_text(), - "starts_at": drop.starts_at.isoformat(), - "ends_at": drop.ends_at.isoformat() - }) + drops_data.append( + { + "id": drop.id, + "name": drop.name, + "current_minutes": drop.current_minutes, + "required_minutes": drop.required_minutes, + "progress": drop.progress, + "is_claimed": drop.is_claimed, + "can_claim": drop.can_claim, + "rewards": drop.rewards_text(), + "starts_at": drop.starts_at.isoformat(), + "ends_at": drop.ends_at.isoformat(), + } + ) campaign_data = { "id": campaign.id, @@ -70,7 +70,7 @@ class InventoryManager: "expired": campaign.expired, "claimed_drops": campaign.claimed_drops, "total_drops": campaign.total_drops, - "drops": drops_data + "drops": drops_data, } self._campaigns[campaign.id] = campaign_data @@ -90,18 +90,19 @@ class InventoryManager: # Find and update the drop in the campaign for drop_data in self._campaigns[campaign_id]["drops"]: if drop_data["id"] == drop.id: - drop_data.update({ - "current_minutes": drop.current_minutes, - "required_minutes": drop.required_minutes, - "progress": drop.progress, - "is_claimed": drop.is_claimed, - "can_claim": drop.can_claim - }) + drop_data.update( + { + "current_minutes": drop.current_minutes, + "required_minutes": drop.required_minutes, + "progress": drop.progress, + "is_claimed": drop.is_claimed, + "can_claim": drop.can_claim, + } + ) asyncio.create_task( - self._broadcaster.emit("drop_update", { - "campaign_id": campaign_id, - "drop": drop_data - }) + self._broadcaster.emit( + "drop_update", {"campaign_id": campaign_id, "drop": drop_data} + ) ) break diff --git a/src/web/managers/login.py b/src/web/managers/login.py index e84b545..baf0d3a 100644 --- a/src/web/managers/login.py +++ b/src/web/managers/login.py @@ -17,6 +17,7 @@ if TYPE_CHECKING: @dataclass class LoginData: """Container for login credentials submitted by the user.""" + username: str password: str token: str @@ -36,7 +37,9 @@ class LoginFormManager: self._login_data: LoginData | None = None self._status = "Logged out" self._user_id: int | None = None - self._oauth_pending: dict[str, str] | None = None # Store OAuth code for late-connecting clients + self._oauth_pending: dict[str, str] | None = ( + None # Store OAuth code for late-connecting clients + ) def clear(self, login: bool = False, password: bool = False, token: bool = False): """Clear login form fields on the client side. @@ -47,11 +50,9 @@ class LoginFormManager: token: Clear the 2FA token field """ asyncio.create_task( - self._broadcaster.emit("login_clear", { - "login": login, - "password": password, - "token": token - }) + self._broadcaster.emit( + "login_clear", {"login": login, "password": password, "token": token} + ) ) def update(self, status: str, user_id: int | None): @@ -64,10 +65,7 @@ class LoginFormManager: self._status = status self._user_id = user_id asyncio.create_task( - self._broadcaster.emit("login_status", { - "status": status, - "user_id": user_id - }) + self._broadcaster.emit("login_status", {"status": status, "user_id": user_id}) ) async def ask_login(self) -> LoginData: @@ -79,8 +77,8 @@ class LoginFormManager: self.update(_("gui", "login", "required"), None) self._login_event.clear() await self._broadcaster.emit("login_required", {}) - # Use coro_unless_closed to handle shutdown during login - await self._manager.coro_unless_closed(self._login_event.wait()) + # Wait for user to submit login (will be cancelled on shutdown) + await self._login_event.wait() return self._login_data async def ask_enter_code(self, page_url, user_code: str): @@ -96,17 +94,14 @@ class LoginFormManager: self.update(_("gui", "login", "required"), None) self._login_event.clear() # Store OAuth code for late-connecting clients - self._oauth_pending = { - "url": str(page_url), - "code": user_code - } + self._oauth_pending = {"url": str(page_url), "code": user_code} await self._broadcaster.emit("oauth_code_required", self._oauth_pending) - # Use coro_unless_closed to handle shutdown during login - await self._manager.coro_unless_closed(self._login_event.wait()) + # Wait for user to confirm code entry (will be cancelled on shutdown) + await self._login_event.wait() # Clear OAuth state after confirmation self._oauth_pending = None - def submit_login(self, username: str, password: str, token: str = ''): + def submit_login(self, username: str, password: str, token: str = ""): """Submit login credentials (called by webapp when user submits form). Args: @@ -123,10 +118,7 @@ class LoginFormManager: Returns: Dictionary with status, user_id, and optional oauth_pending data """ - result = { - "status": self._status, - "user_id": self._user_id - } + result = {"status": self._status, "user_id": self._user_id} # Include OAuth code if pending if self._oauth_pending: result["oauth_pending"] = self._oauth_pending diff --git a/src/web/managers/settings.py b/src/web/managers/settings.py index 335ca37..e353f84 100644 --- a/src/web/managers/settings.py +++ b/src/web/managers/settings.py @@ -39,7 +39,7 @@ class SettingsManager: "proxy": str(self._settings.proxy), "tray_notifications": self._settings.tray_notifications, "connection_quality": self._settings.connection_quality, - "minimum_refresh_interval_minutes": self._settings.minimum_refresh_interval_minutes + "minimum_refresh_interval_minutes": self._settings.minimum_refresh_interval_minutes, } def update_settings(self, settings_data: dict[str, Any]): @@ -59,11 +59,11 @@ class SettingsManager: if "tray_notifications" in settings_data: self._settings.tray_notifications = settings_data["tray_notifications"] if "minimum_refresh_interval_minutes" in settings_data: - self._settings.minimum_refresh_interval_minutes = settings_data["minimum_refresh_interval_minutes"] + self._settings.minimum_refresh_interval_minutes = settings_data[ + "minimum_refresh_interval_minutes" + ] self._settings.alter() - asyncio.create_task( - self._broadcaster.emit("settings_updated", self.get_settings()) - ) + asyncio.create_task(self._broadcaster.emit("settings_updated", self.get_settings())) def set_games(self, games: set[Game]): """Update the list of available games for settings panel. @@ -74,6 +74,4 @@ class SettingsManager: # Store and broadcast available games for settings panel game_names = sorted([g.name for g in games]) self._available_games = game_names - asyncio.create_task( - self._broadcaster.emit("games_available", {"games": game_names}) - ) + asyncio.create_task(self._broadcaster.emit("games_available", {"games": game_names})) diff --git a/src/web/managers/status.py b/src/web/managers/status.py index f9900ff..53b01ca 100644 --- a/src/web/managers/status.py +++ b/src/web/managers/status.py @@ -24,9 +24,7 @@ class StatusManager: def update(self, status: str): """Update the current status and broadcast to all clients.""" self._current_status = status - asyncio.create_task( - self._broadcaster.emit("status_update", {"status": status}) - ) + asyncio.create_task(self._broadcaster.emit("status_update", {"status": status})) def get(self) -> str: """Get the current status message.""" @@ -65,11 +63,14 @@ class WebsocketStatusManager: # Broadcast the update asyncio.create_task( - self._broadcaster.emit("websocket_status", { - "idx": idx, - "status": self._websockets[idx]["status"], - "topics": self._websockets[idx]["topics"], - "total_websockets": len(self._websockets), - "total_topics": sum(ws["topics"] for ws in self._websockets.values()) - }) + self._broadcaster.emit( + "websocket_status", + { + "idx": idx, + "status": self._websockets[idx]["status"], + "topics": self._websockets[idx]["topics"], + "total_websockets": len(self._websockets), + "total_topics": sum(ws["topics"] for ws in self._websockets.values()), + }, + ) ) diff --git a/src/web/managers/tray.py b/src/web/managers/tray.py index 53e6684..613d4e9 100644 --- a/src/web/managers/tray.py +++ b/src/web/managers/tray.py @@ -28,9 +28,7 @@ class TrayIconStub: icon: Icon name/identifier to change to """ # Broadcast icon change for potential UI indicators - asyncio.create_task( - self._broadcaster.emit("tray_icon_change", {"icon": icon}) - ) + asyncio.create_task(self._broadcaster.emit("tray_icon_change", {"icon": icon})) def notify(self, message: str, title: str): """Send a system notification (translated to browser notification). @@ -41,10 +39,7 @@ class TrayIconStub: """ # Send browser notification asyncio.create_task( - self._broadcaster.emit("notification", { - "title": title, - "message": message - }) + self._broadcaster.emit("notification", {"title": title, "message": message}) ) def minimize(self): diff --git a/src/websocket/websocket.py b/src/websocket/websocket.py index 29e418e..a40eb42 100644 --- a/src/websocket/websocket.py +++ b/src/websocket/websocket.py @@ -148,9 +148,7 @@ class Websocket: """ asyncio.create_task(self.stop(remove=remove)) - async def _backoff_connect( - self, ws_url: str, **kwargs - ): + async def _backoff_connect(self, ws_url: str, **kwargs): """ Connect to websocket with exponential backoff retry logic. @@ -196,7 +194,8 @@ class Websocket: self._closed.clear() # Connect/Reconnect loop async for websocket in self._backoff_connect( - "wss://pubsub-edge.twitch.tv/v1", maximum=3*60 # 3 minutes maximum backoff time + "wss://pubsub-edge.twitch.tv/v1", + maximum=3 * 60, # 3 minutes maximum backoff time ): self._ws.set(websocket) self._reconnect_requested.clear() @@ -264,7 +263,7 @@ class Websocket: "data": { "topics": topics_list, "auth_token": auth_state.access_token, - } + }, } ) self._submitted.difference_update(removed) @@ -279,7 +278,7 @@ class Websocket: "data": { "topics": topics_list, "auth_token": auth_state.access_token, - } + }, } ) self._submitted.update(added)