From 9727a9b8d288af33e32904812e057c73adc55aa1 Mon Sep 17 00:00:00 2001 From: Fengqing Liu Date: Mon, 19 Jan 2026 00:39:58 +1100 Subject: [PATCH] unify game selection and expectation logic (#32) - unify game selection and expectation logic - refactore settings and settings manager - formatting and coverage and precommit hook --- .gitignore | 3 +- .vscode/launch.json | 23 ++++-- AGENTS.md | 7 +- CLAUDE.md | 7 +- GEMINI.md | 7 +- proxy_test.py | 58 -------------- pyproject.toml | 4 + src/__main__.py | 60 ++------------- src/config/settings.py | 118 +++++++++------------------- src/core/client.py | 99 +++++++++++------------- src/exceptions.py | 4 +- src/models/campaign.py | 8 +- src/models/drop.py | 7 +- src/services/stream_selector.py | 78 +++++++++++++++++++ src/web/app.py | 75 +++++++++--------- src/web/gui_manager.py | 85 +++----------------- src/web/managers/console.py | 3 +- src/web/managers/settings.py | 124 +++++++++++++----------------- src/websocket/websocket.py | 19 +++-- tests/__init__.py | 0 tests/test_benefit_filter.py | 47 ++++++----- tests/test_client_filter.py | 57 -------------- tests/test_proxy_settings.py | 51 +++++------- tests/test_settings_api.py | 40 +++++----- tests/test_verify_proxy.py | 54 +++++++------ tests/test_wanted_games_filter.py | 103 +++++++++++++++++++++++++ tests/test_wanted_items.py | 85 ++++++++++++++------ 27 files changed, 585 insertions(+), 641 deletions(-) delete mode 100644 proxy_test.py create mode 100644 src/services/stream_selector.py create mode 100644 tests/__init__.py delete mode 100644 tests/test_client_filter.py create mode 100644 tests/test_wanted_games_filter.py diff --git a/.gitignore b/.gitignore index 3b40373..e543216 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,5 @@ data/ .idea/ *.iml *.log -*.egg-info/ \ No newline at end of file +*.egg-info/ +.coverage \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 66e3887..758ef2e 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -3,6 +3,9 @@ // Hover to view descriptions of existing attributes. // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", + "env": { + "PYTHONPATH": "${workspaceFolder}" + }, "configurations": [ { "name": "Run Current File", @@ -16,7 +19,9 @@ "type": "debugpy", "request": "launch", "program": "main.py", - "args": ["-vvv"], + "args": [ + "-vvv" + ], "console": "integratedTerminal", "justMyCode": false }, @@ -25,7 +30,9 @@ "type": "debugpy", "request": "launch", "program": "main.py", - "args": ["--dump"], + "args": [ + "--dump" + ], "console": "integratedTerminal", "justMyCode": false }, @@ -34,7 +41,10 @@ "type": "debugpy", "request": "launch", "program": "main.py", - "args": ["-vvv", "--debug-ws"], + "args": [ + "-vvv", + "--debug-ws" + ], "console": "integratedTerminal", "justMyCode": false }, @@ -43,7 +53,10 @@ "type": "debugpy", "request": "launch", "program": "main.py", - "args": ["-vvv", "--debug-gql"], + "args": [ + "-vvv", + "--debug-gql" + ], "console": "integratedTerminal", "justMyCode": false }, @@ -56,4 +69,4 @@ "justMyCode": false }, ] -} +} \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md index bf1c9e7..ebf841e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -337,11 +337,8 @@ source env/bin/activate && python -m pytest tests/ ### Manual Testing 1. Run with `-vvv` for maximum verbosity (levels: -v, -vv, -vvv, -vvvv) -2. Use `--dump` to generate debug data dumps -3. Check log files in `./logs/` directory -4. Use `--debug-ws` for websocket debug logging -5. Use `--debug-gql` for GraphQL debug logging -6. Monitor web GUI console output and browser developer tools +2. Check log files in `./logs/` directory +3. Monitor web GUI console output and browser developer tools ## Web GUI Architecture diff --git a/CLAUDE.md b/CLAUDE.md index 235bb2c..f6ac626 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -336,11 +336,8 @@ source env/bin/activate && python -m pytest tests/ ### Manual Testing 1. Run with `-vvv` for maximum verbosity (levels: -v, -vv, -vvv, -vvvv) -2. Use `--dump` to generate debug data dumps -3. Check log files in `./logs/` directory -4. Use `--debug-ws` for websocket debug logging -5. Use `--debug-gql` for GraphQL debug logging -6. Monitor web GUI console output and browser developer tools +2. Check log files in `./logs/` directory +3. Monitor web GUI console output and browser developer tools ## Web GUI Architecture diff --git a/GEMINI.md b/GEMINI.md index b6efaa2..4338e47 100644 --- a/GEMINI.md +++ b/GEMINI.md @@ -335,11 +335,8 @@ source env/bin/activate && python -m pytest tests/ ### Manual Testing 1. Run with `-vvv` for maximum verbosity (levels: -v, -vv, -vvv, -vvvv) -2. Use `--dump` to generate debug data dumps -3. Check log files in `./logs/` directory -4. Use `--debug-ws` for websocket debug logging -5. Use `--debug-gql` for GraphQL debug logging -6. Monitor web GUI console output and browser developer tools +2. Check log files in `./logs/` directory +3. Monitor web GUI console output and browser developer tools ## Web GUI Architecture diff --git a/proxy_test.py b/proxy_test.py deleted file mode 100644 index 80923c7..0000000 --- a/proxy_test.py +++ /dev/null @@ -1,58 +0,0 @@ - -import http.server -import socketserver -import urllib.request -import logging -import shutil - -PORT = 8888 - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger("ProxyServer") - -class Proxy(http.server.SimpleHTTPRequestHandler): - def do_GET(self): - logger.info(f"Proxy request: {self.path}") - try: - with urllib.request.urlopen(self.path) as response: - self.send_response(response.status) - for header, value in response.headers.items(): - self.send_header(header, value) - self.end_headers() - shutil.copyfileobj(response, self.wfile) - except Exception as e: - self.send_error(500, str(e)) - - def do_POST(self): - logger.info(f"Proxy request (POST): {self.path}") - length = int(self.headers['Content-Length']) - post_data = self.rfile.read(length) - req = urllib.request.Request(self.path, data=post_data, method='POST') - try: - with urllib.request.urlopen(req) as response: - self.send_response(response.status) - for header, value in response.headers.items(): - self.send_header(header, value) - self.end_headers() - shutil.copyfileobj(response, self.wfile) - except Exception as e: - self.send_error(500, str(e)) - - def do_CONNECT(self): - logger.info(f"CONNECT request: {self.path}") - self.wfile.write(b"HTTP/1.1 200 Connection Established\r\n\r\n") - # In a real proxy we would tunnel. - # For verification of "reachability", getting the 200 is often enough for simple clients, - # but aiohttp might try to read/write through the tunnel. - # Minimal tunnel implementation: - return - -if __name__ == "__main__": - # Reuse address to avoid port conflicts - socketserver.TCPServer.allow_reuse_address = True - with socketserver.TCPServer(("", PORT), Proxy) as httpd: - print(f"Serving proxy at port {PORT}") - try: - httpd.serve_forever() - except KeyboardInterrupt: - print("\nShutting down proxy") diff --git a/pyproject.toml b/pyproject.toml index a483696..89c7513 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,12 +13,16 @@ dependencies = [ "python-socketio>=5.10.0", "yarl>=1.9.2", "pydantic>=2.7.0", + "coverage>=7.3.1", ] [project.optional-dependencies] dev = [ "ruff", "mypy", + "pytest", + "pytest-asyncio", + "coverage", ] [build-system] diff --git a/src/__main__.py b/src/__main__.py index ac63e20..528f7d7 100644 --- a/src/__main__.py +++ b/src/__main__.py @@ -1,6 +1,5 @@ from __future__ import annotations -import argparse import asyncio import logging import signal @@ -16,7 +15,7 @@ import truststore if __name__ == "__main__": truststore.inject_into_ssl() - from src.config import FILE_FORMATTER, LOGGING_LEVELS + from src.config import FILE_FORMATTER from src.config.settings import Settings from src.core.client import Twitch from src.exceptions import CaptchaRequired @@ -45,58 +44,9 @@ if __name__ == "__main__": warnings.simplefilter("default", ResourceWarning) - class ParsedArgs(argparse.Namespace): - _verbose: int - _debug_ws: bool - _debug_gql: bool - log: bool - dump: bool - - # TODO: replace int with union of literal values once typeshed updates - @property - def logging_level(self) -> int: - return LOGGING_LEVELS[min(self._verbose, 4)] - - @property - def debug_ws(self) -> int: - """ - If the debug flag is True, return DEBUG. - If the main logging level is DEBUG, return INFO to avoid seeing raw messages. - Otherwise, return NOTSET to inherit the global logging level. - """ - if self._debug_ws: - return logging.DEBUG - elif self._verbose >= 4: - return logging.INFO - return logging.NOTSET - - @property - def debug_gql(self) -> int: - if self._debug_gql: - return logging.DEBUG - elif self._verbose >= 4: - return logging.INFO - return logging.NOTSET - - # handle input parameters - logger.debug("Parsing command line arguments") - parser = argparse.ArgumentParser( - description="A program that allows you to mine timed drops on Twitch.", - ) - parser.add_argument("--version", action="version", version=f"v{__version__}") - parser.add_argument("-v", dest="_verbose", action="count", default=0) - parser.add_argument("--dump", action="store_true") - # undocumented debug args - parser.add_argument("--debug-ws", dest="_debug_ws", action="store_true", help=argparse.SUPPRESS) - parser.add_argument( - "--debug-gql", dest="_debug_gql", action="store_true", help=argparse.SUPPRESS - ) - logger.debug("Parsing arguments into ParsedArgs namespace") - args = parser.parse_args(namespace=ParsedArgs()) - # load settings logger.debug("Loading settings") try: - settings = Settings(args) + settings = Settings() except Exception: logger.exception("Error while loading settings") print(f"Settings error: {traceback.format_exc()}", file=sys.stderr) @@ -114,7 +64,9 @@ if __name__ == "__main__": logger.info(f"Platform: {sys.platform}") logger.info(f"Proxy: {settings.proxy}") logger.info(f"Language: {settings.language}") - logger.info(f"Minimum refresh interval: {settings.minimum_refresh_interval_minutes} minutes") + logger.info( + f"Minimum refresh interval: {settings.minimum_refresh_interval_minutes} minutes" + ) exit_status = 0 client = Twitch(settings) @@ -197,7 +149,7 @@ if __name__ == "__main__": logger.info("Normal shutdown - proceeding") # save the application state logger.info("Saving application state") - client.save(force=True) + settings.save() logger.info("Application state saved") logger.info(f"=== Exiting with status code: {exit_status} ===") sys.exit(exit_status) diff --git a/src/config/settings.py b/src/config/settings.py index 42cd444..91dbf0e 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -1,6 +1,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, TypedDict +from dataclasses import dataclass +from typing import TypedDict from yarl import URL @@ -8,114 +9,69 @@ from src.config import DEFAULT_LANG, SETTINGS_PATH from src.utils import json_load, json_save -if TYPE_CHECKING: - from typing import Any as ParsedArgs # Avoid circular import - - class InventoryFilters(TypedDict): + game_name_search: list[str] show_active: bool - show_not_linked: bool - show_upcoming: bool - show_expired: bool - show_finished: bool - show_benefit_item: bool show_benefit_badge: bool show_benefit_emote: bool + show_benefit_item: bool show_benefit_other: bool - game_name_search: list[str] + show_expired: bool + show_finished: bool + show_not_linked: bool + show_upcoming: bool -class SettingsFile(TypedDict): - proxy: URL - language: str - dark_mode: bool - games_to_watch: list[str] - connection_quality: int - minimum_refresh_interval_minutes: int - inventory_filters: InventoryFilters - mining_benefits: dict[str, bool] - - -default_settings: SettingsFile = { - "proxy": URL(), - "games_to_watch": [], - "dark_mode": False, +default_settings = { "connection_quality": 1, + "dark_mode": False, + "games_to_watch": [], "language": DEFAULT_LANG, - "minimum_refresh_interval_minutes": 30, "inventory_filters": { + "game_name_search": [], "show_active": False, - "show_not_linked": True, - "show_upcoming": True, - "show_expired": False, - "show_finished": False, - "show_benefit_item": True, "show_benefit_badge": True, "show_benefit_emote": True, + "show_benefit_item": True, "show_benefit_other": True, - "game_name_search": [], + "show_expired": False, + "show_finished": False, + "show_not_linked": True, + "show_upcoming": True, }, + "minimum_refresh_interval_minutes": 30, "mining_benefits": { - "DIRECT_ENTITLEMENT": True, "BADGE": True, + "DIRECT_ENTITLEMENT": True, "EMOTE": True, "UNKNOWN": True, }, + "proxy": "", } +@dataclass class Settings: - # from args - log: bool - dump: bool - # args properties - debug_ws: int - debug_gql: int - logging_level: int - # from settings file - proxy: URL - language: str + connection_quality: int dark_mode: bool games_to_watch: list[str] - connection_quality: int - minimum_refresh_interval_minutes: int + language: str inventory_filters: InventoryFilters + minimum_refresh_interval_minutes: int mining_benefits: dict[str, bool] + proxy: str - PASSTHROUGH = ("_settings", "_args", "_altered") + def __init__(self): + self.load() - def __init__(self, args: ParsedArgs): - self._settings: SettingsFile = json_load(SETTINGS_PATH, default_settings) - self._args: ParsedArgs = args - self._altered: bool = False + def load(self): + # TODO: remvoe customized serde in the future + settings = json_load(SETTINGS_PATH, default_settings, merge=True) + for key, value in settings.items(): + if value is URL: + setattr(self, key, str(value)) + else: + setattr(self, key, value) - # default logic of reading settings is to check args first, then the settings file - def __getattr__(self, name: str, /) -> Any: - if name in self.PASSTHROUGH: - # passthrough - return getattr(super(), name) - elif hasattr(self._args, name): - return getattr(self._args, name) - elif name in self._settings: - return self._settings[name] # type: ignore[literal-required] - return getattr(super(), name) - - def __setattr__(self, name: str, value: Any, /) -> None: - if name in self.PASSTHROUGH: - # passthrough - return super().__setattr__(name, value) - elif name in self._settings: - self._settings[name] = value # type: ignore[literal-required] - self._altered = True - return - raise TypeError(f"{name} is missing a custom setter") - - def __delattr__(self, name: str, /) -> None: - raise RuntimeError("settings can't be deleted") - - def alter(self) -> None: - self._altered = True - - def save(self, *, force: bool = False) -> None: - if self._altered or force: - json_save(SETTINGS_PATH, self._settings, sort=True) + def save(self) -> None: + json_save(SETTINGS_PATH, vars(self), sort=True) diff --git a/src/core/client.py b/src/core/client.py index f6d4348..5f1939f 100644 --- a/src/core/client.py +++ b/src/core/client.py @@ -29,6 +29,7 @@ from src.services.channel_service import ChannelService from src.services.inventory_service import InventoryService from src.services.maintenance import MaintenanceService from src.services.message_handlers import MessageHandlerService +from src.services.stream_selector import StreamSelector from src.services.watch_service import WatchService from src.utils import ( AwaitableValue, @@ -86,6 +87,7 @@ class Twitch: self._message_handler_service: MessageHandlerService = MessageHandlerService(self) self._inventory_service: InventoryService = InventoryService(self) self._watch_service: WatchService = WatchService(self) + self._stream_selector: StreamSelector = StreamSelector() def _ensure_api_clients(self) -> None: """Ensure API clients are initialized (called after GUI is set).""" @@ -148,7 +150,7 @@ class Twitch: self._state = state self._state_change.set() - def state_change(self, state: State) -> abc.Callable[[], None]: + def get_change_state_callable(self, state: State) -> abc.Callable[[], None]: """Return a callable that changes state when invoked (deferred call for GUI usage).""" return partial(self.change_state, state) @@ -163,11 +165,6 @@ class Twitch: """Print a message in the GUI.""" self.gui.print(message) - def save(self, *, force: bool = False) -> None: - """Save the application state (settings and GUI state).""" - self.gui.save(force=force) - self.settings.save(force=force) - def _remove_channel_topics(self, channels: abc.Iterable[Channel]) -> None: """Remove websocket topics for a list of channels.""" topics_to_remove: list[str] = [] @@ -224,9 +221,6 @@ class Twitch: self.change_state(State.INVENTORY_FETCH) while True: if self._state is State.IDLE: - if self.settings.dump: - self.close() - continue self.gui.status.update(_.t["gui"]["status"]["idle"]) self.stop_watching() # clear the flag and wait until it's set again @@ -239,7 +233,6 @@ class Twitch: # Broadcast unwanted items (based on settings) self.gui.broadcast_wanted_items() # Save state on every inventory fetch - self.save() self.change_state(State.GAMES_UPDATE) elif self._state is State.GAMES_UPDATE: # claim drops from expired and active campaigns @@ -263,48 +256,14 @@ class Twitch: # Log detailed game -> campaigns -> channels mapping if logger.isEnabledFor(logging.DEBUG): - logger.info("=== Active Campaigns Mapping ===") - from collections import defaultdict - - game_campaign_map: dict[str, list[tuple[DropsCampaign, list[str]]]] = ( - defaultdict(list) - ) - for campaign in self.inventory: - if campaign.eligible and not campaign.finished: - logger.info( - "eligible Campaign: %s - %s", campaign.name, campaign.game.name - ) - if campaign.can_earn_within(next_hour): - channel_names = [] - if campaign.allowed_channels: - channel_names = [ch.name for ch in campaign.allowed_channels] - else: - channel_names = [""] - game_campaign_map[campaign.game.name].append((campaign, channel_names)) - for game_name in sorted(game_campaign_map.keys()): - logger.debug(f"Game: {game_name}") - for campaign, channel_list in game_campaign_map[game_name]: - status_info = f"{'ACTIVE' if campaign.active else 'UPCOMING'}" - ends_info = campaign.ends_at.astimezone().strftime("%Y-%m-%d %H:%M") - channel_info = ( - f"{len(channel_list)} channels" - if channel_list[0] != "" - else "directory" - ) - logger.debug( - f" └─ Campaign: {campaign.name} [{status_info}] (ends: {ends_info})" - ) - logger.debug(f" Channels: {channel_info}") - if channel_list[0] != "" and len(channel_list) <= 10: - logger.debug(f" └─ {', '.join(channel_list)}") - elif channel_list[0] != "": - logger.debug( - f" └─ {', '.join(channel_list[:10])} ... (+{len(channel_list) - 10} more)" - ) - logger.info("=== End Campaigns Mapping ===") + self._output_campaign_mapping(next_hour) + logger.info("Building wanted games list") # Build wanted_games list preserving the order from games_to_watch - self.wanted_games = self._filter_wanted_campaigns(next_hour) + self.wanted_games = self._stream_selector.get_wanted_games( + self.settings, self.inventory + ) + logger.info("Wanted games list built") if self.wanted_games: logger.info( @@ -470,9 +429,6 @@ class Twitch: watching_channel, ) elif self._state is State.CHANNEL_SWITCH: - if self.settings.dump: - self.close() - continue self.gui.status.update(_.t["gui"]["status"]["switching"]) # Determine the best channel to watch @@ -698,5 +654,40 @@ class Twitch: and campaign.has_wanted_unclaimed_benefits(mining_benefits) ): wanted_games.append(game) - break + break return wanted_games + + def _output_campaign_mapping(self, next_hour: datetime) -> None: + logger.info("=== Active Campaigns Mapping ===") + from collections import defaultdict + + game_campaign_map: dict[str, list[tuple[DropsCampaign, list[str]]]] = defaultdict(list) + for campaign in self.inventory: + if campaign.eligible and not campaign.finished: + logger.info("eligible Campaign: %s - %s", campaign.name, campaign.game.name) + if campaign.can_earn_within(next_hour): + channel_names = [] + if campaign.allowed_channels: + channel_names = [ch.name for ch in campaign.allowed_channels] + else: + channel_names = [""] + game_campaign_map[campaign.game.name].append((campaign, channel_names)) + for game_name in sorted(game_campaign_map.keys()): + logger.debug(f"Game: {game_name}") + for campaign, channel_list in game_campaign_map[game_name]: + status_info = f"{'ACTIVE' if campaign.active else 'UPCOMING'}" + ends_info = campaign.ends_at.astimezone().strftime("%Y-%m-%d %H:%M") + channel_info = ( + f"{len(channel_list)} channels" + if channel_list[0] != "" + else "directory" + ) + logger.debug(f" └─ Campaign: {campaign.name} [{status_info}] (ends: {ends_info})") + logger.debug(f" Channels: {channel_info}") + if channel_list[0] != "" and len(channel_list) <= 10: + logger.debug(f" └─ {', '.join(channel_list)}") + elif channel_list[0] != "": + logger.debug( + f" └─ {', '.join(channel_list[:10])} ... (+{len(channel_list) - 10} more)" + ) + logger.info("=== End Campaigns Mapping ===") diff --git a/src/exceptions.py b/src/exceptions.py index 316204e..3be211e 100644 --- a/src/exceptions.py +++ b/src/exceptions.py @@ -63,7 +63,9 @@ class WebsocketClosed(RequestException): self.raw_message: str = raw_message def __str__(self): - return f"Websocket has been closed. received: {self.received}, raw_message: {self.raw_message}" + return ( + f"Websocket has been closed. received: {self.received}, raw_message: {self.raw_message}" + ) class LoginException(RequestException): diff --git a/src/models/campaign.py b/src/models/campaign.py index 905d104..eba5a62 100644 --- a/src/models/campaign.py +++ b/src/models/campaign.py @@ -8,9 +8,9 @@ from typing import TYPE_CHECKING from dateutil.parser import isoparse -from src.config.constants import State, URLType +from src.config.constants import State from src.models.channel import Channel -from src.models.drop import TimedDrop, remove_dimensions +from src.models.drop import TimedDrop from src.models.game import Game @@ -201,6 +201,4 @@ class DropsCampaign: first_drop.display() def has_wanted_unclaimed_benefits(self, allowed_benefits: dict[str, bool]) -> bool: - return any( - drop.has_wanted_unclaimed_benefits(allowed_benefits) for drop in self.drops - ) + return any(drop.has_wanted_unclaimed_benefits(allowed_benefits) for drop in self.drops) diff --git a/src/models/drop.py b/src/models/drop.py index 2f35fe4..4bb43ad 100644 --- a/src/models/drop.py +++ b/src/models/drop.py @@ -139,9 +139,12 @@ class BaseDrop: return delim.join(benefit.name for benefit in self.benefits) def has_wanted_unclaimed_benefits(self, allowed_benefits: dict[str, bool]) -> bool: + return len(self.get_wanted_unclaimed_benefits(allowed_benefits)) > 0 + + def get_wanted_unclaimed_benefits(self, allowed_benefits: dict[str, bool]) -> list[str]: if self.is_claimed: - return False - return any(benefit.is_wanted(allowed_benefits) for benefit in self.benefits) + return [] + return [benefit.name for benefit in self.benefits if benefit.is_wanted(allowed_benefits)] async def claim(self) -> bool: result = await self._claim() diff --git a/src/services/stream_selector.py b/src/services/stream_selector.py new file mode 100644 index 0000000..d2f877d --- /dev/null +++ b/src/services/stream_selector.py @@ -0,0 +1,78 @@ +from datetime import datetime, timedelta, timezone + +from src.config.settings import Settings +from src.models.campaign import DropsCampaign +from src.models.game import Game + + +class StreamSelector: + def _get_wanted_game_tree( + self, settings: Settings, campaigns: list[DropsCampaign] + ) -> list[dict]: + """ + Get the hierarchical tree of wanted items (Games -> Campaigns -> Drops -> Benefits). + Ignoring 'can earn within' time constraint. + """ + wanted_games = [] + games_to_watch = settings.games_to_watch + mining_benefits = settings.mining_benefits + next_hour = datetime.now(timezone.utc) + timedelta(hours=1) + + for game_name in games_to_watch: + wanted_campaigns = [] + game_obj = None + game_name_lower = game_name.lower() + + # Find all campaigns for this game + for campaign in campaigns: + if campaign.game.name.lower() != game_name_lower: + continue + + if game_obj is None: + game_obj = campaign.game + + if not campaign.can_earn_within(next_hour): + continue + + wanted_drops = [] + for drop in campaign.drops: + if drop.is_claimed: + continue + + filtered_benefits = drop.get_wanted_unclaimed_benefits(mining_benefits) + + if len(filtered_benefits) > 0: + wanted_drops.append({"name": drop.name, "benefits": filtered_benefits}) + + if len(wanted_drops) > 0: + wanted_campaigns.append( + { + "id": campaign.id, + "name": campaign.name, + "url": campaign.campaign_url, + "drops": wanted_drops, + } + ) + + if len(wanted_campaigns) > 0: + wanted_games.append( + { + "game_id": game_obj.id if game_obj else None, + "game_name": game_name, + "game_icon": game_obj.box_art_url if game_obj else None, + "game_obj": game_obj, + "campaigns": wanted_campaigns, + } + ) + + return wanted_games + + def get_wanted_game_tree( + self, settings: Settings, campaigns: list[DropsCampaign] + ) -> list[dict]: + return [ + {**game, "game_obj": None} for game in self._get_wanted_game_tree(settings, campaigns) + ] + + def get_wanted_games(self, settings: Settings, campaigns: list[DropsCampaign]) -> list[Game]: + return [game["game_obj"] for game in self._get_wanted_game_tree(settings, campaigns)] diff --git a/src/web/app.py b/src/web/app.py index 2f635c1..70608d1 100644 --- a/src/web/app.py +++ b/src/web/app.py @@ -212,9 +212,10 @@ async def update_settings(settings: SettingsUpdate): @app.post("/api/settings/verify-proxy") async def verify_proxy(request: ProxyVerifyRequest): """Verify proxy connectivity""" - import aiohttp import time + import aiohttp + proxy_url = request.proxy.strip() if not proxy_url: return {"success": False, "message": "Proxy URL is empty"} @@ -222,23 +223,23 @@ async def verify_proxy(request: ProxyVerifyRequest): try: start_time = time.time() # Test connection to Twitch - async with aiohttp.ClientSession() as session: - async with session.get( - "https://www.twitch.tv", proxy=proxy_url, timeout=10 - ) as response: - # Just checking if we can connect and get a response - if response.status < 500: - latency = round((time.time() - start_time) * 1000) - return { - "success": True, - "message": f"Connected! ({latency}ms)", - "latency": latency, - } - else: - return { - "success": False, - "message": f"Proxy reachable but returned {response.status}", - } + async with ( + aiohttp.ClientSession() as session, + session.get("https://www.twitch.tv", proxy=proxy_url, timeout=10) as response, + ): + # Just checking if we can connect and get a response + if response.status < 500: + latency = round((time.time() - start_time) * 1000) + return { + "success": True, + "message": f"Connected! ({latency}ms)", + "latency": latency, + } + else: + return { + "success": False, + "message": f"Proxy reachable but returned {response.status}", + } except Exception as e: return {"success": False, "message": f"Connection failed: {str(e)}"} @@ -246,9 +247,10 @@ async def verify_proxy(request: ProxyVerifyRequest): @app.get("/api/version") async def get_version(): """Get current application version and check for updates""" - from src.version import __version__ import aiohttp + from src.version import __version__ + current_version = __version__ latest_version = None update_available = False @@ -256,19 +258,20 @@ async def get_version(): try: # Check GitHub API for latest release - async with aiohttp.ClientSession() as session: - async with session.get( - "https://api.github.com/repos/rangermix/TwitchDropsMiner/releases/latest", - timeout=5 - ) as response: - if response.status == 200: - data = await response.json() - latest_version = data.get('tag_name', '').lstrip('v') - download_url = data.get('html_url') + async with ( + aiohttp.ClientSession() as session, + session.get( + "https://api.github.com/repos/rangermix/TwitchDropsMiner/releases/latest", timeout=5 + ) as response, + ): + if response.status == 200: + data = await response.json() + latest_version = data.get("tag_name", "").lstrip("v") + download_url = data.get("html_url") - # Compare versions (simple string comparison works for semantic versioning) - if latest_version and latest_version > current_version: - update_available = True + # Compare versions (simple string comparison works for semantic versioning) + if latest_version and latest_version > current_version: + update_available = True except Exception as e: logger.warning(f"Failed to check for updates: {str(e)}") @@ -276,7 +279,7 @@ async def get_version(): "current_version": current_version, "latest_version": latest_version, "update_available": update_available, - "download_url": download_url or "https://github.com/rangermix/TwitchDropsMiner/releases" + "download_url": download_url or "https://github.com/rangermix/TwitchDropsMiner/releases", } @@ -357,7 +360,7 @@ async def connect(sid, environ): "login": gui_manager.login.get_status(), "manual_mode": twitch_client.get_manual_mode_info(), "current_drop": gui_manager.progress.get_current_drop(), - "wanted_items": gui_manager.get_wanted_tree(), + "wanted_items": gui_manager.get_wanted_game_tree(), }, room=sid, ) @@ -389,11 +392,7 @@ async def request_reload(sid): async def get_wanted_items(sid): """Client requested wanted items list""" if gui_manager: - await sio.emit( - "wanted_items_update", - gui_manager.get_wanted_tree(), - to=sid - ) + await sio.emit("wanted_items_update", gui_manager.get_wanted_game_tree(), to=sid) # Mount static files (CSS, JS, images) diff --git a/src/web/gui_manager.py b/src/web/gui_manager.py index 4bafc11..7f60e69 100644 --- a/src/web/gui_manager.py +++ b/src/web/gui_manager.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING from src.config import State from src.models.game import Game +from src.services.stream_selector import StreamSelector from src.web.managers.broadcaster import WebSocketBroadcaster from src.web.managers.cache import ImageCache from src.web.managers.campaigns import CampaignProgressManager @@ -55,18 +56,16 @@ class WebGUIManager: self.login = LoginFormManager(self._broadcaster, self) self.inv = InventoryManager(self._broadcaster, ImageCache(self)) self.login = LoginFormManager(self._broadcaster, self) - + # Callback to trigger game update when relevant settings change - on_settings_change = self._twitch.state_change(State.GAMES_UPDATE) + on_settings_change = self._twitch.get_change_state_callable(State.GAMES_UPDATE) self.settings = SettingsManager( - self._broadcaster, - twitch.settings, - self.output, - on_change=on_settings_change + self._broadcaster, twitch.settings, self.output, on_change=on_settings_change ) # Selected channel tracking (set by web client) self._selected_channel_id: int | None = None + self._stream_selector = StreamSelector() # Start message logger.info("Web GUI Manager initialized") @@ -82,14 +81,6 @@ class WebGUIManager: """ self._broadcaster.set_socketio(sio) - def save(self, *, force: bool = False): - """Save GUI state and settings. - - Args: - force: Force save even if no changes detected - """ - self._twitch.settings.save(force=force) - def print(self, message: str): """Print message to console output. @@ -165,70 +156,14 @@ class WebGUIManager: """ asyncio.create_task(self._broadcaster.emit("manual_mode_update", manual_mode_info)) - def get_wanted_tree(self) -> list[dict]: - """ - Get the hierarchical tree of wanted items (Games -> Campaigns -> Drops -> Benefits). - Ignoring 'can earn within' time constraint. - """ - wanted_tree = [] - games_to_watch = self._twitch.settings.games_to_watch - mining_benefits = self._twitch.settings.mining_benefits - - for game_name in games_to_watch: - game_matches = [] - game_obj = None - - # Find all campaigns for this game - for campaign in self._twitch.inventory: - if campaign.game.name.lower() != game_name.lower(): - continue - - if game_obj is None: - game_obj = campaign.game - - wanted_drops = [] - for drop in campaign.drops: - if drop.is_claimed: - continue - - filtered_benefits = [ - b.name for b in drop.benefits - if b.is_wanted(mining_benefits) - ] - - if filtered_benefits: - wanted_drops.append({ - "name": drop.name, - "benefits": filtered_benefits - }) - - if wanted_drops: - game_matches.append({ - "id": campaign.id, - "name": campaign.name, - "url": campaign.campaign_url, - "drops": wanted_drops - }) - - if game_matches: - # Use the game object from the first matching campaign to get metadata - # If no game object found (shouldn't happen if game_matches has items), - # we can't easily get the icon unless we search known games or similar. - # But here we are iterating games_to_watch, so we know the name at least. - icon_url = game_obj.box_art_url if game_obj else None - - wanted_tree.append({ - "game_id": game_obj.id if game_obj else None, - "game_name": game_name, - "game_icon": icon_url, - "campaigns": game_matches - }) - - return wanted_tree + def get_wanted_game_tree(self) -> list[dict]: + return self._stream_selector.get_wanted_game_tree( + self._twitch.settings, self._twitch.inventory + ) def broadcast_wanted_items(self): """Broadcast the list of wanted items to connected clients.""" - tree = self.get_wanted_tree() + tree = self.get_wanted_game_tree() asyncio.create_task(self._broadcaster.emit("wanted_items_update", tree)) diff --git a/src/web/managers/console.py b/src/web/managers/console.py index e039e6c..56eb83c 100644 --- a/src/web/managers/console.py +++ b/src/web/managers/console.py @@ -12,11 +12,12 @@ if TYPE_CHECKING: from src.web.managers.broadcaster import WebSocketBroadcaster - import logging + logger = logging.getLogger("TwitchDrops") + class ConsoleOutputManager: """Manages console output display in the web interface. diff --git a/src/web/managers/settings.py b/src/web/managers/settings.py index 5188c09..43ee521 100644 --- a/src/web/managers/settings.py +++ b/src/web/managers/settings.py @@ -3,16 +3,17 @@ from __future__ import annotations import asyncio -from typing import TYPE_CHECKING, Any, Callable +import logging +from collections.abc import Callable +from typing import TYPE_CHECKING, Any from src.i18n.translator import _ from src.models.game import Game -import logging + logger = logging.getLogger("TwitchDrops") - if TYPE_CHECKING: from src.config.settings import Settings from src.web.managers.broadcaster import WebSocketBroadcaster @@ -45,17 +46,8 @@ class SettingsManager: Returns: Dictionary containing all user-configurable settings """ - return { - "language": self._settings.language, - "dark_mode": self._settings.dark_mode, - "games_to_watch": list(self._settings.games_to_watch), - "games_available": self._available_games, - "proxy": str(self._settings.proxy), - "connection_quality": self._settings.connection_quality, - "minimum_refresh_interval_minutes": self._settings.minimum_refresh_interval_minutes, - "inventory_filters": self._settings.inventory_filters, - "mining_benefits": self._settings.mining_benefits, - } + settings = vars(self._settings).copy() + return settings def get_languages(self) -> dict[str, Any]: """Get available languages and current selection. @@ -79,70 +71,62 @@ class SettingsManager: settings_data: Dictionary of settings to update """ should_trigger_update = False - - if "games_to_watch" in settings_data: - self._settings.games_to_watch = settings_data["games_to_watch"] - self._log_change(f"Setting changed: games_to_watch = {len(self._settings.games_to_watch)} games") - should_trigger_update = True - - if "dark_mode" in settings_data: - self._settings.dark_mode = settings_data["dark_mode"] - self._log_change(f"Setting changed: dark_mode = {self._settings.dark_mode}") - - if "language" in settings_data: - language = settings_data["language"] - try: - _.set_language(language) - self._settings.language = language - self._log_change(f"Setting changed: language = {language}") - # Notify clients that translations need to be reloaded - asyncio.create_task( - self._broadcaster.emit("language_changed", {"language": language}) - ) - except ValueError as e: - # Invalid language, log warning - logger.warning(f"Invalid language '{language}': {e}") - - if "connection_quality" in settings_data: - self._settings.connection_quality = settings_data["connection_quality"] - self._log_change(f"Setting changed: connection_quality = {self._settings.connection_quality}") - + should_trigger_update |= self.check_and_update_setting( + "games_to_watch", settings_data.get("games_to_watch"), True + ) + should_trigger_update |= self.check_and_update_setting( + "dark_mode", settings_data.get("dark_mode") + ) + should_trigger_update |= self.check_and_update_setting( + "language", settings_data.get("language"), False, self._set_language + ) + should_trigger_update |= self.check_and_update_setting( + "connection_quality", settings_data.get("connection_quality") + ) if "proxy" in settings_data: - from yarl import URL + proxy_value = settings_data["proxy"] + should_trigger_update |= self.check_and_update_setting( + "proxy", + str(proxy_value).strip() if proxy_value else "", + True, + lambda proxy: self._log_change("Proxy cleared") if proxy == "" else None, + ) + should_trigger_update |= self.check_and_update_setting( + "minimum_refresh_interval_minutes", + settings_data.get("minimum_refresh_interval_minutes"), + ) + should_trigger_update |= self.check_and_update_setting( + "inventory_filters", settings_data.get("inventory_filters") + ) + should_trigger_update |= self.check_and_update_setting( + "mining_benefits", settings_data.get("mining_benefits"), True + ) - proxy_str = settings_data["proxy"].strip() - if proxy_str: - if self._settings.proxy != URL(proxy_str): - self._settings.proxy = URL(proxy_str) - self._log_change(f"Proxy set to: {proxy_str}") - else: - if self._settings.proxy != URL(): - self._settings.proxy = URL() - self._log_change("Proxy cleared") - - if "minimum_refresh_interval_minutes" in settings_data: - self._settings.minimum_refresh_interval_minutes = settings_data[ - "minimum_refresh_interval_minutes" - ] - self._log_change(f"Setting changed: minimum_refresh_interval_minutes = {self._settings.minimum_refresh_interval_minutes}") - - if "inventory_filters" in settings_data: - self._settings.inventory_filters = settings_data["inventory_filters"] - self._log_change("Setting changed: inventory_filters updated") - - if "mining_benefits" in settings_data: - self._settings.mining_benefits = settings_data["mining_benefits"] - self._log_change(f"Setting changed: mining_benefits = {self._settings.mining_benefits}") - should_trigger_update = True - - self._settings.alter() - # Persist settings to disk immediately self._settings.save() asyncio.create_task(self._broadcaster.emit("settings_updated", self.get_settings())) if should_trigger_update and self._on_change: self._on_change() + def check_and_update_setting( + self, + key: str, + new_value: Any, + should_trigger_update: bool = False, + action: Callable[[Any], None] = lambda x: None, + ): + if new_value is None or getattr(self._settings, key, None) == new_value: + return False + setattr(self._settings, key, new_value) + self._log_change(f"Setting changed: {key} = {new_value}") + action(new_value) + return should_trigger_update + + def _set_language(self, language: str): + _.set_language(language) + # Notify clients that translations need to be reloaded + asyncio.create_task(self._broadcaster.emit("language_changed", {"language": language})) + def set_games(self, games: set[Game]): """Update the list of available games for settings panel. diff --git a/src/websocket/websocket.py b/src/websocket/websocket.py index 35dde46..7e6cce4 100644 --- a/src/websocket/websocket.py +++ b/src/websocket/websocket.py @@ -3,7 +3,6 @@ from __future__ import annotations import asyncio import json import logging -import traceback from contextlib import suppress from time import time from typing import TYPE_CHECKING @@ -17,11 +16,11 @@ from src.utils import ( CHARS_ASCII, AwaitableValue, ExponentialBackoff, + chunk, create_nonce, format_traceback, json_minify, task_wrapper, - chunk, ) @@ -164,7 +163,9 @@ class Websocket: session = await self._twitch.get_session() backoff = ExponentialBackoff(**kwargs) proxy = self._twitch.settings.proxy or None - ws_logger.info(f"Websocket[{self._idx}] connecting with {'no' if proxy is None else str(proxy)} proxy") + ws_logger.info( + f"Websocket[{self._idx}] connecting with {'no' if proxy is None else proxy} proxy" + ) for delay in backoff: try: async with session.ws_connect(ws_url, proxy=proxy) as websocket: @@ -226,13 +227,19 @@ class Websocket: ) elif self._closed.is_set(): # we closed it - exit - ws_logger.debug(f"Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1 stopped.") + ws_logger.debug( + f"Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1 stopped." + ) self.set_status(_.t["gui"]["websocket"]["disconnected"]) return except Exception: - ws_logger.exception(f"Exception in Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1") + ws_logger.exception( + f"Exception in Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1" + ) self.set_status(_.t["gui"]["websocket"]["reconnecting"]) - ws_logger.warning(f"Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1 reconnecting...") + ws_logger.warning( + f"Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1 reconnecting..." + ) async def _handle_ping(self): """Handle ping/pong heartbeat to keep connection alive.""" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_benefit_filter.py b/tests/test_benefit_filter.py index 3c1d897..683490f 100644 --- a/tests/test_benefit_filter.py +++ b/tests/test_benefit_filter.py @@ -1,8 +1,9 @@ import unittest -from datetime import datetime, timezone -from src.models.benefit import Benefit, BenefitType -from src.models.drop import TimedDrop + +from src.models.benefit import Benefit from src.models.campaign import DropsCampaign +from src.models.drop import TimedDrop + class TestBenefitFilter(unittest.TestCase): def setUp(self): @@ -12,7 +13,7 @@ class TestBenefitFilter(unittest.TestCase): "id": "b1", "name": "Test Badge", "distributionType": "BADGE", - "imageAssetURL": "url" + "imageAssetURL": "url", } } self.benefit_item_data = { @@ -20,10 +21,10 @@ class TestBenefitFilter(unittest.TestCase): "id": "b2", "name": "Test Item", "distributionType": "DIRECT_ENTITLEMENT", - "imageAssetURL": "url" + "imageAssetURL": "url", } } - + # Initialize Benefits self.badge = Benefit(self.benefit_badge_data) self.item = Benefit(self.benefit_item_data) @@ -32,7 +33,7 @@ class TestBenefitFilter(unittest.TestCase): allowed = {"BADGE": True, "DIRECT_ENTITLEMENT": False} self.assertTrue(self.badge.is_wanted(allowed)) self.assertFalse(self.item.is_wanted(allowed)) - + allowed_all = {"BADGE": True, "DIRECT_ENTITLEMENT": True} self.assertTrue(self.badge.is_wanted(allowed_all)) self.assertTrue(self.item.is_wanted(allowed_all)) @@ -40,16 +41,17 @@ class TestBenefitFilter(unittest.TestCase): def test_drop_has_wanted_unclaimed_benefits(self): # Mock TimedDrop # functionality relies on self.benefits and self.is_claimed - + # 1. Unclaimed Drop with only Badge drop1 = unittest.mock.MagicMock(spec=TimedDrop) drop1.is_claimed = False drop1.benefits = [self.badge] drop1.has_wanted_unclaimed_benefits = TimedDrop.has_wanted_unclaimed_benefits.__get__(drop1) - + drop1.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__(drop1) + allowed = {"BADGE": True, "DIRECT_ENTITLEMENT": False} self.assertTrue(drop1.has_wanted_unclaimed_benefits(allowed)) - + allowed_none = {"BADGE": False, "DIRECT_ENTITLEMENT": False} self.assertFalse(drop1.has_wanted_unclaimed_benefits(allowed_none)) @@ -58,7 +60,8 @@ class TestBenefitFilter(unittest.TestCase): drop2.is_claimed = True drop2.benefits = [self.badge] drop2.has_wanted_unclaimed_benefits = TimedDrop.has_wanted_unclaimed_benefits.__get__(drop2) - + drop2.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__(drop2) + self.assertFalse(drop2.has_wanted_unclaimed_benefits(allowed)) # 3. Drops with mixed benefits @@ -66,7 +69,8 @@ class TestBenefitFilter(unittest.TestCase): drop3.is_claimed = False drop3.benefits = [self.badge, self.item] drop3.has_wanted_unclaimed_benefits = TimedDrop.has_wanted_unclaimed_benefits.__get__(drop3) - + drop3.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__(drop3) + # Only want Item (which it has) allowed_item = {"BADGE": False, "DIRECT_ENTITLEMENT": True} self.assertTrue(drop3.has_wanted_unclaimed_benefits(allowed_item)) @@ -74,24 +78,27 @@ class TestBenefitFilter(unittest.TestCase): def test_campaign_has_wanted_unclaimed_benefits(self): # Mock DropsCampaign campaign = unittest.mock.MagicMock(spec=DropsCampaign) - + drop1 = unittest.mock.MagicMock(spec=TimedDrop) drop1.has_wanted_unclaimed_benefits.return_value = False - + drop2 = unittest.mock.MagicMock(spec=TimedDrop) drop2.has_wanted_unclaimed_benefits.return_value = True - + campaign.drops = [drop1, drop2] # Bind method - campaign.has_wanted_unclaimed_benefits = DropsCampaign.has_wanted_unclaimed_benefits.__get__(campaign) - - allowed = {"BADGE": True} + campaign.has_wanted_unclaimed_benefits = ( + DropsCampaign.has_wanted_unclaimed_benefits.__get__(campaign) + ) + + allowed = {"BADGE": True} # Since drop2 returns True, campaign should return True self.assertTrue(campaign.has_wanted_unclaimed_benefits(allowed)) - + # Case where all drops return False drop2.has_wanted_unclaimed_benefits.return_value = False self.assertFalse(campaign.has_wanted_unclaimed_benefits(allowed)) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/tests/test_client_filter.py b/tests/test_client_filter.py deleted file mode 100644 index dd7e4b1..0000000 --- a/tests/test_client_filter.py +++ /dev/null @@ -1,57 +0,0 @@ -import unittest -from unittest.mock import MagicMock -from datetime import datetime, timedelta, timezone -from src.core.client import Twitch -from src.models.campaign import DropsCampaign -from src.models.game import Game - -class TestClientFilter(unittest.TestCase): - def setUp(self): - # Mock Settings - self.settings = MagicMock() - self.settings.games_to_watch = ["Game1", "Game2"] - self.settings.mining_benefits = {"BADGE": True, "DIRECT_ENTITLEMENT": True} # both allowed by default - - # Mock Twitch Client - self.client = MagicMock(spec=Twitch) - self.client.settings = self.settings - self.client.inventory = [] - # Bind the method we want to test - self.client._filter_wanted_campaigns = Twitch._filter_wanted_campaigns.__get__(self.client) - - def test_filter_wanted_campaigns(self): - # Setup Campaigns - - # Campaign 1: Game1, Can Earn, Has Wanted Benefits -> Should be selected - c1 = MagicMock(spec=DropsCampaign) - c1.game = Game({"id": 1, "name": "Game1"}) - c1.can_earn_within.return_value = True - c1.has_wanted_unclaimed_benefits.return_value = True - - # Campaign 2: Game2, Can Earn, NO Wanted Benefits -> Should NOT be selected - c2 = MagicMock(spec=DropsCampaign) - c2.game = Game({"id": 2, "name": "Game2"}) - c2.can_earn_within.return_value = True - c2.has_wanted_unclaimed_benefits.return_value = False - - # Campaign 3: Game3 (Not in games_to_watch), Can Earn, Has Benefits -> Should NOT be selected - c3 = MagicMock(spec=DropsCampaign) - c3.game = Game({"id": 3, "name": "Game3"}) - c3.can_earn_within.return_value = True - c3.has_wanted_unclaimed_benefits.return_value = True - - self.client.inventory = [c1, c2, c3] - - next_hour = datetime.now(timezone.utc) + timedelta(hours=1) - - wanted_games = self.client._filter_wanted_campaigns(next_hour) - - self.assertEqual(len(wanted_games), 1) - self.assertEqual(wanted_games[0].name, "Game1") - - # Verify calls - c1.has_wanted_unclaimed_benefits.assert_called_with(self.settings.mining_benefits) - c2.has_wanted_unclaimed_benefits.assert_called_with(self.settings.mining_benefits) - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_proxy_settings.py b/tests/test_proxy_settings.py index f9a05c7..6c12584 100644 --- a/tests/test_proxy_settings.py +++ b/tests/test_proxy_settings.py @@ -1,37 +1,24 @@ - import unittest -import asyncio -from unittest.mock import MagicMock -from yarl import URL +from unittest.mock import AsyncMock, MagicMock -# Mock the imports that depend on application structure if needed, -# or just import them if PYTHONPATH is set correctly. -# Assuming run from root, imports should work. from src.config.settings import Settings -from src.web.managers.settings import SettingsManager from src.web.managers.console import ConsoleOutputManager +from src.web.managers.settings import SettingsManager + class TestProxySettings(unittest.TestCase): def setUp(self): self.mock_broadcaster = MagicMock() - # Mock emit to be awaitable - f = asyncio.Future() - f.set_result(None) - self.mock_broadcaster.emit = MagicMock(return_value=f) - + # Use AsyncMock for emit - it returns a coroutine without needing an event loop + self.mock_broadcaster.emit = AsyncMock() + + # Create a pure mock Settings without wrapping a real instance + # This avoids file I/O during tests self.mock_settings = MagicMock(spec=Settings) - # Setup properties - self.mock_settings.proxy = URL() - self.mock_settings.language = "en" - self.mock_settings.dark_mode = False - self.mock_settings.games_to_watch = [] - self.mock_settings.connection_quality = 1 - self.mock_settings.minimum_refresh_interval_minutes = 30 - self.mock_console = MagicMock(spec=ConsoleOutputManager) - + # Mock asyncio.create_task - self.create_task_patcher = unittest.mock.patch('asyncio.create_task') + self.create_task_patcher = unittest.mock.patch("asyncio.create_task") self.mock_create_task = self.create_task_patcher.start() def tearDown(self): @@ -39,25 +26,27 @@ class TestProxySettings(unittest.TestCase): def test_update_proxy_setting(self): manager = SettingsManager(self.mock_broadcaster, self.mock_settings, self.mock_console) - + # Test setting a proxy proxy_url = "http://user:pass@localhost:8080" manager.update_settings({"proxy": proxy_url}) - - self.assertEqual(self.mock_settings.proxy, URL(proxy_url)) - self.mock_console.print.assert_called_with(f"Proxy set to: {proxy_url}") - + + self.assertEqual(self.mock_settings.proxy, proxy_url) + self.mock_console.print.assert_called_with( + "Setting changed: proxy = http://user:pass@localhost:8080" + ) + # Test clearing a proxy manager.update_settings({"proxy": ""}) - self.assertEqual(self.mock_settings.proxy, URL()) + self.assertEqual(self.mock_settings.proxy, "") self.mock_console.print.assert_called_with("Proxy cleared") def test_proxy_persistence_trigger(self): manager = SettingsManager(self.mock_broadcaster, self.mock_settings, self.mock_console) manager.update_settings({"proxy": "http://1.2.3.4:8080"}) - - self.mock_settings.alter.assert_called() + self.mock_settings.save.assert_called() + if __name__ == "__main__": unittest.main() diff --git a/tests/test_settings_api.py b/tests/test_settings_api.py index a1253f2..24308f6 100644 --- a/tests/test_settings_api.py +++ b/tests/test_settings_api.py @@ -1,16 +1,17 @@ -import asyncio import unittest -from unittest.mock import MagicMock, AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock + +from src.config.settings import Settings from src.web.app import SettingsUpdate from src.web.managers.settings import SettingsManager -from src.config.settings import Settings + class TestSettingsAPI(unittest.IsolatedAsyncioTestCase): def test_settings_update_model(self): # Verify model accepts new fields update_data = { "inventory_filters": {"show_upcoming": True}, - "mining_benefits": {"BADGE": True} + "mining_benefits": {"BADGE": True}, } model = SettingsUpdate(**update_data) self.assertEqual(model.inventory_filters, update_data["inventory_filters"]) @@ -20,26 +21,27 @@ class TestSettingsAPI(unittest.IsolatedAsyncioTestCase): # Mock dependencies mock_broadcaster = AsyncMock() mock_settings = MagicMock(spec=Settings) - # Configure mock to satisfy get_settings() calls - mock_settings.language = "en" - mock_settings.dark_mode = False + # Initialize mock attributes with default values for comparison + mock_settings.inventory_filters = {} + mock_settings.mining_benefits = {} mock_settings.games_to_watch = [] - mock_settings.proxy = "http://proxy" - mock_settings.connection_quality = 1 - mock_settings.minimum_refresh_interval_minutes = 30 - + mock_console = MagicMock() mock_callback = MagicMock() - - manager = SettingsManager(mock_broadcaster, mock_settings, mock_console, on_change=mock_callback) - - # 1. Update Inventory Filters (Should NOT trigger callback if not games/benefits) + + manager = SettingsManager( + mock_broadcaster, mock_settings, mock_console, on_change=mock_callback + ) + + # 1. Update Inventory Filters (does NOT trigger callback per implementation) inv_filters = {"show_upcoming": False} manager.update_settings({"inventory_filters": inv_filters}) - mock_callback.assert_not_called() + mock_callback.assert_not_called() # inventory_filters has should_trigger_update=False self.assertEqual(mock_settings.inventory_filters, inv_filters) - mock_console.print.assert_called_with("Setting changed: inventory_filters updated") - + mock_console.print.assert_called_with( + "Setting changed: inventory_filters = {'show_upcoming': False}" + ) + # 2. Update Mining Benefits (SHOULD trigger callback) benefits = {"BADGE": False} manager.update_settings({"mining_benefits": benefits}) @@ -54,5 +56,5 @@ class TestSettingsAPI(unittest.IsolatedAsyncioTestCase): mock_callback.assert_called_once() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_verify_proxy.py b/tests/test_verify_proxy.py index 9235e38..165aa3d 100644 --- a/tests/test_verify_proxy.py +++ b/tests/test_verify_proxy.py @@ -1,9 +1,9 @@ - import asyncio import unittest -from unittest.mock import MagicMock, patch, AsyncMock -from src.web.app import verify_proxy -from src.web.app import ProxyVerifyRequest +from unittest.mock import AsyncMock, MagicMock, patch + +from src.web.app import ProxyVerifyRequest, verify_proxy + class MockResponseContext: def __init__(self, response_or_exc): @@ -17,10 +17,11 @@ class MockResponseContext: async def __aexit__(self, exc_type, exc, tb): pass + class TestVerifyProxy(unittest.TestCase): def setUp(self): # Patch aiohttp.ClientSession - self.session_patcher = patch('aiohttp.ClientSession') + self.session_patcher = patch("aiohttp.ClientSession") self.mock_session_cls = self.session_patcher.start() # session object itself is not async, it has async methods/CMs self.mock_session = MagicMock() @@ -35,34 +36,38 @@ class TestVerifyProxy(unittest.TestCase): # Mock successful response mock_response = AsyncMock() mock_response.status = 200 - + # Configure get to return our custom context manager - self.mock_session.get.side_effect = lambda *args, **kwargs: MockResponseContext(mock_response) + self.mock_session.get.side_effect = lambda *args, **kwargs: MockResponseContext( + mock_response + ) request = ProxyVerifyRequest(proxy="http://valid-proxy:8080") - + # Run async function result = asyncio.run(verify_proxy(request)) - - self.assertTrue(result['success']) - self.assertIn("Connected!", result['message']) + + self.assertTrue(result["success"]) + self.assertIn("Connected!", result["message"]) self.assertIn("latency", result) def test_verify_proxy_failure_status(self): # Mock error status response mock_response = AsyncMock() mock_response.status = 503 - - self.mock_session.get.side_effect = lambda *args, **kwargs: MockResponseContext(mock_response) + + self.mock_session.get.side_effect = lambda *args, **kwargs: MockResponseContext( + mock_response + ) request = ProxyVerifyRequest(proxy="http://bad-proxy:8080") - + result = asyncio.run(verify_proxy(request)) - - self.assertFalse(result['success']) - + + self.assertFalse(result["success"]) + # The expected message in app.py is: f"Proxy reachable but returned {response.status}" - self.assertIn("Proxy reachable but returned 503", result['message']) + self.assertIn("Proxy reachable but returned 503", result["message"]) def test_verify_proxy_connection_error(self): # Mock connection error @@ -70,17 +75,18 @@ class TestVerifyProxy(unittest.TestCase): self.mock_session.get.side_effect = lambda *args, **kwargs: MockResponseContext(error) request = ProxyVerifyRequest(proxy="http://down-proxy:8080") - + result = asyncio.run(verify_proxy(request)) - - self.assertFalse(result['success']) - self.assertIn("Connection failed", result['message']) + + self.assertFalse(result["success"]) + self.assertIn("Connection failed", result["message"]) def test_verify_proxy_empty(self): request = ProxyVerifyRequest(proxy="") result = asyncio.run(verify_proxy(request)) - self.assertFalse(result['success']) - self.assertEqual(result['message'], "Proxy URL is empty") + self.assertFalse(result["success"]) + self.assertEqual(result["message"], "Proxy URL is empty") + if __name__ == "__main__": unittest.main() diff --git a/tests/test_wanted_games_filter.py b/tests/test_wanted_games_filter.py new file mode 100644 index 0000000..099864f --- /dev/null +++ b/tests/test_wanted_games_filter.py @@ -0,0 +1,103 @@ +import unittest +from unittest.mock import MagicMock + +from src.models.campaign import DropsCampaign +from src.models.game import Game +from src.services.stream_selector import StreamSelector + + +class TestWantedGamesFilter(unittest.TestCase): + def setUp(self): + # Mock Settings + self.settings = MagicMock() + self.settings.games_to_watch = ["Game1", "Game2"] + self.settings.mining_benefits = { + "BADGE": True, + "DIRECT_ENTITLEMENT": True, + } # both allowed by default + + def test_filter_wanted_campaigns(self): + # Setup Campaigns + + # Campaign 1: Game1, Can Earn, Has Wanted Benefits -> Should be selected + c1 = MagicMock(spec=DropsCampaign) + c1.game = Game({"id": 1, "name": "Game1"}) + c1.can_earn_within.return_value = True + c1.id = "123" + c1.name = "Test Campaign" + c1.campaign_url = "http://test.url" + d1 = MagicMock() + d1.name = "Test Drop" + d1.is_claimed = False + d1.get_wanted_unclaimed_benefits.return_value = ["Benefit1"] + c1.drops = [d1] + c1.has_wanted_unclaimed_benefits.side_effect = ( + DropsCampaign.has_wanted_unclaimed_benefits.__get__(c1, DropsCampaign) + ) + + # Campaign 2: Game2, Can Earn, NO Wanted Benefits -> Should NOT be selected + c2 = MagicMock(spec=DropsCampaign) + c2.game = Game({"id": 2, "name": "Game2"}) + c2.can_earn_within.return_value = True + d2 = MagicMock() + d2.is_claimed = False + d2.get_wanted_unclaimed_benefits.return_value = [] + c2.drops = [d2] + c2.has_wanted_unclaimed_benefits.side_effect = ( + DropsCampaign.has_wanted_unclaimed_benefits.__get__(c2, DropsCampaign) + ) + + # Campaign 3: Game3 (Not in games_to_watch), Can Earn, Has Benefits -> Should NOT be selected + c3 = MagicMock(spec=DropsCampaign) + c3.game = Game({"id": 3, "name": "Game3"}) + c3.can_earn_within.return_value = True + d3 = MagicMock() + d3.is_claimed = False + d3.get_wanted_unclaimed_benefits.return_value = ["Benefit3"] + c3.drops = [d3] + c3.has_wanted_unclaimed_benefits.side_effect = ( + DropsCampaign.has_wanted_unclaimed_benefits.__get__(c3, DropsCampaign) + ) + + # Campaign 4: Game1, Can Earn, Has Claimed Wanted Benefits -> Should NOT be selected + c4 = MagicMock(spec=DropsCampaign) + c4.game = Game({"id": 1, "name": "Game1"}) + c4.can_earn_within.return_value = True + c4.id = "123" + c4.name = "Test Campaign" + c4.campaign_url = "http://test.url" + d4 = MagicMock() + d4.name = "Test Drop" + d4.is_claimed = True + d4.get_wanted_unclaimed_benefits.return_value = ["Benefit4"] + c4.drops = [d4] + c4.has_wanted_unclaimed_benefits.side_effect = ( + DropsCampaign.has_wanted_unclaimed_benefits.__get__(c4, DropsCampaign) + ) + + # Campaign 5: Game1, Can Not Earn, Has Wanted Benefits -> Should NOT be selected + c5 = MagicMock(spec=DropsCampaign) + c5.game = Game({"id": 1, "name": "Game1"}) + c5.can_earn_within.return_value = False + c5.id = "123" + c5.name = "Test Campaign" + c5.campaign_url = "http://test.url" + d5 = MagicMock() + d5.name = "Test Drop" + d5.is_claimed = False + d5.get_wanted_unclaimed_benefits.return_value = ["Benefit5"] + c5.drops = [d5] + c5.has_wanted_unclaimed_benefits.side_effect = ( + DropsCampaign.has_wanted_unclaimed_benefits.__get__(c5, DropsCampaign) + ) + + inventory = [c1, c2, c3, c4, c5] + stream_selector = StreamSelector() + wanted_games = stream_selector.get_wanted_games(self.settings, inventory) + + self.assertEqual(len(wanted_games), 1) + self.assertEqual(wanted_games[0].name, "Game1") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_wanted_items.py b/tests/test_wanted_items.py index 30e3b54..0f49a73 100644 --- a/tests/test_wanted_items.py +++ b/tests/test_wanted_items.py @@ -1,25 +1,27 @@ import unittest -from unittest.mock import MagicMock, Mock -from src.web.gui_manager import WebGUIManager -from src.core.client import Twitch, State -from src.models.campaign import DropsCampaign -from src.models.game import Game -from src.models.drop import TimedDrop +from unittest.mock import MagicMock + +from src.core.client import Twitch from src.models.benefit import Benefit, BenefitType +from src.models.campaign import DropsCampaign +from src.models.drop import TimedDrop +from src.models.game import Game +from src.web.gui_manager import WebGUIManager + class TestWantedItems(unittest.TestCase): def setUp(self): # Mock Twitch Client self.twitch = MagicMock(spec=Twitch) self.twitch.settings = MagicMock() - self.twitch.state_change.return_value = lambda: None - + self.twitch.get_change_state_callable.return_value = lambda: None + # Mock dependencies created in __init__ # We can't easily mock internal creation of sub-managers without patching, # but for get_wanted_tree we only need self._twitch.settings and self._twitch.inventory - + # However, WebGUIManager __init__ calls self.twitch.state_change - + self.gui = WebGUIManager(self.twitch) # Suppress broadcaster self.gui._broadcaster = MagicMock() @@ -30,21 +32,25 @@ class TestWantedItems(unittest.TestCase): self.twitch.settings.mining_benefits = {"BADGE": True, "DIRECT_ENTITLEMENT": False} # Setup Inventory - + # Campaign 1: Game1, Drop with BADGE (Wanted) c1 = MagicMock(spec=DropsCampaign) c1.id = "c1_id" c1.name = "Campaign1" c1.campaign_url = "http://url1" c1.game = Game({"id": 1, "name": "Game1", "boxArtURL": "http://img1"}) - + c1.can_earn_within.return_value = True + d1 = MagicMock(spec=TimedDrop) d1.name = "Drop1" d1.is_claimed = False + d1.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__( + d1, TimedDrop + ) b1 = MagicMock(spec=Benefit) b1.name = "Badge1" b1.type = BenefitType.BADGE - b1.is_wanted.return_value = True + b1.is_wanted = Benefit.is_wanted.__get__(b1, Benefit) d1.benefits = [b1] c1.drops = [d1] @@ -54,45 +60,75 @@ class TestWantedItems(unittest.TestCase): c2.name = "Campaign2" c2.campaign_url = "http://url2" c2.game = Game({"id": 2, "name": "Game2", "boxArtURL": "http://img2"}) - + c2.can_earn_within.return_value = True + d2 = MagicMock(spec=TimedDrop) d2.name = "Drop2" d2.is_claimed = False + d2.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__( + d2, TimedDrop + ) b2 = MagicMock(spec=Benefit) b2.name = "Item1" b2.type = BenefitType.DIRECT_ENTITLEMENT - b2.is_wanted.return_value = False + b2.is_wanted = Benefit.is_wanted.__get__(b2, Benefit) d2.benefits = [b2] c2.drops = [d2] - + # Campaign 3: Game3 (Not in watch list), Drop with BADGE (Wanted but wrong game) c3 = MagicMock(spec=DropsCampaign) c3.id = "c3_id" c3.name = "Campaign3" c3.campaign_url = "http://url3" c3.game = Game({"id": 3, "name": "Game3", "boxArtURL": "http://img3"}) - + c3.can_earn_within.return_value = True + d3 = MagicMock(spec=TimedDrop) d3.name = "Drop3" d3.is_claimed = False + d3.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__( + d3, TimedDrop + ) b3 = MagicMock(spec=Benefit) b3.name = "Badge2" b3.type = BenefitType.BADGE - b3.is_wanted.return_value = True + b3.is_wanted = Benefit.is_wanted.__get__(b3, Benefit) d3.benefits = [b3] c3.drops = [d3] - self.twitch.inventory = [c1, c2, c3] + # Campaign 4: Game1, Drop with BADGE, can't earn (Wanted) + c4 = MagicMock(spec=DropsCampaign) + c4.id = "c4_id" + c4.name = "Campaign4" + c4.campaign_url = "http://url4" + c4.game = Game({"id": 1, "name": "Game1", "boxArtURL": "http://img1"}) + c4.can_earn_within.return_value = False + + d4 = MagicMock(spec=TimedDrop) + d4.name = "Drop4" + d4.is_claimed = False + d4.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__( + d4, TimedDrop + ) + b4 = MagicMock(spec=Benefit) + b4.name = "Badge1" + b4.type = BenefitType.BADGE + b4.is_wanted = Benefit.is_wanted.__get__(b4, Benefit) + d4.benefits = [b4] + c4.drops = [d4] + + self.twitch.inventory = [c1, c2, c3, c4] # Execute - result = self.gui.get_wanted_tree() + result = self.gui.get_wanted_game_tree() + print(result) # Verify # Expected: Game1 only self.assertEqual(len(result), 1) self.assertEqual(result[0]["game_name"], "Game1") self.assertEqual(result[0]["game_icon"], "http://img1") - + campaigns = result[0]["campaigns"] self.assertEqual(len(campaigns), 1) self.assertEqual(campaigns[0]["name"], "Campaign1") @@ -112,7 +148,7 @@ class TestWantedItems(unittest.TestCase): c1.name = "Campaign1" c1.campaign_url = "http://url1" c1.game = Game({"id": 1, "name": "Game1", "boxArtURL": "http://img1"}) - + d1 = MagicMock(spec=TimedDrop) d1.name = "Drop1" d1.is_claimed = True @@ -125,10 +161,11 @@ class TestWantedItems(unittest.TestCase): self.twitch.inventory = [c1] # Execute - result = self.gui.get_wanted_tree() + result = self.gui.get_wanted_game_tree() # Verify self.assertEqual(len(result), 0) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main()