unify game selection and expectation logic (#32)

- unify game selection and expectation logic
- refactore settings and settings manager
- formatting and coverage and precommit hook
This commit is contained in:
Fengqing Liu
2026-01-19 00:39:58 +11:00
committed by GitHub
parent 8bf69abfda
commit 9727a9b8d2
27 changed files with 585 additions and 641 deletions

1
.gitignore vendored
View File

@@ -26,3 +26,4 @@ data/
*.iml
*.log
*.egg-info/
.coverage

21
.vscode/launch.json vendored
View File

@@ -3,6 +3,9 @@
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"env": {
"PYTHONPATH": "${workspaceFolder}"
},
"configurations": [
{
"name": "Run Current File",
@@ -16,7 +19,9 @@
"type": "debugpy",
"request": "launch",
"program": "main.py",
"args": ["-vvv"],
"args": [
"-vvv"
],
"console": "integratedTerminal",
"justMyCode": false
},
@@ -25,7 +30,9 @@
"type": "debugpy",
"request": "launch",
"program": "main.py",
"args": ["--dump"],
"args": [
"--dump"
],
"console": "integratedTerminal",
"justMyCode": false
},
@@ -34,7 +41,10 @@
"type": "debugpy",
"request": "launch",
"program": "main.py",
"args": ["-vvv", "--debug-ws"],
"args": [
"-vvv",
"--debug-ws"
],
"console": "integratedTerminal",
"justMyCode": false
},
@@ -43,7 +53,10 @@
"type": "debugpy",
"request": "launch",
"program": "main.py",
"args": ["-vvv", "--debug-gql"],
"args": [
"-vvv",
"--debug-gql"
],
"console": "integratedTerminal",
"justMyCode": false
},

View File

@@ -337,11 +337,8 @@ source env/bin/activate && python -m pytest tests/
### Manual Testing
1. Run with `-vvv` for maximum verbosity (levels: -v, -vv, -vvv, -vvvv)
2. Use `--dump` to generate debug data dumps
3. Check log files in `./logs/` directory
4. Use `--debug-ws` for websocket debug logging
5. Use `--debug-gql` for GraphQL debug logging
6. Monitor web GUI console output and browser developer tools
2. Check log files in `./logs/` directory
3. Monitor web GUI console output and browser developer tools
## Web GUI Architecture

View File

@@ -336,11 +336,8 @@ source env/bin/activate && python -m pytest tests/
### Manual Testing
1. Run with `-vvv` for maximum verbosity (levels: -v, -vv, -vvv, -vvvv)
2. Use `--dump` to generate debug data dumps
3. Check log files in `./logs/` directory
4. Use `--debug-ws` for websocket debug logging
5. Use `--debug-gql` for GraphQL debug logging
6. Monitor web GUI console output and browser developer tools
2. Check log files in `./logs/` directory
3. Monitor web GUI console output and browser developer tools
## Web GUI Architecture

View File

@@ -335,11 +335,8 @@ source env/bin/activate && python -m pytest tests/
### Manual Testing
1. Run with `-vvv` for maximum verbosity (levels: -v, -vv, -vvv, -vvvv)
2. Use `--dump` to generate debug data dumps
3. Check log files in `./logs/` directory
4. Use `--debug-ws` for websocket debug logging
5. Use `--debug-gql` for GraphQL debug logging
6. Monitor web GUI console output and browser developer tools
2. Check log files in `./logs/` directory
3. Monitor web GUI console output and browser developer tools
## Web GUI Architecture

View File

@@ -1,58 +0,0 @@
import http.server
import socketserver
import urllib.request
import logging
import shutil
PORT = 8888
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ProxyServer")
class Proxy(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
logger.info(f"Proxy request: {self.path}")
try:
with urllib.request.urlopen(self.path) as response:
self.send_response(response.status)
for header, value in response.headers.items():
self.send_header(header, value)
self.end_headers()
shutil.copyfileobj(response, self.wfile)
except Exception as e:
self.send_error(500, str(e))
def do_POST(self):
logger.info(f"Proxy request (POST): {self.path}")
length = int(self.headers['Content-Length'])
post_data = self.rfile.read(length)
req = urllib.request.Request(self.path, data=post_data, method='POST')
try:
with urllib.request.urlopen(req) as response:
self.send_response(response.status)
for header, value in response.headers.items():
self.send_header(header, value)
self.end_headers()
shutil.copyfileobj(response, self.wfile)
except Exception as e:
self.send_error(500, str(e))
def do_CONNECT(self):
logger.info(f"CONNECT request: {self.path}")
self.wfile.write(b"HTTP/1.1 200 Connection Established\r\n\r\n")
# In a real proxy we would tunnel.
# For verification of "reachability", getting the 200 is often enough for simple clients,
# but aiohttp might try to read/write through the tunnel.
# Minimal tunnel implementation:
return
if __name__ == "__main__":
# Reuse address to avoid port conflicts
socketserver.TCPServer.allow_reuse_address = True
with socketserver.TCPServer(("", PORT), Proxy) as httpd:
print(f"Serving proxy at port {PORT}")
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\nShutting down proxy")

View File

@@ -13,12 +13,16 @@ dependencies = [
"python-socketio>=5.10.0",
"yarl>=1.9.2",
"pydantic>=2.7.0",
"coverage>=7.3.1",
]
[project.optional-dependencies]
dev = [
"ruff",
"mypy",
"pytest",
"pytest-asyncio",
"coverage",
]
[build-system]

View File

@@ -1,6 +1,5 @@
from __future__ import annotations
import argparse
import asyncio
import logging
import signal
@@ -16,7 +15,7 @@ import truststore
if __name__ == "__main__":
truststore.inject_into_ssl()
from src.config import FILE_FORMATTER, LOGGING_LEVELS
from src.config import FILE_FORMATTER
from src.config.settings import Settings
from src.core.client import Twitch
from src.exceptions import CaptchaRequired
@@ -45,58 +44,9 @@ if __name__ == "__main__":
warnings.simplefilter("default", ResourceWarning)
class ParsedArgs(argparse.Namespace):
_verbose: int
_debug_ws: bool
_debug_gql: bool
log: bool
dump: bool
# TODO: replace int with union of literal values once typeshed updates
@property
def logging_level(self) -> int:
return LOGGING_LEVELS[min(self._verbose, 4)]
@property
def debug_ws(self) -> int:
"""
If the debug flag is True, return DEBUG.
If the main logging level is DEBUG, return INFO to avoid seeing raw messages.
Otherwise, return NOTSET to inherit the global logging level.
"""
if self._debug_ws:
return logging.DEBUG
elif self._verbose >= 4:
return logging.INFO
return logging.NOTSET
@property
def debug_gql(self) -> int:
if self._debug_gql:
return logging.DEBUG
elif self._verbose >= 4:
return logging.INFO
return logging.NOTSET
# handle input parameters
logger.debug("Parsing command line arguments")
parser = argparse.ArgumentParser(
description="A program that allows you to mine timed drops on Twitch.",
)
parser.add_argument("--version", action="version", version=f"v{__version__}")
parser.add_argument("-v", dest="_verbose", action="count", default=0)
parser.add_argument("--dump", action="store_true")
# undocumented debug args
parser.add_argument("--debug-ws", dest="_debug_ws", action="store_true", help=argparse.SUPPRESS)
parser.add_argument(
"--debug-gql", dest="_debug_gql", action="store_true", help=argparse.SUPPRESS
)
logger.debug("Parsing arguments into ParsedArgs namespace")
args = parser.parse_args(namespace=ParsedArgs())
# load settings
logger.debug("Loading settings")
try:
settings = Settings(args)
settings = Settings()
except Exception:
logger.exception("Error while loading settings")
print(f"Settings error: {traceback.format_exc()}", file=sys.stderr)
@@ -114,7 +64,9 @@ if __name__ == "__main__":
logger.info(f"Platform: {sys.platform}")
logger.info(f"Proxy: {settings.proxy}")
logger.info(f"Language: {settings.language}")
logger.info(f"Minimum refresh interval: {settings.minimum_refresh_interval_minutes} minutes")
logger.info(
f"Minimum refresh interval: {settings.minimum_refresh_interval_minutes} minutes"
)
exit_status = 0
client = Twitch(settings)
@@ -197,7 +149,7 @@ if __name__ == "__main__":
logger.info("Normal shutdown - proceeding")
# save the application state
logger.info("Saving application state")
client.save(force=True)
settings.save()
logger.info("Application state saved")
logger.info(f"=== Exiting with status code: {exit_status} ===")
sys.exit(exit_status)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any, TypedDict
from dataclasses import dataclass
from typing import TypedDict
from yarl import URL
@@ -8,114 +9,69 @@ from src.config import DEFAULT_LANG, SETTINGS_PATH
from src.utils import json_load, json_save
if TYPE_CHECKING:
from typing import Any as ParsedArgs # Avoid circular import
class InventoryFilters(TypedDict):
game_name_search: list[str]
show_active: bool
show_not_linked: bool
show_upcoming: bool
show_expired: bool
show_finished: bool
show_benefit_item: bool
show_benefit_badge: bool
show_benefit_emote: bool
show_benefit_item: bool
show_benefit_other: bool
game_name_search: list[str]
show_expired: bool
show_finished: bool
show_not_linked: bool
show_upcoming: bool
class SettingsFile(TypedDict):
proxy: URL
language: str
dark_mode: bool
games_to_watch: list[str]
connection_quality: int
minimum_refresh_interval_minutes: int
inventory_filters: InventoryFilters
mining_benefits: dict[str, bool]
default_settings: SettingsFile = {
"proxy": URL(),
"games_to_watch": [],
"dark_mode": False,
default_settings = {
"connection_quality": 1,
"dark_mode": False,
"games_to_watch": [],
"language": DEFAULT_LANG,
"minimum_refresh_interval_minutes": 30,
"inventory_filters": {
"game_name_search": [],
"show_active": False,
"show_not_linked": True,
"show_upcoming": True,
"show_expired": False,
"show_finished": False,
"show_benefit_item": True,
"show_benefit_badge": True,
"show_benefit_emote": True,
"show_benefit_item": True,
"show_benefit_other": True,
"game_name_search": [],
"show_expired": False,
"show_finished": False,
"show_not_linked": True,
"show_upcoming": True,
},
"minimum_refresh_interval_minutes": 30,
"mining_benefits": {
"DIRECT_ENTITLEMENT": True,
"BADGE": True,
"DIRECT_ENTITLEMENT": True,
"EMOTE": True,
"UNKNOWN": True,
},
"proxy": "",
}
@dataclass
class Settings:
# from args
log: bool
dump: bool
# args properties
debug_ws: int
debug_gql: int
logging_level: int
# from settings file
proxy: URL
language: str
connection_quality: int
dark_mode: bool
games_to_watch: list[str]
connection_quality: int
minimum_refresh_interval_minutes: int
language: str
inventory_filters: InventoryFilters
minimum_refresh_interval_minutes: int
mining_benefits: dict[str, bool]
proxy: str
PASSTHROUGH = ("_settings", "_args", "_altered")
def __init__(self):
self.load()
def __init__(self, args: ParsedArgs):
self._settings: SettingsFile = json_load(SETTINGS_PATH, default_settings)
self._args: ParsedArgs = args
self._altered: bool = False
def load(self):
# TODO: remvoe customized serde in the future
settings = json_load(SETTINGS_PATH, default_settings, merge=True)
for key, value in settings.items():
if value is URL:
setattr(self, key, str(value))
else:
setattr(self, key, value)
# default logic of reading settings is to check args first, then the settings file
def __getattr__(self, name: str, /) -> Any:
if name in self.PASSTHROUGH:
# passthrough
return getattr(super(), name)
elif hasattr(self._args, name):
return getattr(self._args, name)
elif name in self._settings:
return self._settings[name] # type: ignore[literal-required]
return getattr(super(), name)
def __setattr__(self, name: str, value: Any, /) -> None:
if name in self.PASSTHROUGH:
# passthrough
return super().__setattr__(name, value)
elif name in self._settings:
self._settings[name] = value # type: ignore[literal-required]
self._altered = True
return
raise TypeError(f"{name} is missing a custom setter")
def __delattr__(self, name: str, /) -> None:
raise RuntimeError("settings can't be deleted")
def alter(self) -> None:
self._altered = True
def save(self, *, force: bool = False) -> None:
if self._altered or force:
json_save(SETTINGS_PATH, self._settings, sort=True)
def save(self) -> None:
json_save(SETTINGS_PATH, vars(self), sort=True)

View File

@@ -29,6 +29,7 @@ from src.services.channel_service import ChannelService
from src.services.inventory_service import InventoryService
from src.services.maintenance import MaintenanceService
from src.services.message_handlers import MessageHandlerService
from src.services.stream_selector import StreamSelector
from src.services.watch_service import WatchService
from src.utils import (
AwaitableValue,
@@ -86,6 +87,7 @@ class Twitch:
self._message_handler_service: MessageHandlerService = MessageHandlerService(self)
self._inventory_service: InventoryService = InventoryService(self)
self._watch_service: WatchService = WatchService(self)
self._stream_selector: StreamSelector = StreamSelector()
def _ensure_api_clients(self) -> None:
"""Ensure API clients are initialized (called after GUI is set)."""
@@ -148,7 +150,7 @@ class Twitch:
self._state = state
self._state_change.set()
def state_change(self, state: State) -> abc.Callable[[], None]:
def get_change_state_callable(self, state: State) -> abc.Callable[[], None]:
"""Return a callable that changes state when invoked (deferred call for GUI usage)."""
return partial(self.change_state, state)
@@ -163,11 +165,6 @@ class Twitch:
"""Print a message in the GUI."""
self.gui.print(message)
def save(self, *, force: bool = False) -> None:
"""Save the application state (settings and GUI state)."""
self.gui.save(force=force)
self.settings.save(force=force)
def _remove_channel_topics(self, channels: abc.Iterable[Channel]) -> None:
"""Remove websocket topics for a list of channels."""
topics_to_remove: list[str] = []
@@ -224,9 +221,6 @@ class Twitch:
self.change_state(State.INVENTORY_FETCH)
while True:
if self._state is State.IDLE:
if self.settings.dump:
self.close()
continue
self.gui.status.update(_.t["gui"]["status"]["idle"])
self.stop_watching()
# clear the flag and wait until it's set again
@@ -239,7 +233,6 @@ class Twitch:
# Broadcast unwanted items (based on settings)
self.gui.broadcast_wanted_items()
# Save state on every inventory fetch
self.save()
self.change_state(State.GAMES_UPDATE)
elif self._state is State.GAMES_UPDATE:
# claim drops from expired and active campaigns
@@ -263,48 +256,14 @@ class Twitch:
# Log detailed game -> campaigns -> channels mapping
if logger.isEnabledFor(logging.DEBUG):
logger.info("=== Active Campaigns Mapping ===")
from collections import defaultdict
game_campaign_map: dict[str, list[tuple[DropsCampaign, list[str]]]] = (
defaultdict(list)
)
for campaign in self.inventory:
if campaign.eligible and not campaign.finished:
logger.info(
"eligible Campaign: %s - %s", campaign.name, campaign.game.name
)
if campaign.can_earn_within(next_hour):
channel_names = []
if campaign.allowed_channels:
channel_names = [ch.name for ch in campaign.allowed_channels]
else:
channel_names = ["<directory>"]
game_campaign_map[campaign.game.name].append((campaign, channel_names))
for game_name in sorted(game_campaign_map.keys()):
logger.debug(f"Game: {game_name}")
for campaign, channel_list in game_campaign_map[game_name]:
status_info = f"{'ACTIVE' if campaign.active else 'UPCOMING'}"
ends_info = campaign.ends_at.astimezone().strftime("%Y-%m-%d %H:%M")
channel_info = (
f"{len(channel_list)} channels"
if channel_list[0] != "<directory>"
else "directory"
)
logger.debug(
f" └─ Campaign: {campaign.name} [{status_info}] (ends: {ends_info})"
)
logger.debug(f" Channels: {channel_info}")
if channel_list[0] != "<directory>" and len(channel_list) <= 10:
logger.debug(f" └─ {', '.join(channel_list)}")
elif channel_list[0] != "<directory>":
logger.debug(
f" └─ {', '.join(channel_list[:10])} ... (+{len(channel_list) - 10} more)"
)
logger.info("=== End Campaigns Mapping ===")
self._output_campaign_mapping(next_hour)
logger.info("Building wanted games list")
# Build wanted_games list preserving the order from games_to_watch
self.wanted_games = self._filter_wanted_campaigns(next_hour)
self.wanted_games = self._stream_selector.get_wanted_games(
self.settings, self.inventory
)
logger.info("Wanted games list built")
if self.wanted_games:
logger.info(
@@ -470,9 +429,6 @@ class Twitch:
watching_channel,
)
elif self._state is State.CHANNEL_SWITCH:
if self.settings.dump:
self.close()
continue
self.gui.status.update(_.t["gui"]["status"]["switching"])
# Determine the best channel to watch
@@ -700,3 +656,38 @@ class Twitch:
wanted_games.append(game)
break
return wanted_games
def _output_campaign_mapping(self, next_hour: datetime) -> None:
logger.info("=== Active Campaigns Mapping ===")
from collections import defaultdict
game_campaign_map: dict[str, list[tuple[DropsCampaign, list[str]]]] = defaultdict(list)
for campaign in self.inventory:
if campaign.eligible and not campaign.finished:
logger.info("eligible Campaign: %s - %s", campaign.name, campaign.game.name)
if campaign.can_earn_within(next_hour):
channel_names = []
if campaign.allowed_channels:
channel_names = [ch.name for ch in campaign.allowed_channels]
else:
channel_names = ["<directory>"]
game_campaign_map[campaign.game.name].append((campaign, channel_names))
for game_name in sorted(game_campaign_map.keys()):
logger.debug(f"Game: {game_name}")
for campaign, channel_list in game_campaign_map[game_name]:
status_info = f"{'ACTIVE' if campaign.active else 'UPCOMING'}"
ends_info = campaign.ends_at.astimezone().strftime("%Y-%m-%d %H:%M")
channel_info = (
f"{len(channel_list)} channels"
if channel_list[0] != "<directory>"
else "directory"
)
logger.debug(f" └─ Campaign: {campaign.name} [{status_info}] (ends: {ends_info})")
logger.debug(f" Channels: {channel_info}")
if channel_list[0] != "<directory>" and len(channel_list) <= 10:
logger.debug(f" └─ {', '.join(channel_list)}")
elif channel_list[0] != "<directory>":
logger.debug(
f" └─ {', '.join(channel_list[:10])} ... (+{len(channel_list) - 10} more)"
)
logger.info("=== End Campaigns Mapping ===")

View File

@@ -63,7 +63,9 @@ class WebsocketClosed(RequestException):
self.raw_message: str = raw_message
def __str__(self):
return f"Websocket has been closed. received: {self.received}, raw_message: {self.raw_message}"
return (
f"Websocket has been closed. received: {self.received}, raw_message: {self.raw_message}"
)
class LoginException(RequestException):

View File

@@ -8,9 +8,9 @@ from typing import TYPE_CHECKING
from dateutil.parser import isoparse
from src.config.constants import State, URLType
from src.config.constants import State
from src.models.channel import Channel
from src.models.drop import TimedDrop, remove_dimensions
from src.models.drop import TimedDrop
from src.models.game import Game
@@ -201,6 +201,4 @@ class DropsCampaign:
first_drop.display()
def has_wanted_unclaimed_benefits(self, allowed_benefits: dict[str, bool]) -> bool:
return any(
drop.has_wanted_unclaimed_benefits(allowed_benefits) for drop in self.drops
)
return any(drop.has_wanted_unclaimed_benefits(allowed_benefits) for drop in self.drops)

View File

@@ -139,9 +139,12 @@ class BaseDrop:
return delim.join(benefit.name for benefit in self.benefits)
def has_wanted_unclaimed_benefits(self, allowed_benefits: dict[str, bool]) -> bool:
return len(self.get_wanted_unclaimed_benefits(allowed_benefits)) > 0
def get_wanted_unclaimed_benefits(self, allowed_benefits: dict[str, bool]) -> list[str]:
if self.is_claimed:
return False
return any(benefit.is_wanted(allowed_benefits) for benefit in self.benefits)
return []
return [benefit.name for benefit in self.benefits if benefit.is_wanted(allowed_benefits)]
async def claim(self) -> bool:
result = await self._claim()

View File

@@ -0,0 +1,78 @@
from datetime import datetime, timedelta, timezone
from src.config.settings import Settings
from src.models.campaign import DropsCampaign
from src.models.game import Game
class StreamSelector:
def _get_wanted_game_tree(
self, settings: Settings, campaigns: list[DropsCampaign]
) -> list[dict]:
"""
Get the hierarchical tree of wanted items (Games -> Campaigns -> Drops -> Benefits).
Ignoring 'can earn within' time constraint.
"""
wanted_games = []
games_to_watch = settings.games_to_watch
mining_benefits = settings.mining_benefits
next_hour = datetime.now(timezone.utc) + timedelta(hours=1)
for game_name in games_to_watch:
wanted_campaigns = []
game_obj = None
game_name_lower = game_name.lower()
# Find all campaigns for this game
for campaign in campaigns:
if campaign.game.name.lower() != game_name_lower:
continue
if game_obj is None:
game_obj = campaign.game
if not campaign.can_earn_within(next_hour):
continue
wanted_drops = []
for drop in campaign.drops:
if drop.is_claimed:
continue
filtered_benefits = drop.get_wanted_unclaimed_benefits(mining_benefits)
if len(filtered_benefits) > 0:
wanted_drops.append({"name": drop.name, "benefits": filtered_benefits})
if len(wanted_drops) > 0:
wanted_campaigns.append(
{
"id": campaign.id,
"name": campaign.name,
"url": campaign.campaign_url,
"drops": wanted_drops,
}
)
if len(wanted_campaigns) > 0:
wanted_games.append(
{
"game_id": game_obj.id if game_obj else None,
"game_name": game_name,
"game_icon": game_obj.box_art_url if game_obj else None,
"game_obj": game_obj,
"campaigns": wanted_campaigns,
}
)
return wanted_games
def get_wanted_game_tree(
self, settings: Settings, campaigns: list[DropsCampaign]
) -> list[dict]:
return [
{**game, "game_obj": None} for game in self._get_wanted_game_tree(settings, campaigns)
]
def get_wanted_games(self, settings: Settings, campaigns: list[DropsCampaign]) -> list[Game]:
return [game["game_obj"] for game in self._get_wanted_game_tree(settings, campaigns)]

View File

@@ -212,9 +212,10 @@ async def update_settings(settings: SettingsUpdate):
@app.post("/api/settings/verify-proxy")
async def verify_proxy(request: ProxyVerifyRequest):
"""Verify proxy connectivity"""
import aiohttp
import time
import aiohttp
proxy_url = request.proxy.strip()
if not proxy_url:
return {"success": False, "message": "Proxy URL is empty"}
@@ -222,10 +223,10 @@ async def verify_proxy(request: ProxyVerifyRequest):
try:
start_time = time.time()
# Test connection to Twitch
async with aiohttp.ClientSession() as session:
async with session.get(
"https://www.twitch.tv", proxy=proxy_url, timeout=10
) as response:
async with (
aiohttp.ClientSession() as session,
session.get("https://www.twitch.tv", proxy=proxy_url, timeout=10) as response,
):
# Just checking if we can connect and get a response
if response.status < 500:
latency = round((time.time() - start_time) * 1000)
@@ -246,9 +247,10 @@ async def verify_proxy(request: ProxyVerifyRequest):
@app.get("/api/version")
async def get_version():
"""Get current application version and check for updates"""
from src.version import __version__
import aiohttp
from src.version import __version__
current_version = __version__
latest_version = None
update_available = False
@@ -256,15 +258,16 @@ async def get_version():
try:
# Check GitHub API for latest release
async with aiohttp.ClientSession() as session:
async with session.get(
"https://api.github.com/repos/rangermix/TwitchDropsMiner/releases/latest",
timeout=5
) as response:
async with (
aiohttp.ClientSession() as session,
session.get(
"https://api.github.com/repos/rangermix/TwitchDropsMiner/releases/latest", timeout=5
) as response,
):
if response.status == 200:
data = await response.json()
latest_version = data.get('tag_name', '').lstrip('v')
download_url = data.get('html_url')
latest_version = data.get("tag_name", "").lstrip("v")
download_url = data.get("html_url")
# Compare versions (simple string comparison works for semantic versioning)
if latest_version and latest_version > current_version:
@@ -276,7 +279,7 @@ async def get_version():
"current_version": current_version,
"latest_version": latest_version,
"update_available": update_available,
"download_url": download_url or "https://github.com/rangermix/TwitchDropsMiner/releases"
"download_url": download_url or "https://github.com/rangermix/TwitchDropsMiner/releases",
}
@@ -357,7 +360,7 @@ async def connect(sid, environ):
"login": gui_manager.login.get_status(),
"manual_mode": twitch_client.get_manual_mode_info(),
"current_drop": gui_manager.progress.get_current_drop(),
"wanted_items": gui_manager.get_wanted_tree(),
"wanted_items": gui_manager.get_wanted_game_tree(),
},
room=sid,
)
@@ -389,11 +392,7 @@ async def request_reload(sid):
async def get_wanted_items(sid):
"""Client requested wanted items list"""
if gui_manager:
await sio.emit(
"wanted_items_update",
gui_manager.get_wanted_tree(),
to=sid
)
await sio.emit("wanted_items_update", gui_manager.get_wanted_game_tree(), to=sid)
# Mount static files (CSS, JS, images)

View File

@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING
from src.config import State
from src.models.game import Game
from src.services.stream_selector import StreamSelector
from src.web.managers.broadcaster import WebSocketBroadcaster
from src.web.managers.cache import ImageCache
from src.web.managers.campaigns import CampaignProgressManager
@@ -57,16 +58,14 @@ class WebGUIManager:
self.login = LoginFormManager(self._broadcaster, self)
# Callback to trigger game update when relevant settings change
on_settings_change = self._twitch.state_change(State.GAMES_UPDATE)
on_settings_change = self._twitch.get_change_state_callable(State.GAMES_UPDATE)
self.settings = SettingsManager(
self._broadcaster,
twitch.settings,
self.output,
on_change=on_settings_change
self._broadcaster, twitch.settings, self.output, on_change=on_settings_change
)
# Selected channel tracking (set by web client)
self._selected_channel_id: int | None = None
self._stream_selector = StreamSelector()
# Start message
logger.info("Web GUI Manager initialized")
@@ -82,14 +81,6 @@ class WebGUIManager:
"""
self._broadcaster.set_socketio(sio)
def save(self, *, force: bool = False):
"""Save GUI state and settings.
Args:
force: Force save even if no changes detected
"""
self._twitch.settings.save(force=force)
def print(self, message: str):
"""Print message to console output.
@@ -165,70 +156,14 @@ class WebGUIManager:
"""
asyncio.create_task(self._broadcaster.emit("manual_mode_update", manual_mode_info))
def get_wanted_tree(self) -> list[dict]:
"""
Get the hierarchical tree of wanted items (Games -> Campaigns -> Drops -> Benefits).
Ignoring 'can earn within' time constraint.
"""
wanted_tree = []
games_to_watch = self._twitch.settings.games_to_watch
mining_benefits = self._twitch.settings.mining_benefits
for game_name in games_to_watch:
game_matches = []
game_obj = None
# Find all campaigns for this game
for campaign in self._twitch.inventory:
if campaign.game.name.lower() != game_name.lower():
continue
if game_obj is None:
game_obj = campaign.game
wanted_drops = []
for drop in campaign.drops:
if drop.is_claimed:
continue
filtered_benefits = [
b.name for b in drop.benefits
if b.is_wanted(mining_benefits)
]
if filtered_benefits:
wanted_drops.append({
"name": drop.name,
"benefits": filtered_benefits
})
if wanted_drops:
game_matches.append({
"id": campaign.id,
"name": campaign.name,
"url": campaign.campaign_url,
"drops": wanted_drops
})
if game_matches:
# Use the game object from the first matching campaign to get metadata
# If no game object found (shouldn't happen if game_matches has items),
# we can't easily get the icon unless we search known games or similar.
# But here we are iterating games_to_watch, so we know the name at least.
icon_url = game_obj.box_art_url if game_obj else None
wanted_tree.append({
"game_id": game_obj.id if game_obj else None,
"game_name": game_name,
"game_icon": icon_url,
"campaigns": game_matches
})
return wanted_tree
def get_wanted_game_tree(self) -> list[dict]:
return self._stream_selector.get_wanted_game_tree(
self._twitch.settings, self._twitch.inventory
)
def broadcast_wanted_items(self):
"""Broadcast the list of wanted items to connected clients."""
tree = self.get_wanted_tree()
tree = self.get_wanted_game_tree()
asyncio.create_task(self._broadcaster.emit("wanted_items_update", tree))

View File

@@ -12,11 +12,12 @@ if TYPE_CHECKING:
from src.web.managers.broadcaster import WebSocketBroadcaster
import logging
logger = logging.getLogger("TwitchDrops")
class ConsoleOutputManager:
"""Manages console output display in the web interface.

View File

@@ -3,16 +3,17 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING, Any, Callable
import logging
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
from src.i18n.translator import _
from src.models.game import Game
import logging
logger = logging.getLogger("TwitchDrops")
if TYPE_CHECKING:
from src.config.settings import Settings
from src.web.managers.broadcaster import WebSocketBroadcaster
@@ -45,17 +46,8 @@ class SettingsManager:
Returns:
Dictionary containing all user-configurable settings
"""
return {
"language": self._settings.language,
"dark_mode": self._settings.dark_mode,
"games_to_watch": list(self._settings.games_to_watch),
"games_available": self._available_games,
"proxy": str(self._settings.proxy),
"connection_quality": self._settings.connection_quality,
"minimum_refresh_interval_minutes": self._settings.minimum_refresh_interval_minutes,
"inventory_filters": self._settings.inventory_filters,
"mining_benefits": self._settings.mining_benefits,
}
settings = vars(self._settings).copy()
return settings
def get_languages(self) -> dict[str, Any]:
"""Get available languages and current selection.
@@ -79,70 +71,62 @@ class SettingsManager:
settings_data: Dictionary of settings to update
"""
should_trigger_update = False
if "games_to_watch" in settings_data:
self._settings.games_to_watch = settings_data["games_to_watch"]
self._log_change(f"Setting changed: games_to_watch = {len(self._settings.games_to_watch)} games")
should_trigger_update = True
if "dark_mode" in settings_data:
self._settings.dark_mode = settings_data["dark_mode"]
self._log_change(f"Setting changed: dark_mode = {self._settings.dark_mode}")
if "language" in settings_data:
language = settings_data["language"]
try:
_.set_language(language)
self._settings.language = language
self._log_change(f"Setting changed: language = {language}")
# Notify clients that translations need to be reloaded
asyncio.create_task(
self._broadcaster.emit("language_changed", {"language": language})
should_trigger_update |= self.check_and_update_setting(
"games_to_watch", settings_data.get("games_to_watch"), True
)
should_trigger_update |= self.check_and_update_setting(
"dark_mode", settings_data.get("dark_mode")
)
should_trigger_update |= self.check_and_update_setting(
"language", settings_data.get("language"), False, self._set_language
)
should_trigger_update |= self.check_and_update_setting(
"connection_quality", settings_data.get("connection_quality")
)
except ValueError as e:
# Invalid language, log warning
logger.warning(f"Invalid language '{language}': {e}")
if "connection_quality" in settings_data:
self._settings.connection_quality = settings_data["connection_quality"]
self._log_change(f"Setting changed: connection_quality = {self._settings.connection_quality}")
if "proxy" in settings_data:
from yarl import URL
proxy_value = settings_data["proxy"]
should_trigger_update |= self.check_and_update_setting(
"proxy",
str(proxy_value).strip() if proxy_value else "",
True,
lambda proxy: self._log_change("Proxy cleared") if proxy == "" else None,
)
should_trigger_update |= self.check_and_update_setting(
"minimum_refresh_interval_minutes",
settings_data.get("minimum_refresh_interval_minutes"),
)
should_trigger_update |= self.check_and_update_setting(
"inventory_filters", settings_data.get("inventory_filters")
)
should_trigger_update |= self.check_and_update_setting(
"mining_benefits", settings_data.get("mining_benefits"), True
)
proxy_str = settings_data["proxy"].strip()
if proxy_str:
if self._settings.proxy != URL(proxy_str):
self._settings.proxy = URL(proxy_str)
self._log_change(f"Proxy set to: {proxy_str}")
else:
if self._settings.proxy != URL():
self._settings.proxy = URL()
self._log_change("Proxy cleared")
if "minimum_refresh_interval_minutes" in settings_data:
self._settings.minimum_refresh_interval_minutes = settings_data[
"minimum_refresh_interval_minutes"
]
self._log_change(f"Setting changed: minimum_refresh_interval_minutes = {self._settings.minimum_refresh_interval_minutes}")
if "inventory_filters" in settings_data:
self._settings.inventory_filters = settings_data["inventory_filters"]
self._log_change("Setting changed: inventory_filters updated")
if "mining_benefits" in settings_data:
self._settings.mining_benefits = settings_data["mining_benefits"]
self._log_change(f"Setting changed: mining_benefits = {self._settings.mining_benefits}")
should_trigger_update = True
self._settings.alter()
# Persist settings to disk immediately
self._settings.save()
asyncio.create_task(self._broadcaster.emit("settings_updated", self.get_settings()))
if should_trigger_update and self._on_change:
self._on_change()
def check_and_update_setting(
self,
key: str,
new_value: Any,
should_trigger_update: bool = False,
action: Callable[[Any], None] = lambda x: None,
):
if new_value is None or getattr(self._settings, key, None) == new_value:
return False
setattr(self._settings, key, new_value)
self._log_change(f"Setting changed: {key} = {new_value}")
action(new_value)
return should_trigger_update
def _set_language(self, language: str):
_.set_language(language)
# Notify clients that translations need to be reloaded
asyncio.create_task(self._broadcaster.emit("language_changed", {"language": language}))
def set_games(self, games: set[Game]):
"""Update the list of available games for settings panel.

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
import asyncio
import json
import logging
import traceback
from contextlib import suppress
from time import time
from typing import TYPE_CHECKING
@@ -17,11 +16,11 @@ from src.utils import (
CHARS_ASCII,
AwaitableValue,
ExponentialBackoff,
chunk,
create_nonce,
format_traceback,
json_minify,
task_wrapper,
chunk,
)
@@ -164,7 +163,9 @@ class Websocket:
session = await self._twitch.get_session()
backoff = ExponentialBackoff(**kwargs)
proxy = self._twitch.settings.proxy or None
ws_logger.info(f"Websocket[{self._idx}] connecting with {'no' if proxy is None else str(proxy)} proxy")
ws_logger.info(
f"Websocket[{self._idx}] connecting with {'no' if proxy is None else proxy} proxy"
)
for delay in backoff:
try:
async with session.ws_connect(ws_url, proxy=proxy) as websocket:
@@ -226,13 +227,19 @@ class Websocket:
)
elif self._closed.is_set():
# we closed it - exit
ws_logger.debug(f"Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1 stopped.")
ws_logger.debug(
f"Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1 stopped."
)
self.set_status(_.t["gui"]["websocket"]["disconnected"])
return
except Exception:
ws_logger.exception(f"Exception in Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1")
ws_logger.exception(
f"Exception in Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1"
)
self.set_status(_.t["gui"]["websocket"]["reconnecting"])
ws_logger.warning(f"Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1 reconnecting...")
ws_logger.warning(
f"Websocket[{self._idx}] to wss://pubsub-edge.twitch.tv/v1 reconnecting..."
)
async def _handle_ping(self):
"""Handle ping/pong heartbeat to keep connection alive."""

0
tests/__init__.py Normal file
View File

View File

@@ -1,8 +1,9 @@
import unittest
from datetime import datetime, timezone
from src.models.benefit import Benefit, BenefitType
from src.models.drop import TimedDrop
from src.models.benefit import Benefit
from src.models.campaign import DropsCampaign
from src.models.drop import TimedDrop
class TestBenefitFilter(unittest.TestCase):
def setUp(self):
@@ -12,7 +13,7 @@ class TestBenefitFilter(unittest.TestCase):
"id": "b1",
"name": "Test Badge",
"distributionType": "BADGE",
"imageAssetURL": "url"
"imageAssetURL": "url",
}
}
self.benefit_item_data = {
@@ -20,7 +21,7 @@ class TestBenefitFilter(unittest.TestCase):
"id": "b2",
"name": "Test Item",
"distributionType": "DIRECT_ENTITLEMENT",
"imageAssetURL": "url"
"imageAssetURL": "url",
}
}
@@ -46,6 +47,7 @@ class TestBenefitFilter(unittest.TestCase):
drop1.is_claimed = False
drop1.benefits = [self.badge]
drop1.has_wanted_unclaimed_benefits = TimedDrop.has_wanted_unclaimed_benefits.__get__(drop1)
drop1.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__(drop1)
allowed = {"BADGE": True, "DIRECT_ENTITLEMENT": False}
self.assertTrue(drop1.has_wanted_unclaimed_benefits(allowed))
@@ -58,6 +60,7 @@ class TestBenefitFilter(unittest.TestCase):
drop2.is_claimed = True
drop2.benefits = [self.badge]
drop2.has_wanted_unclaimed_benefits = TimedDrop.has_wanted_unclaimed_benefits.__get__(drop2)
drop2.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__(drop2)
self.assertFalse(drop2.has_wanted_unclaimed_benefits(allowed))
@@ -66,6 +69,7 @@ class TestBenefitFilter(unittest.TestCase):
drop3.is_claimed = False
drop3.benefits = [self.badge, self.item]
drop3.has_wanted_unclaimed_benefits = TimedDrop.has_wanted_unclaimed_benefits.__get__(drop3)
drop3.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__(drop3)
# Only want Item (which it has)
allowed_item = {"BADGE": False, "DIRECT_ENTITLEMENT": True}
@@ -83,7 +87,9 @@ class TestBenefitFilter(unittest.TestCase):
campaign.drops = [drop1, drop2]
# Bind method
campaign.has_wanted_unclaimed_benefits = DropsCampaign.has_wanted_unclaimed_benefits.__get__(campaign)
campaign.has_wanted_unclaimed_benefits = (
DropsCampaign.has_wanted_unclaimed_benefits.__get__(campaign)
)
allowed = {"BADGE": True}
# Since drop2 returns True, campaign should return True
@@ -93,5 +99,6 @@ class TestBenefitFilter(unittest.TestCase):
drop2.has_wanted_unclaimed_benefits.return_value = False
self.assertFalse(campaign.has_wanted_unclaimed_benefits(allowed))
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

View File

@@ -1,57 +0,0 @@
import unittest
from unittest.mock import MagicMock
from datetime import datetime, timedelta, timezone
from src.core.client import Twitch
from src.models.campaign import DropsCampaign
from src.models.game import Game
class TestClientFilter(unittest.TestCase):
def setUp(self):
# Mock Settings
self.settings = MagicMock()
self.settings.games_to_watch = ["Game1", "Game2"]
self.settings.mining_benefits = {"BADGE": True, "DIRECT_ENTITLEMENT": True} # both allowed by default
# Mock Twitch Client
self.client = MagicMock(spec=Twitch)
self.client.settings = self.settings
self.client.inventory = []
# Bind the method we want to test
self.client._filter_wanted_campaigns = Twitch._filter_wanted_campaigns.__get__(self.client)
def test_filter_wanted_campaigns(self):
# Setup Campaigns
# Campaign 1: Game1, Can Earn, Has Wanted Benefits -> Should be selected
c1 = MagicMock(spec=DropsCampaign)
c1.game = Game({"id": 1, "name": "Game1"})
c1.can_earn_within.return_value = True
c1.has_wanted_unclaimed_benefits.return_value = True
# Campaign 2: Game2, Can Earn, NO Wanted Benefits -> Should NOT be selected
c2 = MagicMock(spec=DropsCampaign)
c2.game = Game({"id": 2, "name": "Game2"})
c2.can_earn_within.return_value = True
c2.has_wanted_unclaimed_benefits.return_value = False
# Campaign 3: Game3 (Not in games_to_watch), Can Earn, Has Benefits -> Should NOT be selected
c3 = MagicMock(spec=DropsCampaign)
c3.game = Game({"id": 3, "name": "Game3"})
c3.can_earn_within.return_value = True
c3.has_wanted_unclaimed_benefits.return_value = True
self.client.inventory = [c1, c2, c3]
next_hour = datetime.now(timezone.utc) + timedelta(hours=1)
wanted_games = self.client._filter_wanted_campaigns(next_hour)
self.assertEqual(len(wanted_games), 1)
self.assertEqual(wanted_games[0].name, "Game1")
# Verify calls
c1.has_wanted_unclaimed_benefits.assert_called_with(self.settings.mining_benefits)
c2.has_wanted_unclaimed_benefits.assert_called_with(self.settings.mining_benefits)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,37 +1,24 @@
import unittest
import asyncio
from unittest.mock import MagicMock
from yarl import URL
from unittest.mock import AsyncMock, MagicMock
# Mock the imports that depend on application structure if needed,
# or just import them if PYTHONPATH is set correctly.
# Assuming run from root, imports should work.
from src.config.settings import Settings
from src.web.managers.settings import SettingsManager
from src.web.managers.console import ConsoleOutputManager
from src.web.managers.settings import SettingsManager
class TestProxySettings(unittest.TestCase):
def setUp(self):
self.mock_broadcaster = MagicMock()
# Mock emit to be awaitable
f = asyncio.Future()
f.set_result(None)
self.mock_broadcaster.emit = MagicMock(return_value=f)
# Use AsyncMock for emit - it returns a coroutine without needing an event loop
self.mock_broadcaster.emit = AsyncMock()
# Create a pure mock Settings without wrapping a real instance
# This avoids file I/O during tests
self.mock_settings = MagicMock(spec=Settings)
# Setup properties
self.mock_settings.proxy = URL()
self.mock_settings.language = "en"
self.mock_settings.dark_mode = False
self.mock_settings.games_to_watch = []
self.mock_settings.connection_quality = 1
self.mock_settings.minimum_refresh_interval_minutes = 30
self.mock_console = MagicMock(spec=ConsoleOutputManager)
# Mock asyncio.create_task
self.create_task_patcher = unittest.mock.patch('asyncio.create_task')
self.create_task_patcher = unittest.mock.patch("asyncio.create_task")
self.mock_create_task = self.create_task_patcher.start()
def tearDown(self):
@@ -44,20 +31,22 @@ class TestProxySettings(unittest.TestCase):
proxy_url = "http://user:pass@localhost:8080"
manager.update_settings({"proxy": proxy_url})
self.assertEqual(self.mock_settings.proxy, URL(proxy_url))
self.mock_console.print.assert_called_with(f"Proxy set to: {proxy_url}")
self.assertEqual(self.mock_settings.proxy, proxy_url)
self.mock_console.print.assert_called_with(
"Setting changed: proxy = http://user:pass@localhost:8080"
)
# Test clearing a proxy
manager.update_settings({"proxy": ""})
self.assertEqual(self.mock_settings.proxy, URL())
self.assertEqual(self.mock_settings.proxy, "")
self.mock_console.print.assert_called_with("Proxy cleared")
def test_proxy_persistence_trigger(self):
manager = SettingsManager(self.mock_broadcaster, self.mock_settings, self.mock_console)
manager.update_settings({"proxy": "http://1.2.3.4:8080"})
self.mock_settings.alter.assert_called()
self.mock_settings.save.assert_called()
if __name__ == "__main__":
unittest.main()

View File

@@ -1,16 +1,17 @@
import asyncio
import unittest
from unittest.mock import MagicMock, AsyncMock, patch
from unittest.mock import AsyncMock, MagicMock
from src.config.settings import Settings
from src.web.app import SettingsUpdate
from src.web.managers.settings import SettingsManager
from src.config.settings import Settings
class TestSettingsAPI(unittest.IsolatedAsyncioTestCase):
def test_settings_update_model(self):
# Verify model accepts new fields
update_data = {
"inventory_filters": {"show_upcoming": True},
"mining_benefits": {"BADGE": True}
"mining_benefits": {"BADGE": True},
}
model = SettingsUpdate(**update_data)
self.assertEqual(model.inventory_filters, update_data["inventory_filters"])
@@ -20,25 +21,26 @@ class TestSettingsAPI(unittest.IsolatedAsyncioTestCase):
# Mock dependencies
mock_broadcaster = AsyncMock()
mock_settings = MagicMock(spec=Settings)
# Configure mock to satisfy get_settings() calls
mock_settings.language = "en"
mock_settings.dark_mode = False
# Initialize mock attributes with default values for comparison
mock_settings.inventory_filters = {}
mock_settings.mining_benefits = {}
mock_settings.games_to_watch = []
mock_settings.proxy = "http://proxy"
mock_settings.connection_quality = 1
mock_settings.minimum_refresh_interval_minutes = 30
mock_console = MagicMock()
mock_callback = MagicMock()
manager = SettingsManager(mock_broadcaster, mock_settings, mock_console, on_change=mock_callback)
manager = SettingsManager(
mock_broadcaster, mock_settings, mock_console, on_change=mock_callback
)
# 1. Update Inventory Filters (Should NOT trigger callback if not games/benefits)
# 1. Update Inventory Filters (does NOT trigger callback per implementation)
inv_filters = {"show_upcoming": False}
manager.update_settings({"inventory_filters": inv_filters})
mock_callback.assert_not_called()
mock_callback.assert_not_called() # inventory_filters has should_trigger_update=False
self.assertEqual(mock_settings.inventory_filters, inv_filters)
mock_console.print.assert_called_with("Setting changed: inventory_filters updated")
mock_console.print.assert_called_with(
"Setting changed: inventory_filters = {'show_upcoming': False}"
)
# 2. Update Mining Benefits (SHOULD trigger callback)
benefits = {"BADGE": False}
@@ -54,5 +56,5 @@ class TestSettingsAPI(unittest.IsolatedAsyncioTestCase):
mock_callback.assert_called_once()
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

View File

@@ -1,9 +1,9 @@
import asyncio
import unittest
from unittest.mock import MagicMock, patch, AsyncMock
from src.web.app import verify_proxy
from src.web.app import ProxyVerifyRequest
from unittest.mock import AsyncMock, MagicMock, patch
from src.web.app import ProxyVerifyRequest, verify_proxy
class MockResponseContext:
def __init__(self, response_or_exc):
@@ -17,10 +17,11 @@ class MockResponseContext:
async def __aexit__(self, exc_type, exc, tb):
pass
class TestVerifyProxy(unittest.TestCase):
def setUp(self):
# Patch aiohttp.ClientSession
self.session_patcher = patch('aiohttp.ClientSession')
self.session_patcher = patch("aiohttp.ClientSession")
self.mock_session_cls = self.session_patcher.start()
# session object itself is not async, it has async methods/CMs
self.mock_session = MagicMock()
@@ -37,15 +38,17 @@ class TestVerifyProxy(unittest.TestCase):
mock_response.status = 200
# Configure get to return our custom context manager
self.mock_session.get.side_effect = lambda *args, **kwargs: MockResponseContext(mock_response)
self.mock_session.get.side_effect = lambda *args, **kwargs: MockResponseContext(
mock_response
)
request = ProxyVerifyRequest(proxy="http://valid-proxy:8080")
# Run async function
result = asyncio.run(verify_proxy(request))
self.assertTrue(result['success'])
self.assertIn("Connected!", result['message'])
self.assertTrue(result["success"])
self.assertIn("Connected!", result["message"])
self.assertIn("latency", result)
def test_verify_proxy_failure_status(self):
@@ -53,16 +56,18 @@ class TestVerifyProxy(unittest.TestCase):
mock_response = AsyncMock()
mock_response.status = 503
self.mock_session.get.side_effect = lambda *args, **kwargs: MockResponseContext(mock_response)
self.mock_session.get.side_effect = lambda *args, **kwargs: MockResponseContext(
mock_response
)
request = ProxyVerifyRequest(proxy="http://bad-proxy:8080")
result = asyncio.run(verify_proxy(request))
self.assertFalse(result['success'])
self.assertFalse(result["success"])
# The expected message in app.py is: f"Proxy reachable but returned {response.status}"
self.assertIn("Proxy reachable but returned 503", result['message'])
self.assertIn("Proxy reachable but returned 503", result["message"])
def test_verify_proxy_connection_error(self):
# Mock connection error
@@ -73,14 +78,15 @@ class TestVerifyProxy(unittest.TestCase):
result = asyncio.run(verify_proxy(request))
self.assertFalse(result['success'])
self.assertIn("Connection failed", result['message'])
self.assertFalse(result["success"])
self.assertIn("Connection failed", result["message"])
def test_verify_proxy_empty(self):
request = ProxyVerifyRequest(proxy="")
result = asyncio.run(verify_proxy(request))
self.assertFalse(result['success'])
self.assertEqual(result['message'], "Proxy URL is empty")
self.assertFalse(result["success"])
self.assertEqual(result["message"], "Proxy URL is empty")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,103 @@
import unittest
from unittest.mock import MagicMock
from src.models.campaign import DropsCampaign
from src.models.game import Game
from src.services.stream_selector import StreamSelector
class TestWantedGamesFilter(unittest.TestCase):
def setUp(self):
# Mock Settings
self.settings = MagicMock()
self.settings.games_to_watch = ["Game1", "Game2"]
self.settings.mining_benefits = {
"BADGE": True,
"DIRECT_ENTITLEMENT": True,
} # both allowed by default
def test_filter_wanted_campaigns(self):
# Setup Campaigns
# Campaign 1: Game1, Can Earn, Has Wanted Benefits -> Should be selected
c1 = MagicMock(spec=DropsCampaign)
c1.game = Game({"id": 1, "name": "Game1"})
c1.can_earn_within.return_value = True
c1.id = "123"
c1.name = "Test Campaign"
c1.campaign_url = "http://test.url"
d1 = MagicMock()
d1.name = "Test Drop"
d1.is_claimed = False
d1.get_wanted_unclaimed_benefits.return_value = ["Benefit1"]
c1.drops = [d1]
c1.has_wanted_unclaimed_benefits.side_effect = (
DropsCampaign.has_wanted_unclaimed_benefits.__get__(c1, DropsCampaign)
)
# Campaign 2: Game2, Can Earn, NO Wanted Benefits -> Should NOT be selected
c2 = MagicMock(spec=DropsCampaign)
c2.game = Game({"id": 2, "name": "Game2"})
c2.can_earn_within.return_value = True
d2 = MagicMock()
d2.is_claimed = False
d2.get_wanted_unclaimed_benefits.return_value = []
c2.drops = [d2]
c2.has_wanted_unclaimed_benefits.side_effect = (
DropsCampaign.has_wanted_unclaimed_benefits.__get__(c2, DropsCampaign)
)
# Campaign 3: Game3 (Not in games_to_watch), Can Earn, Has Benefits -> Should NOT be selected
c3 = MagicMock(spec=DropsCampaign)
c3.game = Game({"id": 3, "name": "Game3"})
c3.can_earn_within.return_value = True
d3 = MagicMock()
d3.is_claimed = False
d3.get_wanted_unclaimed_benefits.return_value = ["Benefit3"]
c3.drops = [d3]
c3.has_wanted_unclaimed_benefits.side_effect = (
DropsCampaign.has_wanted_unclaimed_benefits.__get__(c3, DropsCampaign)
)
# Campaign 4: Game1, Can Earn, Has Claimed Wanted Benefits -> Should NOT be selected
c4 = MagicMock(spec=DropsCampaign)
c4.game = Game({"id": 1, "name": "Game1"})
c4.can_earn_within.return_value = True
c4.id = "123"
c4.name = "Test Campaign"
c4.campaign_url = "http://test.url"
d4 = MagicMock()
d4.name = "Test Drop"
d4.is_claimed = True
d4.get_wanted_unclaimed_benefits.return_value = ["Benefit4"]
c4.drops = [d4]
c4.has_wanted_unclaimed_benefits.side_effect = (
DropsCampaign.has_wanted_unclaimed_benefits.__get__(c4, DropsCampaign)
)
# Campaign 5: Game1, Can Not Earn, Has Wanted Benefits -> Should NOT be selected
c5 = MagicMock(spec=DropsCampaign)
c5.game = Game({"id": 1, "name": "Game1"})
c5.can_earn_within.return_value = False
c5.id = "123"
c5.name = "Test Campaign"
c5.campaign_url = "http://test.url"
d5 = MagicMock()
d5.name = "Test Drop"
d5.is_claimed = False
d5.get_wanted_unclaimed_benefits.return_value = ["Benefit5"]
c5.drops = [d5]
c5.has_wanted_unclaimed_benefits.side_effect = (
DropsCampaign.has_wanted_unclaimed_benefits.__get__(c5, DropsCampaign)
)
inventory = [c1, c2, c3, c4, c5]
stream_selector = StreamSelector()
wanted_games = stream_selector.get_wanted_games(self.settings, inventory)
self.assertEqual(len(wanted_games), 1)
self.assertEqual(wanted_games[0].name, "Game1")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,18 +1,20 @@
import unittest
from unittest.mock import MagicMock, Mock
from src.web.gui_manager import WebGUIManager
from src.core.client import Twitch, State
from src.models.campaign import DropsCampaign
from src.models.game import Game
from src.models.drop import TimedDrop
from unittest.mock import MagicMock
from src.core.client import Twitch
from src.models.benefit import Benefit, BenefitType
from src.models.campaign import DropsCampaign
from src.models.drop import TimedDrop
from src.models.game import Game
from src.web.gui_manager import WebGUIManager
class TestWantedItems(unittest.TestCase):
def setUp(self):
# Mock Twitch Client
self.twitch = MagicMock(spec=Twitch)
self.twitch.settings = MagicMock()
self.twitch.state_change.return_value = lambda: None
self.twitch.get_change_state_callable.return_value = lambda: None
# Mock dependencies created in __init__
# We can't easily mock internal creation of sub-managers without patching,
@@ -37,14 +39,18 @@ class TestWantedItems(unittest.TestCase):
c1.name = "Campaign1"
c1.campaign_url = "http://url1"
c1.game = Game({"id": 1, "name": "Game1", "boxArtURL": "http://img1"})
c1.can_earn_within.return_value = True
d1 = MagicMock(spec=TimedDrop)
d1.name = "Drop1"
d1.is_claimed = False
d1.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__(
d1, TimedDrop
)
b1 = MagicMock(spec=Benefit)
b1.name = "Badge1"
b1.type = BenefitType.BADGE
b1.is_wanted.return_value = True
b1.is_wanted = Benefit.is_wanted.__get__(b1, Benefit)
d1.benefits = [b1]
c1.drops = [d1]
@@ -54,14 +60,18 @@ class TestWantedItems(unittest.TestCase):
c2.name = "Campaign2"
c2.campaign_url = "http://url2"
c2.game = Game({"id": 2, "name": "Game2", "boxArtURL": "http://img2"})
c2.can_earn_within.return_value = True
d2 = MagicMock(spec=TimedDrop)
d2.name = "Drop2"
d2.is_claimed = False
d2.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__(
d2, TimedDrop
)
b2 = MagicMock(spec=Benefit)
b2.name = "Item1"
b2.type = BenefitType.DIRECT_ENTITLEMENT
b2.is_wanted.return_value = False
b2.is_wanted = Benefit.is_wanted.__get__(b2, Benefit)
d2.benefits = [b2]
c2.drops = [d2]
@@ -71,21 +81,47 @@ class TestWantedItems(unittest.TestCase):
c3.name = "Campaign3"
c3.campaign_url = "http://url3"
c3.game = Game({"id": 3, "name": "Game3", "boxArtURL": "http://img3"})
c3.can_earn_within.return_value = True
d3 = MagicMock(spec=TimedDrop)
d3.name = "Drop3"
d3.is_claimed = False
d3.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__(
d3, TimedDrop
)
b3 = MagicMock(spec=Benefit)
b3.name = "Badge2"
b3.type = BenefitType.BADGE
b3.is_wanted.return_value = True
b3.is_wanted = Benefit.is_wanted.__get__(b3, Benefit)
d3.benefits = [b3]
c3.drops = [d3]
self.twitch.inventory = [c1, c2, c3]
# Campaign 4: Game1, Drop with BADGE, can't earn (Wanted)
c4 = MagicMock(spec=DropsCampaign)
c4.id = "c4_id"
c4.name = "Campaign4"
c4.campaign_url = "http://url4"
c4.game = Game({"id": 1, "name": "Game1", "boxArtURL": "http://img1"})
c4.can_earn_within.return_value = False
d4 = MagicMock(spec=TimedDrop)
d4.name = "Drop4"
d4.is_claimed = False
d4.get_wanted_unclaimed_benefits = TimedDrop.get_wanted_unclaimed_benefits.__get__(
d4, TimedDrop
)
b4 = MagicMock(spec=Benefit)
b4.name = "Badge1"
b4.type = BenefitType.BADGE
b4.is_wanted = Benefit.is_wanted.__get__(b4, Benefit)
d4.benefits = [b4]
c4.drops = [d4]
self.twitch.inventory = [c1, c2, c3, c4]
# Execute
result = self.gui.get_wanted_tree()
result = self.gui.get_wanted_game_tree()
print(result)
# Verify
# Expected: Game1 only
@@ -125,10 +161,11 @@ class TestWantedItems(unittest.TestCase):
self.twitch.inventory = [c1]
# Execute
result = self.gui.get_wanted_tree()
result = self.gui.get_wanted_game_tree()
# Verify
self.assertEqual(len(result), 0)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()