diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8c37365..bdf4b4e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,8 +13,37 @@ env: USE_UPX: false jobs: + lint: + name: Code Quality (Ruff & Mypy) + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{env.PYTHON_VERSION}} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install ruff mypy + + - name: Run Ruff linter + run: | + ruff check src/ + echo "✅ Ruff checks passed!" + + - name: Run Mypy type checker + run: | + mypy src/ + echo "✅ Mypy checks passed!" + continue-on-error: true # Don't fail CI on mypy errors yet + validate: - name: Validate + name: Validate Language Files runs-on: ubuntu-latest steps: @@ -45,6 +74,7 @@ jobs: name: Docker Build runs-on: ubuntu-latest needs: + - lint - validate permissions: contents: read diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..68ef7f7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,61 @@ +[tool.ruff] +# Set the maximum line length to 100 (more reasonable than 88) +line-length = 100 +target-version = "py310" + +[tool.ruff.lint] +# Enable specific rule sets +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # Pyflakes + "I", # isort (import sorting) + "UP", # pyupgrade + "B", # flake8-bugbear + "SIM", # flake8-simplify + "C4", # flake8-comprehensions +] + +# Ignore specific rules +ignore = [ + "E501", # line too long (handled by formatter) + "UP036", # outdated version block (we want to keep Python version check) +] + +[tool.ruff.lint.isort] +force-single-line = false +lines-after-imports = 2 + +[tool.mypy] +python_version = "3.10" +warn_return_any = false # Too noisy with JSON responses +warn_unused_configs = true +disallow_untyped_defs = false # Start lenient, can tighten later +check_untyped_defs = true +warn_redundant_casts = true +warn_unused_ignores = false # We use many type: ignore comments +no_implicit_reexport = true +strict_equality = true +show_error_codes = true + +# Per-module options +[[tool.mypy.overrides]] +module = [ + "aiohttp.*", + "socketio.*", + "truststore.*", + "dateutil.*", + "yarl.*", + "PIL.*", +] +ignore_missing_imports = true + +# GraphQL responses are dynamically typed - allow more flexibility +[[tool.mypy.overrides]] +module = [ + "src.api.gql_client", + "src.models.*", + "src.services.*", +] +warn_return_any = false +disable_error_code = ["call-overload", "assignment"] diff --git a/src/__main__.py b/src/__main__.py index 9dc410f..4bc6bcf 100644 --- a/src/__main__.py +++ b/src/__main__.py @@ -6,25 +6,23 @@ from multiprocessing import freeze_support if __name__ == "__main__": freeze_support() - import sys - import signal + import argparse import asyncio import logging - import argparse - import warnings + import signal + import sys import traceback - from typing import NoReturn + import warnings import truststore truststore.inject_into_ssl() - from src.i18n import _ - from src.core.client import Twitch + from src.config import FILE_FORMATTER, LOG_PATH, LOGGING_LEVELS, SELF_PATH from src.config.settings import Settings - from src.version import __version__ + from src.core.client import Twitch from src.exceptions import CaptchaRequired - from src.config.paths import _resource_path as resource_path - from src.config import LOGGING_LEVELS, SELF_PATH, FILE_FORMATTER, LOG_PATH, LOCK_PATH + from src.i18n import _ + from src.version import __version__ logger = logging.getLogger("TwitchDrops") @@ -111,11 +109,10 @@ if __name__ == "__main__": logger.debug("Defining main async function") async def main(): # set language - try: - _.set_language(settings.language) - except ValueError: + from contextlib import suppress + with suppress(ValueError): # this language doesn't exist - stick to English - pass + _.set_language(settings.language) # Always log to file with timestamped filename in ./logs/ directory from datetime import datetime diff --git a/src/api/__init__.py b/src/api/__init__.py index 4d1e1aa..9a9bf1a 100644 --- a/src/api/__init__.py +++ b/src/api/__init__.py @@ -7,8 +7,9 @@ with Twitch's API endpoints. from __future__ import annotations -from src.api.http_client import HTTPClient from src.api.gql_client import GQLClient +from src.api.http_client import HTTPClient + __all__ = [ "HTTPClient", diff --git a/src/api/gql_client.py b/src/api/gql_client.py index fce1f46..6a317d6 100644 --- a/src/api/gql_client.py +++ b/src/api/gql_client.py @@ -8,17 +8,17 @@ from __future__ import annotations import asyncio import logging -from typing import overload, TYPE_CHECKING from itertools import chain +from typing import TYPE_CHECKING, overload -from src.utils import RateLimiter, ExponentialBackoff from src.exceptions import GQLException, MinerException -from src.config import GQL_OPERATIONS +from src.utils import ExponentialBackoff, RateLimiter + if TYPE_CHECKING: - from src.config import JsonType, GQLOperation, ClientInfo from src.api.http_client import HTTPClient from src.auth import _AuthState + from src.config import ClientInfo, GQLOperation, JsonType logger = logging.getLogger("TwitchDrops") @@ -115,10 +115,7 @@ class GQLClient: orig_response = response_json # Normalize to list for unified error handling - if isinstance(response_json, list): - response_list = response_json - else: - response_list = [response_json] + response_list = response_json if isinstance(response_json, list) else [response_json] force_retry: bool = False for response_json in response_list: diff --git a/src/api/http_client.py b/src/api/http_client.py index 5b3e3e6..5f01c46 100644 --- a/src/api/http_client.py +++ b/src/api/http_client.py @@ -8,23 +8,24 @@ from __future__ import annotations import asyncio import logging -from datetime import datetime, timedelta, timezone -from contextlib import asynccontextmanager -from typing import TYPE_CHECKING from collections import abc +from contextlib import asynccontextmanager +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING import aiohttp from yarl import URL -from src.i18n import _ -from src.exceptions import ExitRequest, RequestInvalid -from src.utils import ExponentialBackoff from src.config import COOKIES_PATH +from src.exceptions import ExitRequest, RequestInvalid +from src.i18n import _ +from src.utils import ExponentialBackoff + if TYPE_CHECKING: + from src.config import ClientInfo from src.config.settings import Settings from src.web.gui_manager import WebGUIManager - from src.config import ClientInfo logger = logging.getLogger("TwitchDrops") diff --git a/src/auth/__init__.py b/src/auth/__init__.py index e129263..43695d5 100644 --- a/src/auth/__init__.py +++ b/src/auth/__init__.py @@ -2,4 +2,5 @@ from .auth_state import _AuthState + __all__ = ["_AuthState"] diff --git a/src/auth/auth_state.py b/src/auth/auth_state.py index ce091c2..6b24729 100644 --- a/src/auth/auth_state.py +++ b/src/auth/auth_state.py @@ -10,14 +10,14 @@ import aiohttp from yarl import URL from src.config import COOKIES_PATH -from src.exceptions import CaptchaRequired, LoginException from src.i18n import _ from src.utils import CHARS_HEX_LOWER, create_nonce + if TYPE_CHECKING: - from src.core.client import Twitch - from src.web.gui_manager import LoginForm, LoginFormManager from src.config import ClientInfo, JsonType + from src.core.client import Twitch + from src.web.gui_manager import LoginForm logger = logging.getLogger("TwitchDrops") @@ -29,7 +29,6 @@ class _AuthState: This class handles: - OAuth device code flow for authentication - - Legacy password-based login (deprecated) - Access token validation and management - Session and device ID management - Cookie persistence @@ -101,6 +100,7 @@ class _AuthState: while True: try: from datetime import datetime, timedelta, timezone + from src.exceptions import RequestInvalid now = datetime.now(timezone.utc) @@ -155,176 +155,6 @@ class _AuthState: # the device_code has expired, request a new code continue - async def _login(self) -> str: - """ - Perform legacy password-based login flow. - - This method implements the old Twitch login API flow using username/password. - It handles: - - Username/password authentication - - Two-factor authentication (TOTP and email codes) - - CAPTCHA detection - - Error handling and user feedback - - NOTE: This flow is deprecated and may trigger CAPTCHA or be blocked by Twitch. - OAuth device code flow (_oauth_login) is the preferred method. - - Returns: - str: The access token - - Raises: - CaptchaRequired: When CAPTCHA is detected - LoginException: On login failure - """ - logger.info("Login flow started") - gui_print = self._twitch.gui.print - login_form: LoginFormManager = self._twitch.gui.login - client_info: ClientInfo = self._twitch._client_type - - token_kind: str = '' - use_chrome: bool = False - payload: JsonType = { - # username and password are added later - # "username": str, - # "password": str, - # client ID to-be associated with the access token - "client_id": client_info.CLIENT_ID, - "undelete_user": False, # purpose unknown - "remember_me": True, # persist the session via the cookie - # "authy_token": str, # 2FA token - # "twitchguard_code": str, # email code - # "captcha": str, # self-fed captcha - # 'force_twitchguard': False, # force email code confirmation - } - - def _safe_loads(s: str): - """JSON loads that skips extra data after the first valid JSON object.""" - import json - - class SkipExtraJsonDecoder(json.JSONDecoder): - def decode(self, s: str, *args): - # skip whitespace check - obj, end = self.raw_decode(s) - return obj - - return json.loads(s, cls=SkipExtraJsonDecoder) - - while True: - login_data = await login_form.ask_login() - payload["username"] = login_data.username - payload["password"] = login_data.password - # reinstate the 2FA token, if present - payload.pop("authy_token", None) - payload.pop("twitchguard_code", None) - if login_data.token: - # if there's no token kind set yet, and the user has entered a token, - # we can immediately assume it's an authenticator token and not an email one - if not token_kind: - token_kind = "authy" - if token_kind == "authy": - payload["authy_token"] = login_data.token - elif token_kind == "email": - payload["twitchguard_code"] = login_data.token - - # use fancy headers to mimic the twitch android app - headers = { - "Accept": "application/vnd.twitchtv.v3+json", - "Accept-Encoding": "gzip", - "Accept-Language": "en-US", - "Client-Id": client_info.CLIENT_ID, - "Content-Type": "application/json; charset=UTF-8", - "Host": "passport.twitch.tv", - "User-Agent": client_info.USER_AGENT, - "X-Device-Id": self.device_id, - # "X-Device-Id": ''.join(random.choices('0123456789abcdef', k=32)), - } - async with self._twitch.request( - "POST", "https://passport.twitch.tv/login", headers=headers, json=payload - ) as response: - login_response: JsonType = await response.json(loads=_safe_loads) - - # Feed this back in to avoid running into CAPTCHA if possible - if "captcha_proof" in login_response: - payload["captcha"] = {"proof": login_response["captcha_proof"]} - - # Error handling - if "error_code" in login_response: - error_code: int = login_response["error_code"] - logger.info(f"Login error code: {error_code}") - if error_code == 1000: - logger.info("1000: CAPTCHA is required") - use_chrome = True - break - elif error_code in (2004, 3001): - logger.info("3001: Login failed due to incorrect username or password") - gui_print(_("login", "incorrect_login_pass")) - if error_code == 2004: - # invalid username - login_form.clear(login=True) - login_form.clear(password=True) - continue - elif error_code in ( - 3012, # Invalid authy token - 3023, # Invalid email code - ): - logger.info("3012/23: Login failed due to incorrect 2FA code") - if error_code == 3023: - token_kind = "email" - gui_print(_("login", "incorrect_email_code")) - else: - token_kind = "authy" - gui_print(_("login", "incorrect_twofa_code")) - login_form.clear(token=True) - continue - elif error_code in ( - 3011, # Authy token needed - 3022, # Email code needed - ): - # 2FA handling - logger.info("3011/22: 2FA token required") - # user didn't provide a token, so ask them for it - if error_code == 3022: - token_kind = "email" - gui_print(_("login", "email_code_required")) - else: - token_kind = "authy" - gui_print(_("login", "twofa_code_required")) - continue - elif error_code >= 5000: - # Special errors, usually from Twitch telling the user to "go away" - # We print the code out to inform the user, and just use chrome flow instead - # { - # "error_code":5023, - # "error":"Please update your app to continue", - # "error_description":"client is not supported for this feature" - # } - # { - # "error_code":5027, - # "error":"Please update your app to continue", - # "error_description":"client blocked from this operation" - # } - gui_print(_("login", "error_code").format(error_code=error_code)) - logger.info(str(login_response)) - use_chrome = True - break - else: - ext_msg = str(login_response) - logger.info(ext_msg) - raise LoginException(ext_msg) - # Success handling - if "access_token" in login_response: - self.access_token = cast(str, login_response["access_token"]) - logger.info("Access token granted") - login_form.clear() - break - - if use_chrome: - # await self._chrome_login() - raise CaptchaRequired() - - if hasattr(self, "access_token"): - return self.access_token - raise LoginException("Login flow finished without setting the access token") def headers(self, *, user_agent: str = '', gql: bool = False) -> JsonType: """ @@ -404,8 +234,8 @@ class _AuthState: login_form: LoginForm = self._twitch.gui.login logger.info("Checking login") login_form.update(_("gui", "login", "logging_in"), None) - for client_mismatch_attempt in range(2): - for invalid_token_attempt in range(2): + for _client_mismatch_attempt in range(2): + for _invalid_token_attempt in range(2): cookie = jar.filter_cookies(client_info.CLIENT_URL) if "auth-token" not in cookie: self.access_token = await self._oauth_login() diff --git a/src/config/__init__.py b/src/config/__init__.py index 60c0612..f8c5158 100644 --- a/src/config/__init__.py +++ b/src/config/__init__.py @@ -2,59 +2,61 @@ from __future__ import annotations +from .client_info import ClientInfo, ClientType + # Re-export all public symbols for convenience from .constants import ( - CALL, - FILE_FORMATTER, - OUTPUT_FORMATTER, - LOGGING_LEVELS, - State, - WebsocketTopic, - WEBSOCKET_TOPICS, - JsonType, - URLType, - TopicProcess, - GQLOperation, - MAX_INT, - MAX_EXTRA_MINUTES, BASE_TOPICS, - MAX_WEBSOCKETS, - WS_TOPICS_LIMIT, - TOPICS_PER_CHANNEL, - MAX_TOPICS, - MAX_CHANNELS, + CALL, DEFAULT_LANG, + FILE_FORMATTER, + LOGGING_LEVELS, + MAX_CHANNELS, + MAX_EXTRA_MINUTES, + MAX_INT, + MAX_TOPICS, + MAX_WEBSOCKETS, + ONLINE_DELAY, + OUTPUT_FORMATTER, PING_INTERVAL, PING_TIMEOUT, - ONLINE_DELAY, + TOPICS_PER_CHANNEL, WATCH_INTERVAL, + WEBSOCKET_TOPICS, WINDOW_TITLE, + WS_TOPICS_LIMIT, + GQLOperation, + JsonType, + State, + TopicProcess, + URLType, + WebsocketTopic, ) -from .paths import ( - IS_APPIMAGE, - IS_PACKAGED, - IS_DOCKER, - SYS_SITE_PACKAGES, - SYS_SCRIPTS, - SELF_PATH, - WORKING_DIR, - DATA_DIR, - VENV_PATH, - SITE_PACKAGES_PATH, - SCRIPTS_PATH, - LANG_PATH, - LOG_PATH, - DUMP_PATH, - LOCK_PATH, - CACHE_PATH, - CACHE_DB, - COOKIES_PATH, - SETTINGS_PATH, - _resource_path, - _merge_vars, -) -from .client_info import ClientInfo, ClientType from .operations import GQL_OPERATIONS +from .paths import ( + CACHE_DB, + CACHE_PATH, + COOKIES_PATH, + DATA_DIR, + DUMP_PATH, + IS_APPIMAGE, + IS_DOCKER, + IS_PACKAGED, + LANG_PATH, + LOCK_PATH, + LOG_PATH, + SCRIPTS_PATH, + SELF_PATH, + SETTINGS_PATH, + SITE_PACKAGES_PATH, + SYS_SCRIPTS, + SYS_SITE_PACKAGES, + VENV_PATH, + WORKING_DIR, + _merge_vars, + _resource_path, +) + __all__ = [ # constants.py diff --git a/src/config/client_info.py b/src/config/client_info.py index f7cc4b5..5571e4f 100644 --- a/src/config/client_info.py +++ b/src/config/client_info.py @@ -3,6 +3,7 @@ from __future__ import annotations import random + from yarl import URL diff --git a/src/config/constants.py b/src/config/constants.py index 762af72..0b04b7f 100644 --- a/src/config/constants.py +++ b/src/config/constants.py @@ -2,18 +2,19 @@ from __future__ import annotations -import sys import logging -from enum import Enum, auto -from datetime import timedelta -from typing import Any, Dict, Literal, NewType, TYPE_CHECKING +import sys from copy import deepcopy +from datetime import timedelta +from enum import Enum, auto +from typing import TYPE_CHECKING, Any, Literal, NewType from src.version import __version__ + if TYPE_CHECKING: from collections import abc # noqa - from typing_extensions import TypeAlias + from typing import TypeAlias # Logging special levels @@ -36,7 +37,7 @@ FILE_FORMATTER = logging.Formatter( OUTPUT_FORMATTER = logging.Formatter("{levelname}: {message}", style='{', datefmt="%H:%M:%S") # Type aliases -JsonType = Dict[str, Any] +JsonType = dict[str, Any] URLType = NewType("URLType", str) TopicProcess: TypeAlias = "abc.Callable[[int, JsonType], Any]" diff --git a/src/config/paths.py b/src/config/paths.py index b4d07d2..6d2362c 100644 --- a/src/config/paths.py +++ b/src/config/paths.py @@ -5,11 +5,11 @@ from __future__ import annotations import os import sys from pathlib import Path -from typing import Any, Dict +from typing import Any # Type alias for path operations -JsonType = Dict[str, Any] +JsonType = dict[str, Any] # Environment detection @@ -43,7 +43,7 @@ def _resource_path(relative_path: Path | str) -> Path: base_path = Path(sys.argv[0]).resolve().parent elif IS_PACKAGED: # PyInstaller's folder where the one-file app is unpacked - meipass: str = getattr(sys, "_MEIPASS") + meipass: str = sys._MEIPASS # type: ignore[attr-defined] base_path = Path(meipass) else: base_path = WORKING_DIR diff --git a/src/config/settings.py b/src/config/settings.py index 33705af..d2cf242 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -1,14 +1,15 @@ from __future__ import annotations -from typing import Any, TypedDict, TYPE_CHECKING +from typing import TYPE_CHECKING, Any, TypedDict from yarl import URL +from src.config import DEFAULT_LANG, SETTINGS_PATH from src.utils import json_load, json_save -from src.config import SETTINGS_PATH, DEFAULT_LANG + if TYPE_CHECKING: - from main import ParsedArgs + from typing import Any as ParsedArgs # Avoid circular import class SettingsFile(TypedDict): diff --git a/src/core/client.py b/src/core/client.py index 7d1a503..829589a 100644 --- a/src/core/client.py +++ b/src/core/client.py @@ -1,79 +1,57 @@ from __future__ import annotations -import json import asyncio import logging -from time import time -from copy import deepcopy -from functools import partial -from collections import abc, deque, OrderedDict +from collections import OrderedDict, abc, deque from datetime import datetime, timedelta, timezone -from contextlib import suppress -from typing import Any, Literal, Final, NoReturn, TYPE_CHECKING -from dateutil.parser import isoparse -from src.web.gui_manager import WebGUIManager +from functools import partial +from time import time +from typing import TYPE_CHECKING, Any, Final, Literal, NoReturn import aiohttp -from src.i18n import _ -from src.models.channel import Channel -from src.websocket import WebsocketPool -from src.models.campaign import DropsCampaign +from src.api import GQLClient, HTTPClient from src.auth import _AuthState -from src.api import HTTPClient, GQLClient -from src.services.maintenance import MaintenanceService -from src.services.channel_service import ChannelService -from src.services.message_handlers import MessageHandlerService -from src.services.inventory_service import InventoryService -from src.services.watch_service import WatchService +from src.config import ( + DUMP_PATH, + MAX_CHANNELS, + ClientType, + State, + WebsocketTopic, +) from src.exceptions import ( ExitRequest, ReloadRequest, RequestException, - GQLException, - MinerException, ) +from src.i18n import _ +from src.models.campaign import DropsCampaign +from src.models.channel import Channel +from src.services.channel_service import ChannelService +from src.services.inventory_service import InventoryService +from src.services.maintenance import MaintenanceService +from src.services.message_handlers import MessageHandlerService +from src.services.watch_service import WatchService from src.utils import ( - chunk, - task_wrapper, AwaitableValue, + task_wrapper, ) -from src.config import ( - CALL, - MAX_INT, - DUMP_PATH, - MAX_CHANNELS, - GQL_OPERATIONS, - WATCH_INTERVAL, - State, - ClientType, - WebsocketTopic, -) +from src.websocket import WebsocketPool + if TYPE_CHECKING: - from src.models.game import Game - from src.models.channel import Stream + from src.config import ClientInfo, GQLOperation, JsonType from src.config.settings import Settings + from src.models.channel import Stream from src.models.drop import TimedDrop - from src.config import ClientInfo, JsonType, GQLOperation + from src.models.game import Game + from src.web.gui_manager import WebGUIManager logger = logging.getLogger("TwitchDrops") gql_logger = logging.getLogger("TwitchDrops.gql") -class SkipExtraJsonDecoder(json.JSONDecoder): - def decode(self, s: str, _w: Any = None) -> Any: # type: ignore[override] - # skip whitespace check - obj, end = self.raw_decode(s) - return obj - - -def _safe_loads(s: str) -> Any: - """JSON loads that skips extra data after the first valid JSON object.""" - return json.loads(s, cls=SkipExtraJsonDecoder) - - class Twitch: def __init__(self, settings: Settings): self.settings: Settings = settings @@ -201,26 +179,13 @@ class Twitch: self.settings.save(force=force) def get_priority(self, channel: Channel) -> int: - """ - Return a priority number for a given channel based on games_to_watch order. - - 0 has the highest priority (first in games_to_watch list). - Higher numbers -> lower priority. - MAX_INT (a really big number) signifies the lowest possible priority. - """ - if ( - (game := channel.game) is None # None when OFFLINE or no game set - or game not in self.wanted_games # we don't care about the played game - ): - return MAX_INT - return self.wanted_games.index(game) + """Delegate to ChannelService.""" + return self._channel_service.get_priority(channel) @staticmethod def _viewers_key(channel: Channel) -> int: - """Sort key for channels by viewer count (descending).""" - if (viewers := channel.viewers) is not None: - return viewers - return -1 + """Delegate to ChannelService.""" + return ChannelService.get_viewers_key(channel) def _remove_channel_topics(self, channels: abc.Iterable[Channel]) -> None: """Remove websocket topics for a list of channels.""" @@ -295,7 +260,7 @@ class Twitch: # ensure the websocket is running await self.websocket.start() await self.fetch_inventory() - self.gui.set_games(set(campaign.game for campaign in self.inventory)) + self.gui.set_games({campaign.game for campaign in self.inventory}) # Save state on every inventory fetch self.save() self.change_state(State.GAMES_UPDATE) @@ -515,6 +480,7 @@ class Twitch: to_add_topics, ordered_channels, watching_channel, + new_watching, ) elif self._state is State.CHANNEL_SWITCH: if self.settings.dump: @@ -523,9 +489,9 @@ class Twitch: self.gui.status.update(_("gui", "status", "switching")) # Determine the best channel to watch - new_watching: Channel | None = None + new_watching: Channel | None = None # type: ignore[no-redef] selected_channel: Channel | None = self.gui.channels.get_selection() - watching_channel: Channel | None = self.watching_channel.get_with_default(None) + watching_channel: Channel | None = self.watching_channel.get_with_default(None) # type: ignore[no-redef] # Handle user selection if selected_channel is not None and self.can_watch(selected_channel): @@ -544,7 +510,8 @@ class Twitch: if channel.game == self._manual_target_game and self.can_watch(channel): new_watching = channel self._manual_target_channel = channel - logger.info(f"Manual mode: switching to {channel.name} (same game: {self._manual_target_game.name})") + game_name = self._manual_target_game.name if self._manual_target_game else "Unknown" + logger.info(f"Manual mode: switching to {channel.name} (same game: {game_name})") break # No channels available for manual game -> exit manual mode if new_watching is None: @@ -586,168 +553,38 @@ class Twitch: await self._state_change.wait() async def _watch_sleep(self, delay: float) -> None: - # we use wait_for here to allow an asyncio.sleep-like that can be ended prematurely - self._watching_restart.clear() - with suppress(asyncio.TimeoutError): - await asyncio.wait_for(self._watching_restart.wait(), timeout=delay) + """Delegate to WatchService.""" + await self._watch_service.watch_sleep(delay) @task_wrapper(critical=True) async def _watch_loop(self) -> NoReturn: - interval: float = WATCH_INTERVAL.total_seconds() - while True: - channel: Channel = await self.watching_channel.get() - if not channel.online: - # if the channel isn't online anymore, we stop watching it - self.stop_watching() - continue - # logger.log(CALL, f"Sending watch payload to: {channel.name}") - succeeded: bool = await channel.send_watch() - last_sent: float = time() - if not succeeded: - logger.log(CALL, f"Watch requested failed for channel: {channel.name}") - # wait ~20 seconds for a progress update - await asyncio.sleep(20) - if self.gui.progress.minute_almost_done(): - # If the previous update was more than ~60s ago, and the progress tracker - # isn't counting down anymore, that means Twitch has temporarily - # stopped reporting drop's progress. To ensure the timer keeps at least somewhat - # accurate time, we can use GQL to query for the current drop, - # or even "pretend" mining as a last resort option. - handled: bool = False - - # Solution 1: use GQL to query for the currently mined drop status - try: - context = await self.gql_request( - GQL_OPERATIONS["CurrentDrop"].with_variables( - {"channelID": str(channel.id)} - ) - ) - assert isinstance(context, dict) - drop_data: JsonType | None = ( - context["data"]["currentUser"]["dropCurrentSession"] # type: ignore[index] - ) - except GQLException: - drop_data = None - if drop_data is not None: - gql_drop: TimedDrop | None = self._drops.get(drop_data["dropID"]) - if gql_drop is not None and gql_drop.can_earn(channel): - gql_drop.update_minutes(drop_data["currentMinutesWatched"]) - drop_text: str = ( - f"{gql_drop.name} ({gql_drop.campaign.game}, " - f"{gql_drop.current_minutes}/{gql_drop.required_minutes})" - ) - logger.log(CALL, f"Drop progress from GQL: {drop_text}") - handled = True - - # Solution 2: If GQL fails, figure out which campaign we're most likely mining - # right now, and then bump up the minutes on it's drops - if not handled: - if (active_campaign := self.get_active_campaign(channel)) is not None: - active_campaign.bump_minutes(channel) - # NOTE: This usually gets overwritten below - drop_text = f"Unknown drop ({active_campaign.game})" - if (active_drop := active_campaign.first_drop) is not None: - active_drop.display() - drop_text = ( - f"{active_drop.name} ({active_drop.campaign.game}, " - f"{active_drop.current_minutes}/{active_drop.required_minutes})" - ) - logger.log(CALL, f"Drop progress from active search: {drop_text}") - handled = True - else: - logger.log(CALL, "No active drop could be determined") - await self._watch_sleep(interval - min(time() - last_sent, interval)) + """Delegate to WatchService.""" + await self._watch_service.watch_loop() # type: ignore[misc] @task_wrapper(critical=True) async def _maintenance_task(self) -> None: - now = datetime.now(timezone.utc) - next_period = now + timedelta(minutes=1) - while True: - # exit if there's no need to repeat the loop - now = datetime.now(timezone.utc) - if now >= next_period: - break - next_trigger = next_period - while self._mnt_triggers and self._mnt_triggers[0] <= next_trigger: - next_trigger = self._mnt_triggers.popleft() - trigger_type: str = "Reload" if next_trigger == next_period else "Cleanup" - logger.log( - CALL, - ( - "Maintenance task waiting until: " - f"{next_trigger.astimezone().strftime('%X')} ({trigger_type})" - ) - ) - await asyncio.sleep((next_trigger - now).total_seconds()) - # exit after waiting, before the actions - now = datetime.now(timezone.utc) - if now >= next_period: - break - if next_trigger != next_period: - logger.log(CALL, "Maintenance task requests channels cleanup") - self.change_state(State.CHANNELS_CLEANUP) - # this triggers a restart of this task every (up to) 60 minutes - logger.log(CALL, "Maintenance task requests a reload") - self.change_state(State.INVENTORY_FETCH) + """Delegate to MaintenanceService.""" + await self._maintenance_service.run_maintenance_task() def can_watch(self, channel: Channel) -> bool: - """ - Determines if the given channel qualifies as a watching candidate. - """ - if not self.wanted_games: - return False - # exit early if stream is offline or drops aren't enabled - if not channel.online or not channel.drops_enabled: - return False - # check if we can progress any campaign for the played game - if channel.game is None or channel.game not in self.wanted_games: - return False - for campaign in self.inventory: - if campaign.can_earn(channel): - return True - return False + """Delegate to WatchService.""" + return self._watch_service.can_watch(channel) def should_switch(self, channel: Channel) -> bool: - """ - Determines if the given channel qualifies as a switch candidate. - """ - watching_channel = self.watching_channel.get_with_default(None) - if watching_channel is None: - return True - channel_order = self.get_priority(channel) - watching_order = self.get_priority(watching_channel) - return ( - # this channel's game is higher order than the watching one's - channel_order < watching_order - or channel_order == watching_order # or the order is the same - # and this channel is ACL-based and the watching channel isn't - and channel.acl_based > watching_channel.acl_based - ) + """Delegate to WatchService.""" + return self._watch_service.should_switch(channel) def watch(self, channel: Channel, *, update_status: bool = True) -> None: - """Start watching a specific channel.""" - self.gui.tray.change_icon("active") - self.gui.channels.set_watching(channel) - self.watching_channel.set(channel) - if update_status: - if self.is_manual_mode() and self._manual_target_game: - status_text: str = f"🎯 Manual Mode: Watching {channel.name} for {self._manual_target_game.name}" - else: - status_text: str = _("status", "watching").format(channel=channel.name) - self.print(status_text) - self.gui.status.update(status_text) + """Delegate to WatchService.""" + self._watch_service.watch(channel, update_status=update_status) def stop_watching(self) -> None: - """Stop watching the current channel.""" - self.gui.clear_drop() - self.watching_channel.clear() - self.gui.channels.clear_watching() + """Delegate to WatchService.""" + self._watch_service.stop_watching() def restart_watching(self) -> None: - """Restart the watch loop (forces immediate re-send of watch payload).""" - # Don't stop the timer - this would clear the drop display on the frontend - # The timer will naturally update when the next drop progress arrives - self._watching_restart.set() + """Delegate to WatchService.""" + self._watch_service.restart_watching() def is_manual_mode(self) -> bool: """Check if manual mode is currently active.""" @@ -810,182 +647,29 @@ class Twitch: @task_wrapper async def process_stream_state(self, channel_id: int, message: JsonType) -> None: - """Process websocket stream state updates (viewcount, stream-up, stream-down).""" - msg_type: str = message["type"] - channel: Channel | None = self.channels.get(channel_id) - if channel is None: - logger.error(f"Stream state change for a non-existing channel: {channel_id}") - return - if msg_type == "viewcount": - if not channel.online: - # if it's not online for some reason, set it so - channel.check_online() - else: - viewers = message["viewers"] - channel.viewers = viewers - channel.display() - # logger.debug(f"{channel.name} viewers: {viewers}") - elif msg_type == "stream-down": - channel.set_offline() - elif msg_type == "stream-up": - channel.check_online() - elif msg_type == "commercial": - # skip these - pass - else: - logger.warning(f"Unknown stream state: {msg_type}") + """Delegate to MessageHandlerService.""" + await self._message_handler_service.process_stream_state(channel_id, message) @task_wrapper async def process_stream_update(self, channel_id: int, message: JsonType) -> None: - """Process websocket broadcast settings updates (game/title changes).""" - # message = { - # "channel_id": "12345678", - # "type": "broadcast_settings_update", - # "channel": "channel._login", - # "old_status": "Old title", - # "status": "New title", - # "old_game": "Old game name", - # "game": "New game name", - # "old_game_id": 123456, - # "game_id": 123456 - # } - channel: Channel | None = self.channels.get(channel_id) - if channel is None: - logger.error(f"Broadcast settings update for a non-existing channel: {channel_id}") - return - if message["old_game"] != message["game"]: - game_change = f", game changed: {message['old_game']} -> {message['game']}" - else: - game_change = '' - logger.log(CALL, f"Channel update from websocket: {channel.name}{game_change}") - # There's no information about channel tags here, but this event is triggered - # when the tags change. We can use this to just update the stream data after the change. - # Use 'check_online' to introduce a delay, allowing for multiple title and tags - # changes before we update. This eventually calls 'on_channel_update' below. - channel.check_online() + """Delegate to MessageHandlerService.""" + await self._message_handler_service.process_stream_update(channel_id, message) def on_channel_update( self, channel: Channel, stream_before: Stream | None, stream_after: Stream | None ) -> None: - """ - Called by a Channel when its status is updated (ONLINE, OFFLINE, title/tags change). - - NOTE: 'stream_before' gets deallocated once this function finishes. - """ - watching_channel: Channel | None = self.watching_channel.get_with_default(None) - is_watching_this: bool = watching_channel is not None and watching_channel == channel - - # Channel going from OFFLINE to ONLINE - if stream_before is None and stream_after is not None: - if self.can_watch(channel) and self.should_switch(channel): - self.print(_("status", "goes_online").format(channel=channel.name)) - self.watch(channel) - else: - logger.info(f"{channel.name} goes ONLINE") - - # Channel going from ONLINE to OFFLINE - elif stream_before is not None and stream_after is None: - if is_watching_this: - self.print(_("status", "goes_offline").format(channel=channel.name)) - self.change_state(State.CHANNEL_SWITCH) - else: - logger.info(f"{channel.name} goes OFFLINE") - - # Channel staying ONLINE but with updates - elif stream_before is not None and stream_after is not None: - drops_status: str = ( - f"(🎁: {stream_before.drops_enabled and '✔' or '❌'} -> " - f"{stream_after.drops_enabled and '✔' or '❌'})" - ) - - if is_watching_this and not self.can_watch(channel): - # Watching this channel but can't watch it anymore - logger.info(f"{channel.name} status updated, switching... {drops_status}") - self.change_state(State.CHANNEL_SWITCH) - elif not is_watching_this: - # Not watching this channel - logger.info(f"{channel.name} status updated {drops_status}") - if self.can_watch(channel) and self.should_switch(channel): - self.watch(channel) - - # Channel was OFFLINE and stays OFFLINE - else: - logger.log(CALL, f"{channel.name} stays OFFLINE") - - channel.display() + """Delegate to MessageHandlerService.""" + self._message_handler_service.on_channel_update(channel, stream_before, stream_after) @task_wrapper async def process_drops(self, user_id: int, message: JsonType) -> None: - """Process websocket drop progress and claim updates.""" - # Message examples: - # {"type": "drop-progress", data: {"current_progress_min": 3, "required_progress_min": 10}} - # {"type": "drop-claim", data: {"drop_instance_id": ...}} - msg_type: str = message["type"] - if msg_type not in ("drop-progress", "drop-claim"): - return - drop_id: str = message["data"]["drop_id"] - drop: TimedDrop | None = self._drops.get(drop_id) - watching_channel: Channel | None = self.watching_channel.get_with_default(None) - if msg_type == "drop-claim": - if drop is None: - logger.error( - f"Received a drop claim ID for a non-existing drop: {drop_id}\n" - f"Drop claim ID: {message['data']['drop_instance_id']}" - ) - return - drop.update_claim(message["data"]["drop_instance_id"]) - campaign = drop.campaign - await drop.claim() - drop.display() - # About 4-20s after claiming the drop, next drop can be started - # by re-sending the watch payload. We can test for it by fetching the current drop - # via GQL, and then comparing drop IDs. - await asyncio.sleep(4) - if watching_channel is not None: - for attempt in range(8): - context = await self.gql_request( - GQL_OPERATIONS["CurrentDrop"].with_variables( - {"channelID": str(watching_channel.id)} - ) - ) - assert isinstance(context, dict) - drop_data: JsonType | None = ( - context["data"]["currentUser"]["dropCurrentSession"] # type: ignore[index] - ) - if drop_data is None or drop_data["dropID"] != drop.id: - break - await asyncio.sleep(2) - if campaign.can_earn(watching_channel): - self.restart_watching() - else: - self.change_state(State.INVENTORY_FETCH) - return - assert msg_type == "drop-progress" - if drop is not None: - drop_text = ( - f"{drop.name} ({drop.campaign.game}, " - f"{message['data']['current_progress_min']}/" - f"{message['data']['required_progress_min']})" - ) - else: - drop_text = "" - logger.log(CALL, f"Drop update from websocket: {drop_text}") - if drop is not None and drop.can_earn(self.watching_channel.get_with_default(None)): - # the received payload is for the drop we expected - drop.update_minutes(message["data"]["current_progress_min"]) + """Delegate to MessageHandlerService.""" + await self._message_handler_service.process_drops(user_id, message) @task_wrapper async def process_notifications(self, user_id: int, message: JsonType) -> None: - """Process websocket notification updates.""" - if message["type"] == "create-notification": - data: JsonType = message["data"]["notification"] - if data["type"] == "user_drop_reward_reminder_notification": - self.change_state(State.INVENTORY_FETCH) - await self.gql_request( - GQL_OPERATIONS["NotificationsDelete"].with_variables( - {"input": {"id": data["id"]}} - ) - ) + """Delegate to MessageHandlerService.""" + await self._message_handler_service.process_notifications(user_id, message) async def get_auth(self) -> _AuthState: """Get authentication state (validates token if needed).""" @@ -1007,259 +691,23 @@ class Twitch: async def fetch_campaigns( self, campaigns_chunk: list[tuple[str, JsonType]] ) -> dict[str, JsonType]: - campaign_ids: dict[str, JsonType] = dict(campaigns_chunk) - auth_state = await self.get_auth() - response_list_raw = await self.gql_request( - [ - GQL_OPERATIONS["CampaignDetails"].with_variables( - {"channelLogin": str(auth_state.user_id), "dropID": cid} - ) - for cid in campaign_ids - ] - ) - # Ensure we have a list - response_list: list[JsonType] = ( - response_list_raw if isinstance(response_list_raw, list) else [response_list_raw] - ) - fetched_data: dict[str, JsonType] = { - (campaign_data := response_json["data"]["user"]["dropCampaign"])["id"]: campaign_data # type: ignore[index] - for response_json in response_list - } - return GQLClient.merge_data(campaign_ids, fetched_data) + """Delegate to InventoryService.""" + return await self._inventory_service.fetch_campaigns(campaigns_chunk) async def fetch_inventory(self) -> None: - status_update = self.gui.status.update - status_update(_("gui", "status", "fetching_inventory")) - # fetch in-progress campaigns (inventory) - response = await self.gql_request(GQL_OPERATIONS["Inventory"]) - assert isinstance(response, dict) - inventory: JsonType = response["data"]["currentUser"]["inventory"] # type: ignore[index] - ongoing_campaigns: list[JsonType] = inventory["dropCampaignsInProgress"] or [] - # this contains claimed benefit edge IDs, not drop IDs - claimed_benefits: dict[str, datetime] = { - b["id"]: isoparse(b["lastAwardedAt"]) for b in inventory["gameEventDrops"] - } - inventory_data: dict[str, JsonType] = {c["id"]: c for c in ongoing_campaigns} - # fetch general available campaigns data (campaigns) - response = await self.gql_request(GQL_OPERATIONS["Campaigns"]) - assert isinstance(response, dict) - available_list: list[JsonType] = response["data"]["currentUser"]["dropCampaigns"] or [] # type: ignore[index] - applicable_statuses = ("ACTIVE", "UPCOMING") - available_campaigns: dict[str, JsonType] = { - c["id"]: c - for c in available_list - if c["status"] in applicable_statuses # that are currently not expired - } - # fetch detailed data for each campaign, in chunks - status_update(_("gui", "status", "fetching_campaigns")) - fetch_campaigns_tasks: list[asyncio.Task[Any]] = [ - asyncio.create_task(self.fetch_campaigns(campaigns_chunk)) - for campaigns_chunk in chunk(available_campaigns.items(), 20) - ] - try: - for coro in asyncio.as_completed(fetch_campaigns_tasks): - chunk_campaigns_data = await coro - # merge the inventory and campaigns datas together - inventory_data = GQLClient.merge_data(inventory_data, chunk_campaigns_data) - except Exception: - # asyncio.as_completed doesn't cancel tasks on errors - for task in fetch_campaigns_tasks: - task.cancel() - raise - # filter out invalid campaigns - for campaign_id in list(inventory_data.keys()): - if inventory_data[campaign_id]["game"] is None: - del inventory_data[campaign_id] - - if self.settings.dump: - # dump the campaigns data to the dump file - with open(DUMP_PATH, 'a', encoding="utf8") as file: - # we need to pre-process the inventory dump a little - dump_data: JsonType = deepcopy(inventory_data) - for campaign_data in dump_data.values(): - # replace ACL lists with a simple text description - if ( - campaign_data["allow"] - and campaign_data["allow"].get("isEnabled", True) - and campaign_data["allow"]["channels"] - ): - # simply count the channels included in the ACL - campaign_data["allow"]["channels"] = ( - f"{len(campaign_data['allow']['channels'])} channels" - ) - # replace drop instance IDs, so they don't include user IDs - for drop_data in campaign_data["timeBasedDrops"]: - if "self" in drop_data and drop_data["self"]["dropInstanceID"]: - drop_data["self"]["dropInstanceID"] = "..." - json.dump(dump_data, file, indent=4, sort_keys=True) - file.write("\n\n") # add 2x new line spacer - json.dump(claimed_benefits, file, indent=4, sort_keys=True, default=str) - - # use the merged data to create campaign objects - campaigns: list[DropsCampaign] = [ - DropsCampaign(self, campaign_data, claimed_benefits) - for campaign_data in inventory_data.values() - ] - campaigns.sort(key=lambda c: c.active, reverse=True) - campaigns.sort(key=lambda c: c.upcoming and c.starts_at or c.ends_at) - campaigns.sort(key=lambda c: c.eligible, reverse=True) - - self._drops.clear() - self.inventory.clear() - self._mnt_triggers.clear() - switch_triggers: set[datetime] = set() - next_hour = datetime.now(timezone.utc) + timedelta(hours=1) - # add the campaigns to the internal inventory - for campaign in campaigns: - self._drops.update({drop.id: drop for drop in campaign.drops}) - if campaign.can_earn_within(next_hour): - switch_triggers.update(campaign.time_triggers) - self.inventory.append(campaign) - self._campaigns[campaign.id] = campaign - # concurrently add the campaigns into the GUI - # NOTE: this fetches pictures from the CDN, so might be slow without a cache - # Start batch mode to prevent individual emissions - self.gui.inv.start_batch() - status_update( - _("gui", "status", "adding_campaigns").format(counter=f"(0/{len(campaigns)})") - ) - add_campaign_tasks: list[asyncio.Task[None]] = [ - asyncio.create_task(self.gui.inv.add_campaign(campaign)) - for campaign in campaigns - ] - try: - for i, coro in enumerate(asyncio.as_completed(add_campaign_tasks), start=1): - await coro - status_update( - _("gui", "status", "adding_campaigns").format( - counter=f"({i}/{len(campaigns)})" - ) - ) - # this is needed here explicitly, because cache reads from disk don't raise this - if self.gui.close_requested: - raise ExitRequest() - except Exception: - # asyncio.as_completed doesn't cancel tasks on errors - for task in add_campaign_tasks: - task.cancel() - raise - # Finalize batch mode - emit all campaigns atomically - await self.gui.inv.finalize_batch() - self._mnt_triggers.extend(sorted(switch_triggers)) - # trim out all triggers that we're already past - now = datetime.now(timezone.utc) - while self._mnt_triggers and self._mnt_triggers[0] <= now: - self._mnt_triggers.popleft() - # NOTE: maintenance task is restarted at the end of each inventory fetch - if self._mnt_task is not None and not self._mnt_task.done(): - self._mnt_task.cancel() - self._mnt_task = asyncio.create_task(self._maintenance_task()) + """Delegate to InventoryService.""" + await self._inventory_service.fetch_inventory() def get_active_campaign(self, channel: Channel | None = None) -> DropsCampaign | None: - if not self.wanted_games: - return None - watching_channel = self.watching_channel.get_with_default(channel) - if watching_channel is None: - # if we aren't watching anything, we can't earn any drops - return None - campaigns: list[DropsCampaign] = [] - for campaign in self.inventory: - if campaign.can_earn(watching_channel): - campaigns.append(campaign) - if campaigns: - campaigns.sort(key=lambda c: c.remaining_minutes) - return campaigns[0] - return None + """Delegate to InventoryService.""" + return self._inventory_service.get_active_campaign(channel) async def get_live_streams( self, game: Game, *, limit: int = 20, drops_enabled: bool = True ) -> list[Channel]: - filters: list[str] = [] - if drops_enabled: - filters.append("DROPS_ENABLED") - try: - response = await self.gql_request( - GQL_OPERATIONS["GameDirectory"].with_variables({ - "limit": limit, - "slug": game.slug, - "options": { - "includeRestricted": ["SUB_ONLY_LIVE"], - "systemFilters": filters, - }, - }) - ) - except GQLException as exc: - raise MinerException(f"Game: {game.slug}") from exc - assert isinstance(response, dict) - if "game" in response["data"]: # type: ignore[operator] - return [ - Channel.from_directory( - self, stream_channel_data["node"], drops_enabled=drops_enabled - ) - for stream_channel_data in response["data"]["game"]["streams"]["edges"] # type: ignore[index] - if stream_channel_data["node"]["broadcaster"] is not None - ] - return [] + """Delegate to ChannelService.""" + return await self._channel_service.get_live_streams(game, limit=limit, drops_enabled=drops_enabled) async def bulk_check_online(self, channels: abc.Iterable[Channel]): - """ - Utilize batch GQL requests to check ONLINE status for a lot of channels at once. - Also handles the drops_enabled check. - """ - acl_streams_map: dict[int, JsonType] = {} - stream_gql_ops: list[GQLOperation] = [channel.stream_gql for channel in channels] - if not stream_gql_ops: - # shortcut for nothing to process - # NOTE: Have to do this here, becase "channels" can be any iterable - return - stream_gql_tasks: list[asyncio.Task[JsonType | list[JsonType]]] = [ - asyncio.create_task(self.gql_request(stream_gql_chunk)) - for stream_gql_chunk in chunk(stream_gql_ops, 20) - ] - try: - for coro in asyncio.as_completed(stream_gql_tasks): - response = await coro - response_list: list[JsonType] = response if isinstance(response, list) else [response] - for response_json in response_list: - channel_data: JsonType = response_json["data"]["user"] # type: ignore[index] - if channel_data is not None: - acl_streams_map[int(channel_data["id"])] = channel_data - except Exception: - # asyncio.as_completed doesn't cancel tasks on errors - for task in stream_gql_tasks: - task.cancel() - raise - # for all channels with an active stream, check the available drops as well - # acl_available_drops_map: dict[int, list[JsonType]] = {} - # available_gql_ops: list[GQLOperation] = [ - # GQL_OPERATIONS["AvailableDrops"].with_variables({"channelID": str(channel_id)}) - # for channel_id, channel_data in acl_streams_map.items() - # if channel_data["stream"] is not None # only do this for ONLINE channels - # ] - # available_gql_tasks: list[asyncio.Task[list[JsonType]]] = [ - # asyncio.create_task(self.gql_request(available_gql_chunk)) - # for available_gql_chunk in chunk(available_gql_ops, 20) - # ] - # try: - # for coro in asyncio.as_completed(available_gql_tasks): - # response_list = await coro - # for response_json in response_list: - # available_info: JsonType = response_json["data"]["channel"] - # acl_available_drops_map[int(available_info["id"])] = ( - # available_info["viewerDropCampaigns"] or [] - # ) - # except Exception: - # # asyncio.as_completed doesn't cancel tasks on errors - # for task in available_gql_tasks: - # task.cancel() - # raise - for channel in channels: - channel_id = channel.id - if channel_id not in acl_streams_map: - continue - channel_data = acl_streams_map[channel_id] - if channel_data["stream"] is None: - continue - # available_drops: list[JsonType] = acl_available_drops_map[channel_id] - # channel.external_update(channel_data, available_drops) - channel.external_update(channel_data, []) + """Delegate to ChannelService.""" + await self._channel_service.bulk_check_online(channels) diff --git a/src/i18n/__init__.py b/src/i18n/__init__.py index 220f360..6f0831d 100644 --- a/src/i18n/__init__.py +++ b/src/i18n/__init__.py @@ -3,33 +3,34 @@ from __future__ import annotations from .translator import ( - StatusMessages, ChromeMessages, - LoginMessages, ErrorMessages, + GUIChannelHeadings, + GUIChannels, + GUIHelp, + GUIHelpLinks, + GUIInventory, + GUIInvFilter, + GUIInvStatus, + GUILoginForm, + GUIMessages, + GUIPriorityModes, + GUIProgress, + GUISettings, + GUISettingsGeneral, GUIStatus, GUITabs, GUITray, - GUILoginForm, GUIWebsocket, - GUIProgress, - GUIChannelHeadings, - GUIChannels, - GUIInvFilter, - GUIInvStatus, - GUIInventory, - GUISettingsGeneral, - GUIPriorityModes, - GUISettings, - GUIHelpLinks, - GUIHelp, - GUIMessages, + LoginMessages, + StatusMessages, Translation, - default_translation, Translator, _, + default_translation, ) + __all__ = [ "StatusMessages", "ChromeMessages", diff --git a/src/i18n/translator.py b/src/i18n/translator.py index 5559105..cca7c63 100644 --- a/src/i18n/translator.py +++ b/src/i18n/translator.py @@ -1,11 +1,12 @@ from __future__ import annotations from collections import abc -from typing import Any, TypedDict, TYPE_CHECKING +from typing import TYPE_CHECKING, Any, TypedDict +from src.config import DEFAULT_LANG, IS_PACKAGED, LANG_PATH from src.exceptions import MinerException from src.utils.json_utils import json_load, json_save -from src.config import IS_PACKAGED, LANG_PATH, DEFAULT_LANG + if TYPE_CHECKING: from typing_extensions import NotRequired @@ -471,12 +472,12 @@ class Translator: try: for key in path: v = v[key] - except KeyError: + except KeyError as err: # this can only really happen for the default translation raise MinerException( f"{self.current} translation is missing the '{' -> '.join(path)}' translation key" - ) - return v + ) from err + return str(v) _ = Translator() diff --git a/src/models/__init__.py b/src/models/__init__.py index bcdd97b..83c8181 100644 --- a/src/models/__init__.py +++ b/src/models/__init__.py @@ -1,10 +1,11 @@ """Domain models for Twitch drops mining.""" -from src.models.game import Game from src.models.benefit import Benefit, BenefitType -from src.models.drop import BaseDrop, TimedDrop, remove_dimensions from src.models.campaign import DropsCampaign from src.models.channel import Channel, Stream +from src.models.drop import BaseDrop, TimedDrop, remove_dimensions +from src.models.game import Game + __all__ = [ "Game", diff --git a/src/models/benefit.py b/src/models/benefit.py index 7b67e01..7a328a3 100644 --- a/src/models/benefit.py +++ b/src/models/benefit.py @@ -3,6 +3,7 @@ from __future__ import annotations from enum import Enum from typing import TYPE_CHECKING + if TYPE_CHECKING: from src.config.constants import JsonType, URLType @@ -28,7 +29,7 @@ class Benefit: self.name: str = benefit_data["name"] self.type: BenefitType = ( BenefitType(benefit_data["distributionType"]) - if benefit_data["distributionType"] in BenefitType.__members__.keys() + if benefit_data["distributionType"] in BenefitType.__members__ else BenefitType.UNKNOWN ) self.image_url: URLType = benefit_data["imageAssetURL"] diff --git a/src/models/campaign.py b/src/models/campaign.py index b9c15fc..7f3d207 100644 --- a/src/models/campaign.py +++ b/src/models/campaign.py @@ -1,22 +1,24 @@ from __future__ import annotations import logging +from datetime import datetime, timezone +from functools import cached_property from itertools import chain from typing import TYPE_CHECKING -from functools import cached_property -from datetime import datetime, timezone + from dateutil.parser import isoparse +from src.config.constants import State, URLType from src.models.channel import Channel -from src.models.game import Game from src.models.drop import TimedDrop, remove_dimensions -from src.config.constants import URLType, State +from src.models.game import Game + if TYPE_CHECKING: from collections import abc - from src.core.client import Twitch from src.config.constants import JsonType + from src.core.client import Twitch logger = logging.getLogger("TwitchDrops") @@ -187,7 +189,7 @@ class DropsCampaign: Used when websocket updates aren't available. """ # NOTE: Use a temporary list to ensure all drops are bumped before checking - if any([drop._bump_minutes(channel) for drop in self.drops]): + if any(drop._bump_minutes(channel) for drop in self.drops): # Executes if any drop's extra_current_minutes reach MAX_ESTIMATED_MINUTES # TODO: Figure out a better way to handle this case logger.warning( diff --git a/src/models/channel.py b/src/models/channel.py index 05c7af0..d45a570 100644 --- a/src/models/channel.py +++ b/src/models/channel.py @@ -1,21 +1,22 @@ from __future__ import annotations -import re -import json import asyncio +import json import logging +import re from base64 import b64encode from functools import cached_property -from typing import Any, SupportsInt, cast, TYPE_CHECKING +from typing import TYPE_CHECKING, Any, SupportsInt, cast import aiohttp from yarl import URL -from src.models.game import Game -from src.config.constants import JsonType, GQLOperation, URLType, CALL, ONLINE_DELAY +from src.config.constants import CALL, ONLINE_DELAY, GQLOperation, JsonType, URLType from src.config.operations import GQL_OPERATIONS -from src.utils.json_utils import json_minify from src.exceptions import MinerException, RequestException +from src.models.game import Game +from src.utils.json_utils import json_minify + if TYPE_CHECKING: from src.core.client import Twitch diff --git a/src/models/drop.py b/src/models/drop.py index d1953ca..2c03ff9 100644 --- a/src/models/drop.py +++ b/src/models/drop.py @@ -1,22 +1,24 @@ from __future__ import annotations -import re import logging -from typing import TYPE_CHECKING +import re from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING + from dateutil.parser import isoparse +from src.config.constants import MAX_EXTRA_MINUTES +from src.config.operations import GQL_OPERATIONS +from src.exceptions import GQLException from src.i18n import _ from src.models.benefit import Benefit -from src.exceptions import GQLException -from src.config.constants import MAX_EXTRA_MINUTES, State -from src.config.operations import GQL_OPERATIONS + if TYPE_CHECKING: - from src.core.client import Twitch - from src.models.channel import Channel - from src.models.campaign import DropsCampaign from src.config.constants import JsonType + from src.core.client import Twitch + from src.models.campaign import DropsCampaign + from src.models.channel import Channel logger = logging.getLogger("TwitchDrops") diff --git a/src/models/game.py b/src/models/game.py index 8642deb..3a9f312 100644 --- a/src/models/game.py +++ b/src/models/game.py @@ -4,6 +4,7 @@ import re from functools import cached_property from typing import TYPE_CHECKING + if TYPE_CHECKING: from src.config.constants import JsonType diff --git a/src/services/channel_service.py b/src/services/channel_service.py index 3271f80..dac4c7e 100644 --- a/src/services/channel_service.py +++ b/src/services/channel_service.py @@ -9,18 +9,19 @@ from __future__ import annotations import asyncio import logging -from typing import TYPE_CHECKING from collections import abc +from typing import TYPE_CHECKING -from src.utils import chunk -from src.config import MAX_INT, GQL_OPERATIONS -from src.models.channel import Channel +from src.config import GQL_OPERATIONS, MAX_INT from src.exceptions import GQLException, MinerException +from src.models.channel import Channel +from src.utils import chunk + if TYPE_CHECKING: + from src.config import GQLOperation, JsonType from src.core.client import Twitch from src.models.game import Game - from src.config import JsonType, GQLOperation logger = logging.getLogger("TwitchDrops") diff --git a/src/services/inventory_service.py b/src/services/inventory_service.py index 9d32fae..90d99ba 100644 --- a/src/services/inventory_service.py +++ b/src/services/inventory_service.py @@ -13,19 +13,21 @@ import logging from copy import deepcopy from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any + from dateutil.parser import isoparse -from src.utils import chunk -from src.i18n import _ -from src.config import DUMP_PATH, GQL_OPERATIONS -from src.models import DropsCampaign -from src.exceptions import ExitRequest from src.api import GQLClient +from src.config import DUMP_PATH, GQL_OPERATIONS +from src.exceptions import ExitRequest +from src.i18n import _ +from src.models import DropsCampaign +from src.utils import chunk + if TYPE_CHECKING: + from src.config import JsonType from src.core.client import Twitch from src.models.channel import Channel - from src.config import JsonType logger = logging.getLogger("TwitchDrops") diff --git a/src/services/maintenance.py b/src/services/maintenance.py index d442d4f..07a68fd 100644 --- a/src/services/maintenance.py +++ b/src/services/maintenance.py @@ -12,8 +12,9 @@ import logging from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING -from src.utils import task_wrapper from src.config import CALL, State +from src.utils import task_wrapper + if TYPE_CHECKING: from src.core.client import Twitch diff --git a/src/services/message_handlers.py b/src/services/message_handlers.py index db4437f..ee64512 100644 --- a/src/services/message_handlers.py +++ b/src/services/message_handlers.py @@ -11,15 +11,16 @@ import asyncio import logging from typing import TYPE_CHECKING -from src.utils import task_wrapper -from src.config import CALL, State, GQL_OPERATIONS +from src.config import CALL, GQL_OPERATIONS, State from src.i18n import _ +from src.utils import task_wrapper + if TYPE_CHECKING: - from src.core.client import Twitch - from src.models.channel import Channel, Stream - from src.models import TimedDrop from src.config import JsonType + from src.core.client import Twitch + from src.models import TimedDrop + from src.models.channel import Channel, Stream logger = logging.getLogger("TwitchDrops") @@ -216,7 +217,7 @@ class MessageHandlerService: await asyncio.sleep(4) if watching_channel is not None: - for attempt in range(8): + for _attempt in range(8): context = await self._twitch.gql_request( GQL_OPERATIONS["CurrentDrop"].with_variables( {"channelID": str(watching_channel.id)} diff --git a/src/services/watch_service.py b/src/services/watch_service.py index 663ac1b..13df7b2 100644 --- a/src/services/watch_service.py +++ b/src/services/watch_service.py @@ -9,20 +9,21 @@ from __future__ import annotations import asyncio import logging +from contextlib import suppress from time import time from typing import TYPE_CHECKING, NoReturn -from contextlib import suppress -from src.utils import task_wrapper, AwaitableValue -from src.i18n import _ -from src.config import CALL, WATCH_INTERVAL, GQL_OPERATIONS +from src.config import CALL, GQL_OPERATIONS, WATCH_INTERVAL from src.exceptions import GQLException +from src.i18n import _ +from src.utils import task_wrapper + if TYPE_CHECKING: - from src.core.client import Twitch - from src.models.channel import Channel - from src.models import TimedDrop from src.config import JsonType + from src.core.client import Twitch + from src.models import TimedDrop + from src.models.channel import Channel logger = logging.getLogger("TwitchDrops") @@ -77,11 +78,7 @@ class WatchService: if channel.game is None or channel.game not in self._twitch.wanted_games: return False - for campaign in self._twitch.inventory: - if campaign.can_earn(channel): - return True - - return False + return any(campaign.can_earn(channel) for campaign in self._twitch.inventory) def should_switch(self, channel: Channel) -> bool: """ @@ -129,7 +126,11 @@ class WatchService: self._twitch.watching_channel.set(channel) if update_status: - status_text: str = _("status", "watching").format(channel=channel.name) + # Check if manual mode is active for custom status message + if self._twitch.is_manual_mode() and self._twitch._manual_target_game: + status_text = f"🎯 Manual Mode: Watching {channel.name} for {self._twitch._manual_target_game.name}" + else: + status_text = _("status", "watching").format(channel=channel.name) self._twitch.print(status_text) self._twitch.gui.status.update(status_text) diff --git a/src/utils/__init__.py b/src/utils/__init__.py index 905aca8..8ad16b4 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -2,40 +2,40 @@ from __future__ import annotations +# Async helpers +from .async_helpers import ( + AwaitableValue, + first_to_complete, + format_traceback, + invalidate_cache, + task_wrapper, +) + +# Backoff +from .backoff import ExponentialBackoff + +# JSON utilities +from .json_utils import ( + SERIALIZE_ENV, + json_load, + json_minify, + json_save, + merge_json, +) + +# Rate limiting +from .rate_limiter import RateLimiter + # String utilities from .string_utils import ( CHARS_ASCII, CHARS_HEX_LOWER, CHARS_HEX_UPPER, - create_nonce, chunk, + create_nonce, deduplicate, ) -# JSON utilities -from .json_utils import ( - json_minify, - json_load, - json_save, - merge_json, - SERIALIZE_ENV, -) - -# Async helpers -from .async_helpers import ( - first_to_complete, - format_traceback, - task_wrapper, - invalidate_cache, - AwaitableValue, -) - -# Rate limiting -from .rate_limiter import RateLimiter - -# Backoff -from .backoff import ExponentialBackoff - __all__ = [ # String utilities diff --git a/src/utils/async_helpers.py b/src/utils/async_helpers.py index 2a66c63..ee97a6e 100644 --- a/src/utils/async_helpers.py +++ b/src/utils/async_helpers.py @@ -2,16 +2,13 @@ from __future__ import annotations -import io -import sys import asyncio import logging import traceback -from pathlib import Path -from functools import wraps -from contextlib import suppress -from typing import Any, Literal, Generic, TypeVar, ParamSpec from collections import abc +from contextlib import suppress +from functools import wraps +from typing import Any, Generic, Literal, ParamSpec, TypeVar from src.exceptions import ExitRequest, ReloadRequest diff --git a/src/utils/cache.py b/src/utils/cache.py deleted file mode 100644 index a061217..0000000 --- a/src/utils/cache.py +++ /dev/null @@ -1,136 +0,0 @@ -from __future__ import annotations - -import asyncio -from datetime import datetime, timedelta, timezone - -import io -import json -from typing import Dict, TypedDict, NewType, TYPE_CHECKING - -from src.utils import json_load, json_save -from src.config import URLType, CACHE_PATH, CACHE_DB - -from PIL import Image as Image_module -from PIL.ImageTk import PhotoImage - - -if TYPE_CHECKING: - from src.web.gui_manager import GUIManager - from PIL.Image import Image - from typing_extensions import TypeAlias - - -ImageHash = NewType("ImageHash", str) -ImageSize: TypeAlias = "tuple[int, int]" - - -class ExpiringHash(TypedDict): - hash: ImageHash - expires: datetime - - -Hashes = Dict[URLType, ExpiringHash] -default_database: Hashes = {} - - -class ImageCache: - LIFETIME = timedelta(days=7) - - def __init__(self, manager: GUIManager) -> None: - self._root = manager._root - self._twitch = manager._twitch - cleanup: bool = False - CACHE_PATH.mkdir(parents=True, exist_ok=True) - try: - self._hashes: Hashes = json_load(CACHE_DB, default_database, merge=False) - except json.JSONDecodeError: - # if we can't load the mapping file, delete all existing files, - # then reinitialize the image cache anew - cleanup = True - self._hashes = default_database.copy() - self._images: dict[ImageHash, Image] = {} - self._photos: dict[tuple[ImageHash, ImageSize], PhotoImage] = {} - self._lock = asyncio.Lock() - self._altered: bool = False - # cleanup the URLs - hash_counts: dict[ImageHash, int] = {} - now = datetime.now(timezone.utc) - for url, hash_dict in list(self._hashes.items()): - img_hash = hash_dict["hash"] - if img_hash not in hash_counts: - hash_counts[img_hash] = 0 - if now >= hash_dict["expires"]: - del self._hashes[url] - self._altered = True - else: - hash_counts[img_hash] += 1 - for img_hash, count in hash_counts.items(): - if count == 0: - # hashes come with an extension already - CACHE_PATH.joinpath(img_hash).unlink(missing_ok=True) - # NOTE: The hashes are deleted from self._hashes above - if cleanup: - # This cleanups the cache folder from unused PNG files - orphans = [ - file.name for file in CACHE_PATH.glob("*.png") if file.name not in hash_counts - ] - for filename in orphans: - CACHE_PATH.joinpath(filename).unlink(missing_ok=True) - - def save(self, *, force: bool = False) -> None: - if self._altered or force: - json_save(CACHE_DB, self._hashes, sort=True) - - def _new_expires(self) -> datetime: - return datetime.now(timezone.utc) + self.LIFETIME - - def _hash(self, image: Image) -> ImageHash: - pixel_data = list( - image.resize((10, 10), Image_module.Resampling.LANCZOS).convert('L').getdata() - ) - avg_pixel = sum(pixel_data) / len(pixel_data) - bits = ''.join('1' if px >= avg_pixel else '0' for px in pixel_data) - return ImageHash(f"{int(bits, 2):x}.png") - - async def get(self, url: URLType, size: ImageSize | None = None) -> PhotoImage: - async with self._lock: - image: Image | None = None - if url in self._hashes: - img_hash = self._hashes[url]["hash"] - self._hashes[url]["expires"] = self._new_expires() - if img_hash in self._images: - image = self._images[img_hash] - else: - try: - self._images[img_hash] = image = Image_module.open(CACHE_PATH / img_hash) - except (FileNotFoundError, Image_module.UnidentifiedImageError): - pass - if image is None: - try: - async with self._twitch.request("GET", url) as response: - if response.status != 404: - image = Image_module.open(io.BytesIO(await response.read())) - except Exception: - pass - if image is None: - # use a blank white image as a fallback - image = Image_module.new("RGB", (10, 10), (255, 255, 255)) - img_hash = self._hash(image) - self._images[img_hash] = image - image.save(CACHE_PATH / img_hash) - self._hashes[url] = { - "hash": img_hash, - "expires": self._new_expires() - } - # NOTE: If self._hashes ever stops being updated in both above if cases, - # this will need to be moved - self._altered = True - if size is None: - size = image.size - photo_key = (img_hash, size) - if photo_key in self._photos: - return self._photos[photo_key] - if image.size != size: - image = image.resize(size, Image_module.Palette.ADAPTIVE) - self._photos[photo_key] = photo = PhotoImage(master=self._root, image=image) - return photo diff --git a/src/utils/json_utils.py b/src/utils/json_utils.py index 6f2a2cc..e4dee5a 100644 --- a/src/utils/json_utils.py +++ b/src/utils/json_utils.py @@ -3,10 +3,11 @@ from __future__ import annotations import json +from collections.abc import Callable, Mapping from datetime import datetime, timezone from enum import Enum from pathlib import Path -from typing import Any, Callable, Mapping, TypeVar, cast +from typing import Any, TypeVar, cast from yarl import URL @@ -117,7 +118,7 @@ def merge_json(obj: JsonType, template: Mapping[Any, Any]) -> None: assert isinstance(template[k], dict) merge_json(v, template[k]) # ensure the object is not missing any keys - for k in template.keys(): + for k in template: if k not in obj: obj[k] = template[k] @@ -136,7 +137,7 @@ def json_load(path: Path, defaults: _JSON_T, *, merge: bool = True) -> _JSON_T: """ defaults_dict: JsonType = dict(defaults) if path.exists(): - with open(path, 'r', encoding="utf8") as file: + with open(path, encoding="utf8") as file: combined: JsonType = _remove_missing(json.load(file, object_hook=_deserialize)) if merge: merge_json(combined, defaults_dict) diff --git a/src/utils/string_utils.py b/src/utils/string_utils.py index adcb686..8ccc274 100644 --- a/src/utils/string_utils.py +++ b/src/utils/string_utils.py @@ -4,8 +4,7 @@ from __future__ import annotations import random import string -from collections import OrderedDict -from collections import abc +from collections import OrderedDict, abc from typing import TypeVar diff --git a/src/web/app.py b/src/web/app.py index cce248c..0c6a2e5 100644 --- a/src/web/app.py +++ b/src/web/app.py @@ -5,16 +5,19 @@ import logging from pathlib import Path from typing import TYPE_CHECKING -from fastapi import FastAPI, HTTPException -from fastapi.staticfiles import StaticFiles -from fastapi.responses import HTMLResponse, FileResponse -from fastapi.middleware.cors import CORSMiddleware import socketio +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import FileResponse, HTMLResponse +from fastapi.staticfiles import StaticFiles from pydantic import BaseModel + if TYPE_CHECKING: - from src.web.gui_manager import WebGUIManager + import uvicorn + from src.core.client import Twitch + from src.web.gui_manager import WebGUIManager logger = logging.getLogger("TwitchDrops") @@ -45,7 +48,7 @@ socket_app = socketio.ASGIApp(sio, app) # Global references (set by main.py) gui_manager: WebGUIManager | None = None twitch_client: Twitch | None = None -_server_instance: 'uvicorn.Server | None' = None +_server_instance: uvicorn.Server | None = None def set_managers(gui: WebGUIManager, twitch: Twitch): @@ -319,7 +322,6 @@ async def run_server(host: str = "0.0.0.0", port: int = 8080): async def shutdown_server(): """Gracefully shutdown the web server""" - global _server_instance if _server_instance: logger.info("Setting server.should_exit = True") _server_instance.should_exit = True diff --git a/src/web/gui_manager.py b/src/web/gui_manager.py index 8d95422..f19f6c8 100644 --- a/src/web/gui_manager.py +++ b/src/web/gui_manager.py @@ -6,21 +6,23 @@ import asyncio import logging from typing import TYPE_CHECKING -from src.models.game import Game from src.i18n import _ +from src.models.game import Game from src.web.managers.broadcaster import WebSocketBroadcaster -from src.web.managers.status import StatusManager, WebsocketStatusManager -from src.web.managers.console import ConsoleOutputManager +from src.web.managers.cache import ImageCache from src.web.managers.campaigns import CampaignProgressManager from src.web.managers.channels import ChannelListManager +from src.web.managers.console import ConsoleOutputManager from src.web.managers.inventory import InventoryManager from src.web.managers.login import LoginFormManager -from src.web.managers.tray import TrayIconStub from src.web.managers.settings import SettingsManager -from src.web.managers.cache import ImageCache +from src.web.managers.status import StatusManager, WebsocketStatusManager +from src.web.managers.tray import TrayIconStub + if TYPE_CHECKING: from socketio import AsyncServer + from src.core.client import Twitch from src.models import TimedDrop diff --git a/src/web/managers/__init__.py b/src/web/managers/__init__.py index 0aa89d4..24defe9 100644 --- a/src/web/managers/__init__.py +++ b/src/web/managers/__init__.py @@ -15,15 +15,16 @@ This package contains all component managers for the web-based GUI: """ from src.web.managers.broadcaster import WebSocketBroadcaster -from src.web.managers.status import StatusManager, WebsocketStatusManager -from src.web.managers.console import ConsoleOutputManager +from src.web.managers.cache import ImageCache from src.web.managers.campaigns import CampaignProgressManager from src.web.managers.channels import ChannelListManager +from src.web.managers.console import ConsoleOutputManager from src.web.managers.inventory import InventoryManager -from src.web.managers.login import LoginFormManager, LoginData +from src.web.managers.login import LoginData, LoginFormManager from src.web.managers.settings import SettingsManager +from src.web.managers.status import StatusManager, WebsocketStatusManager from src.web.managers.tray import TrayIconStub -from src.web.managers.cache import ImageCache + __all__ = [ "WebSocketBroadcaster", diff --git a/src/web/managers/broadcaster.py b/src/web/managers/broadcaster.py index 58d477f..775d5cb 100644 --- a/src/web/managers/broadcaster.py +++ b/src/web/managers/broadcaster.py @@ -2,7 +2,8 @@ from __future__ import annotations -from typing import Any, TYPE_CHECKING +from typing import TYPE_CHECKING, Any + if TYPE_CHECKING: from socketio import AsyncServer diff --git a/src/web/managers/cache.py b/src/web/managers/cache.py index f338518..d322215 100644 --- a/src/web/managers/cache.py +++ b/src/web/managers/cache.py @@ -4,6 +4,7 @@ from __future__ import annotations from typing import TYPE_CHECKING + if TYPE_CHECKING: from src.web.gui_manager import WebGUIManager diff --git a/src/web/managers/campaigns.py b/src/web/managers/campaigns.py index 258533c..47274f6 100644 --- a/src/web/managers/campaigns.py +++ b/src/web/managers/campaigns.py @@ -5,9 +5,10 @@ from __future__ import annotations import asyncio from typing import TYPE_CHECKING + if TYPE_CHECKING: - from src.web.managers.broadcaster import WebSocketBroadcaster from src.models import TimedDrop + from src.web.managers.broadcaster import WebSocketBroadcaster class CampaignProgressManager: diff --git a/src/web/managers/channels.py b/src/web/managers/channels.py index 1a8e297..8d0a0aa 100644 --- a/src/web/managers/channels.py +++ b/src/web/managers/channels.py @@ -3,11 +3,12 @@ from __future__ import annotations import asyncio -from typing import Any, TYPE_CHECKING +from typing import TYPE_CHECKING, Any + if TYPE_CHECKING: - from src.web.managers.broadcaster import WebSocketBroadcaster from src.models.channel import Channel + from src.web.managers.broadcaster import WebSocketBroadcaster class ChannelListManager: diff --git a/src/web/managers/console.py b/src/web/managers/console.py index b2174d2..0f21823 100644 --- a/src/web/managers/console.py +++ b/src/web/managers/console.py @@ -3,10 +3,11 @@ from __future__ import annotations import asyncio -from datetime import datetime from collections import deque +from datetime import datetime from typing import TYPE_CHECKING + if TYPE_CHECKING: from src.web.managers.broadcaster import WebSocketBroadcaster diff --git a/src/web/managers/inventory.py b/src/web/managers/inventory.py index dbc094a..ba74402 100644 --- a/src/web/managers/inventory.py +++ b/src/web/managers/inventory.py @@ -3,12 +3,13 @@ from __future__ import annotations import asyncio -from typing import Any, TYPE_CHECKING +from typing import TYPE_CHECKING, Any + if TYPE_CHECKING: + from src.models import DropsCampaign, TimedDrop from src.web.managers.broadcaster import WebSocketBroadcaster from src.web.managers.cache import ImageCache - from src.models import DropsCampaign, TimedDrop class InventoryManager: diff --git a/src/web/managers/login.py b/src/web/managers/login.py index 75f1f09..e84b545 100644 --- a/src/web/managers/login.py +++ b/src/web/managers/login.py @@ -4,13 +4,14 @@ from __future__ import annotations import asyncio from dataclasses import dataclass -from typing import Any, TYPE_CHECKING +from typing import TYPE_CHECKING, Any from src.i18n import _ + if TYPE_CHECKING: - from src.web.managers.broadcaster import WebSocketBroadcaster from src.web.gui_manager import WebGUIManager + from src.web.managers.broadcaster import WebSocketBroadcaster @dataclass diff --git a/src/web/managers/settings.py b/src/web/managers/settings.py index 3154f73..9ad8d1a 100644 --- a/src/web/managers/settings.py +++ b/src/web/managers/settings.py @@ -3,13 +3,14 @@ from __future__ import annotations import asyncio -from typing import Any, TYPE_CHECKING +from typing import TYPE_CHECKING, Any from src.models.game import Game + if TYPE_CHECKING: - from src.web.managers.broadcaster import WebSocketBroadcaster from src.config.settings import Settings + from src.web.managers.broadcaster import WebSocketBroadcaster class SettingsManager: diff --git a/src/web/managers/status.py b/src/web/managers/status.py index 2001fb6..f9900ff 100644 --- a/src/web/managers/status.py +++ b/src/web/managers/status.py @@ -3,7 +3,8 @@ from __future__ import annotations import asyncio -from typing import Any, TYPE_CHECKING +from typing import TYPE_CHECKING, Any + if TYPE_CHECKING: from src.web.managers.broadcaster import WebSocketBroadcaster diff --git a/src/web/managers/tray.py b/src/web/managers/tray.py index c6f1366..53e6684 100644 --- a/src/web/managers/tray.py +++ b/src/web/managers/tray.py @@ -5,6 +5,7 @@ from __future__ import annotations import asyncio from typing import TYPE_CHECKING + if TYPE_CHECKING: from src.web.managers.broadcaster import WebSocketBroadcaster diff --git a/src/websocket/__init__.py b/src/websocket/__init__.py index 1b230b4..772ed12 100644 --- a/src/websocket/__init__.py +++ b/src/websocket/__init__.py @@ -10,7 +10,8 @@ Classes: WebsocketPool: Manages multiple websocket connections for topic distribution """ -from src.websocket.websocket import Websocket from src.websocket.pool import WebsocketPool +from src.websocket.websocket import Websocket + __all__ = ["Websocket", "WebsocketPool"] diff --git a/src/websocket/pool.py b/src/websocket/pool.py index b825177..b4f67ac 100644 --- a/src/websocket/pool.py +++ b/src/websocket/pool.py @@ -2,17 +2,18 @@ from __future__ import annotations import asyncio import logging -from typing import Any, Literal, TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Literal -from src.exceptions import MinerException from src.config import MAX_WEBSOCKETS, WS_TOPICS_LIMIT +from src.exceptions import MinerException from src.websocket.websocket import Websocket + if TYPE_CHECKING: from collections import abc - from src.core.client import Twitch from src.config import WebsocketTopic + from src.core.client import Twitch logger = logging.getLogger("TwitchDrops") diff --git a/src/websocket/websocket.py b/src/websocket/websocket.py index 6ebf03b..11d4086 100644 --- a/src/websocket/websocket.py +++ b/src/websocket/websocket.py @@ -1,31 +1,32 @@ from __future__ import annotations -import json import asyncio +import json import logging -from time import time from contextlib import suppress +from time import time from typing import TYPE_CHECKING import aiohttp -from src.i18n import _ -from src.exceptions import WebsocketClosed from src.config import PING_INTERVAL, PING_TIMEOUT, WS_TOPICS_LIMIT +from src.exceptions import WebsocketClosed +from src.i18n import _ from src.utils import ( CHARS_ASCII, - task_wrapper, - create_nonce, - json_minify, - format_traceback, AwaitableValue, ExponentialBackoff, + create_nonce, + format_traceback, + json_minify, + task_wrapper, ) + if TYPE_CHECKING: + from src.config import JsonType, WebsocketTopic from src.core.client import Twitch from src.web.gui_manager import WebsocketStatus - from src.config import JsonType, WebsocketTopic WSMsgType = aiohttp.WSMsgType @@ -162,10 +163,7 @@ class Websocket: """ session = await self._twitch.get_session() backoff = ExponentialBackoff(**kwargs) - if self._twitch.settings.proxy: - proxy = self._twitch.settings.proxy - else: - proxy = None + proxy = self._twitch.settings.proxy or None for delay in backoff: try: async with session.ws_connect(ws_url, proxy=proxy) as websocket: