From f7b0a5f2cc8e671a621dca38b3a9ab9f69bb79dd Mon Sep 17 00:00:00 2001 From: DevilXD Date: Sun, 27 Mar 2022 14:03:11 +0200 Subject: [PATCH] Implement the inventory tab --- .gitignore | 1 + cache.py | 105 +++++++++++++++++++++++++ channel.py | 12 +-- constants.py | 5 +- gui.py | 218 ++++++++++++++++++++++++++++++++++++++++++--------- inventory.py | 21 ++++- main.py | 18 +++++ settings.py | 40 ++-------- twitch.py | 59 ++++++++------ utils.py | 51 +++++++++++- websocket.py | 7 +- 11 files changed, 426 insertions(+), 111 deletions(-) create mode 100644 cache.py diff --git a/.gitignore b/.gitignore index dc7926b..548ca0d 100644 --- a/.gitignore +++ b/.gitignore @@ -11,5 +11,6 @@ __pycache__ /*.tmp /Twitch Drops Miner # Dev files +/cache cookies.jar settings.json diff --git a/cache.py b/cache.py new file mode 100644 index 0000000..5dc7199 --- /dev/null +++ b/cache.py @@ -0,0 +1,105 @@ +from __future__ import annotations +import asyncio +from datetime import datetime, timedelta, timezone + +import io +from typing import Dict, TypedDict, NewType, TYPE_CHECKING + +from utils import json_load, json_save +from constants import URLType, CACHE_PATH, CACHE_DB + +import aiohttp +from PIL import Image as Image_module +from PIL.ImageTk import PhotoImage + + +if TYPE_CHECKING: + from gui import GUIManager + from PIL.Image import Image + from typing_extensions import TypeAlias + + +ImageHash = NewType("ImageHash", str) +ImageSize: TypeAlias = "tuple[int, int]" + + +class ExpiringHash(TypedDict): + hash: ImageHash + expires: datetime + + +Hashes = Dict[URLType, ExpiringHash] +default_database: Hashes = {} + + +class ImageCache: + LIFETIME = timedelta(days=7) + + def __init__(self, manager: GUIManager) -> None: + self._root = manager._root + CACHE_PATH.mkdir(parents=True, exist_ok=True) + self._hashes: Hashes = json_load(CACHE_DB, default_database) + self._images: dict[ImageHash, Image] = {} + self._photos: dict[tuple[ImageHash, ImageSize], PhotoImage] = {} + self._lock = asyncio.Lock() + # cleanup the URLs + hash_counts: dict[ImageHash, int] = {} + now = datetime.now(timezone.utc) + for url, hash_dict in list(self._hashes.items()): + img_hash = hash_dict["hash"] + if img_hash not in hash_counts: + hash_counts[img_hash] = 0 + if now >= hash_dict["expires"]: + del self._hashes[url] + else: + hash_counts[img_hash] += 1 + for img_hash, count in hash_counts.items(): + if count == 0: + # hashes come with an extension already + for file in CACHE_PATH.glob(img_hash): + file.unlink() + + def save(self) -> None: + json_save(CACHE_DB, self._hashes) + + def _new_expires(self) -> datetime: + return datetime.now(timezone.utc) + self.LIFETIME + + def _hash(self, image: Image) -> ImageHash: + pixel_data = list(image.resize((10, 10), Image_module.ANTIALIAS).convert('L').getdata()) + avg_pixel = sum(pixel_data) / len(pixel_data) + bits = ''.join('1' if px >= avg_pixel else '0' for px in pixel_data) + return ImageHash(f"{int(bits, 2):x}.png") + + async def get(self, url: URLType, size: ImageSize | None = None) -> PhotoImage: + async with self._lock: + image: Image | None = None + if url in self._hashes: + img_hash = self._hashes[url]["hash"] + self._hashes[url]["expires"] = self._new_expires() + if img_hash in self._images: + image = self._images[img_hash] + else: + try: + self._images[img_hash] = image = Image_module.open(CACHE_PATH / img_hash) + except FileNotFoundError: + pass + if image is None: + async with aiohttp.request("GET", url) as response: + image = Image_module.open(io.BytesIO(await response.read())) + img_hash = self._hash(image) + self._images[img_hash] = image + image.save(CACHE_PATH / img_hash) + self._hashes[url] = { + "hash": img_hash, + "expires": self._new_expires() + } + if size is None: + size = image.size + photo_key = (img_hash, size) + if photo_key in self._photos: + return self._photos[photo_key] + if image.size != size: + image = image.resize(size, Image_module.ADAPTIVE) + self._photos[photo_key] = photo = PhotoImage(master=self._root, image=image) + return photo diff --git a/channel.py b/channel.py index 375a027..c488689 100644 --- a/channel.py +++ b/channel.py @@ -10,7 +10,7 @@ from typing import Any, SupportsInt, TYPE_CHECKING from utils import Game, invalidate_cache from exceptions import MinerException, RequestException -from constants import BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, DROPS_ENABLED_TAG +from constants import BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, DROPS_ENABLED_TAG, URLType if TYPE_CHECKING: from twitch import Twitch @@ -79,7 +79,7 @@ class Channel: self.id: int = int(id) self._login: str = login self._display_name: str | None = display_name - self._spade_url: str | None = None + self._spade_url: URLType | None = None self.points: int | None = None self._stream: Stream | None = None self._pending_stream_up: asyncio.Task[Any] | None = None @@ -141,8 +141,8 @@ class Channel: return self._login @property - def url(self) -> str: - return f"{BASE_URL}/{self._login}" + def url(self) -> URLType: + return URLType(f"{BASE_URL}/{self._login}") @property def iid(self) -> str: @@ -206,7 +206,7 @@ class Channel: self._pending_stream_up = None self._gui_channels.remove(self) - async def get_spade_url(self) -> str: + async def get_spade_url(self) -> URLType: """ To get this monstrous thing, you have to walk a chain of requests. Streamer page (HTML) --parse-> Streamer Settings (JavaScript) --parse-> Spade URL @@ -228,7 +228,7 @@ class Channel: ) if not match: raise MinerException("Error while spade_url extraction: step #2") - return match.group(1) + return URLType(match.group(1)) async def get_stream(self) -> Stream | None: response: JsonType | None = await self._twitch.gql_request( diff --git a/constants.py b/constants.py index e9fc030..7f46a0b 100644 --- a/constants.py +++ b/constants.py @@ -6,7 +6,7 @@ from copy import copy from pathlib import Path from enum import Enum, auto from datetime import timedelta -from typing import Any, Dict, Literal, TYPE_CHECKING +from typing import Any, Dict, Literal, NewType, TYPE_CHECKING from typing_extensions import TypeAlias @@ -21,10 +21,13 @@ SELF_PATH = Path(sys.argv[0]) WORKING_DIR = SELF_PATH.absolute().parent # Other Paths LOG_PATH = Path(WORKING_DIR, "log.txt") +CACHE_PATH = Path(WORKING_DIR, "cache") +CACHE_DB = Path(CACHE_PATH, "mapping.json") COOKIES_PATH = Path(WORKING_DIR, "cookies.jar") SETTINGS_PATH = Path(WORKING_DIR, "settings.json") # Typing JsonType = Dict[str, Any] +URLType = NewType("URLType", str) TopicProcess: TypeAlias = "abc.Callable[[int, JsonType], Any]" # Values MAX_WEBSOCKETS = 8 diff --git a/gui.py b/gui.py index 1e8b535..da8b377 100644 --- a/gui.py +++ b/gui.py @@ -10,22 +10,19 @@ from collections import abc, namedtuple, OrderedDict from tkinter import Tk, ttk, StringVar, DoubleVar, IntVar from typing import Any, TypedDict, NoReturn, TYPE_CHECKING -try: - import pystray -except ModuleNotFoundError as exc: - raise ImportError("You have to run 'pip install pystray' first") from exc +import pystray +from PIL import Image as Image_module -from utils import resource_path +from cache import ImageCache from exceptions import ExitRequest +from utils import resource_path, Game from registry import RegistryKey, ValueType from constants import SELF_PATH, FORMATTER, WS_TOPICS_LIMIT, MAX_WEBSOCKETS, WINDOW_TITLE, State if TYPE_CHECKING: - from pathlib import Path - from twitch import Twitch from channel import Channel - from inventory import Game, TimedDrop + from inventory import DropsCampaign, TimedDrop digits = ceil(log10(WS_TOPICS_LIMIT)) @@ -33,15 +30,6 @@ WS_FONT = ("Courier New", 10) LARGE_FONT = (..., 12) -class _ICOImage: - def __init__(self, path: Path | str): - with open(path, 'rb') as file: - self._data = file.read() - - def save(self, file, format): - file.write(self._data) - - class _TKOutputHandler(logging.Handler): def __init__(self, output: GUIManager): super().__init__() @@ -609,8 +597,6 @@ class CampaignProgress: vars_campaign["percentage"].set( f"{campaign.progress:6.1%} ({campaign.claimed_drops}/{campaign.total_drops})" ) - # tray - self._manager.tray.display_progress(drop) self.stop_timer() if countdown: # restart our seconds update timer @@ -927,7 +913,7 @@ class TrayIcon: ) self.icon = pystray.Icon( "twitch_miner", - _ICOImage(resource_path("pickaxe.ico")), + Image_module.open(resource_path("pickaxe.ico")), self.get_title(drop), menu, ) @@ -963,7 +949,7 @@ class TrayIcon: asyncio.create_task(notifier()) - def display_progress(self, drop: TimedDrop): + def update_title(self, drop: TimedDrop): if self.icon is not None: self.icon.title = self.get_title(drop) @@ -984,6 +970,126 @@ class Notebook: self._nb.add(widget, text=name, **kwargs) +class InventoryOverview: + def __init__(self, manager: GUIManager, master: ttk.Widget): + self._cache = manager._cache + master.rowconfigure(0, weight=1) + master.columnconfigure(0, weight=1) + self._canvas = tk.Canvas(master, scrollregion=(0, 0, 0, 0)) + self._canvas.grid(column=0, row=0, sticky="nsew") + xscroll = ttk.Scrollbar(master, orient="horizontal", command=self._canvas.xview) + xscroll.grid(column=0, row=1, sticky="ew") + yscroll = ttk.Scrollbar(master, orient="vertical", command=self._canvas.yview) + yscroll.grid(column=1, row=0, sticky="ns") + self._canvas.configure(xscrollcommand=xscroll.set, yscrollcommand=yscroll.set) + self._canvas.bind("", self._canvas_update) + self._main_frame = ttk.Frame(self._canvas) + self._canvas.bind( + "", lambda e: self._canvas.bind_all("", self._on_mousewheel) + ) + self._canvas.bind("", lambda e: self._canvas.unbind_all("")) + self._canvas.create_window(0, 0, anchor="nw", window=self._main_frame) + self._campaigns: list[DropsCampaign] = [] + self._drops: dict[str, ttk.Label] = {} + + def _canvas_update(self, event: tk.Event[tk.Canvas]): + self._canvas.configure(scrollregion=self._canvas.bbox("all")) + + def _on_mousewheel(self, event: tk.Event[tk.Misc]): + delta = -1 if event.delta > 0 else 1 + state: int = event.state if isinstance(event.state, int) else 0 + if state & 1: + scroll = self._canvas.xview_scroll + else: + scroll = self._canvas.yview_scroll + scroll(delta, "units") + + async def add_campaign(self, campaign: DropsCampaign) -> None: + campaign_frame = ttk.Frame(self._main_frame, relief="ridge", borderwidth=1, padding=4) + campaign_frame.grid(column=0, row=len(self._campaigns), sticky="nsew", pady=3) + self._campaigns.append(campaign) + campaign_frame.rowconfigure(3, weight=1) + campaign_frame.columnconfigure(1, weight=1) + campaign_frame.columnconfigure(3, weight=10000) + ttk.Label( + campaign_frame, text=campaign.name, takefocus=False, width=45 + ).grid(column=0, row=0, columnspan=2, sticky="w") + if campaign.active: + status_text: str = "Active ✔" + status_color: tk._Color = "green" + elif campaign.upcoming: + status_text = "Upcoming ⏳" + status_color = "goldenrod" + else: + status_text = "Expired ❌" + status_color = "red" + ttk.Label( + campaign_frame, text=status_text, takefocus=False, foreground=status_color + ).grid(column=1, row=1, sticky="w", padx=4) + ttk.Label( + campaign_frame, + text=f"Ends: {campaign.ends_at.astimezone().replace(microsecond=0, tzinfo=None)}", + takefocus=False + ).grid(column=1, row=2, sticky="w", padx=4) + acl = campaign.allowed_channels + if acl: + if len(acl) <= 5: + allowed_text: str = '\n'.join(ch.name for ch in acl) + else: + allowed_text = '\n'.join(ch.name for ch in acl[:4]) + allowed_text += f"\nand {len(acl) - 4} more..." + else: + allowed_text = "All" + ttk.Label( + campaign_frame, text=f"Allowed channels:\n{allowed_text}", takefocus=False + ).grid(column=1, row=3, sticky="nw", padx=4) + campaign_image = await self._cache.get(campaign.image_url, size=(96, 128)) + ttk.Label(campaign_frame, image=campaign_image).grid(column=0, row=1, rowspan=3) + ttk.Separator( + campaign_frame, orient="vertical", takefocus=False + ).grid(column=2, row=0, rowspan=4, sticky="ns") + drops_row = ttk.Frame(campaign_frame) + drops_row.grid(column=3, row=0, rowspan=4, sticky="nsew", padx=4) + drops_row.rowconfigure(0, weight=1) + for i, drop in enumerate(campaign.drops): + drop_frame = ttk.Frame(drops_row, relief="ridge", borderwidth=1, padding=5) + drop_frame.grid(column=i, row=0, padx=4) + drop_image = await self._cache.get(drop.image_url, (80, 80)) + ttk.Label( + drop_frame, text=drop.rewards_text(), image=drop_image, compound="bottom" + ).grid(column=0, row=0) + progress_text, progress_color = self.get_progress(drop) + self._drops[drop.id] = label = ttk.Label( + drop_frame, text=progress_text, foreground=progress_color + ) + label.grid(column=0, row=1) + + def clear(self) -> None: + for child in self._main_frame.winfo_children(): + child.destroy() + self._campaigns.clear() + + def get_progress(self, drop: TimedDrop) -> tuple[str, tk._Color]: + progress_text: str = '' + progress_color: tk._Color = '' + if drop.is_claimed: + progress_text = "Claimed ✔" + progress_color = "green" + elif drop.can_claim: + progress_text = "Ready to claim ⏳" + progress_color = "goldenrod" + elif drop.preconditions: + progress_text = f"{drop.progress:3.1%} of {drop.required_minutes} minutes" + return (progress_text, progress_color) + + def update_drop(self, drop: TimedDrop) -> None: + label = self._drops.get(drop.id) + if label is None: + return + progress_text, progress_color = self.get_progress(drop) + label.config(text=progress_text, foreground=progress_color) + + class _SettingsVars(TypedDict): tray: IntVar autostart: IntVar @@ -1106,7 +1212,7 @@ class SettingsPanel: self._settings.autostart_tray = tray if enabled: # NOTE: we need double quotes in case the path contains spaces - self_path = f'"{SELF_PATH.absolute().resolve()!s}"' + self_path = f'"{SELF_PATH.resolve()!s}"' if tray: self_path += " --tray" with RegistryKey("HKCU/Software/Microsoft/Windows/CurrentVersion/Run") as key: @@ -1227,11 +1333,13 @@ class GUIManager: self._root = root = Tk() # withdraw immediately to prevent the window from flashing self._root.withdraw() - root.resizable(False, True) + # root.resizable(False, True) root.iconbitmap(resource_path("pickaxe.ico")) # window icon root.title(WINDOW_TITLE) # window title root.protocol("WM_DELETE_WINDOW", self.close) # hook the X window closing button root.bind_all("", self.unfocus) # pressing ESC unfocuses selection + # Image cache for displaying images + self._cache = ImageCache(self) # style adjustements self._style = style = ttk.Style(root) @@ -1283,13 +1391,17 @@ class GUIManager: self.games = GameSelector(self, main_frame) self.output = ConsoleOutput(self, main_frame) self.channels = ChannelList(self, main_frame) + # Inventory tab + inv_frame = ttk.Frame(root_frame, padding=8) + self.inv = InventoryOverview(self, inv_frame) + self.tabs.add_tab(inv_frame, name="Inventory") # Settings tab settings_frame = ttk.Frame(root_frame, padding=8) self.settings = SettingsPanel(self, settings_frame) self.tabs.add_tab(settings_frame, name="Settings") - # clamp minimum window height (update first, so that geometry calculates the size) + # clamp minimum window size (update geometry first) root.update_idletasks() - root.minsize(width=0, height=root.winfo_reqheight()) + root.minsize(width=root.winfo_reqwidth(), height=root.winfo_reqheight()) # register logging handler self._handler = _TKOutputHandler(self) self._handler.setFormatter(FORMATTER) @@ -1324,6 +1436,13 @@ class GUIManager: def close_requested(self) -> bool: return self._closed.is_set() + async def wait_until_closed(self): + # wait until the user closes the window + await self._closed.wait() + + def prevent_close(self): + self._closed.clear() + def start(self): if self._poll_task is None: self._poll_task = asyncio.create_task(self._poll()) @@ -1374,16 +1493,20 @@ class GUIManager: self.channels.clear_selection() self.settings.clear_selection() + # these are here to interface with underlaying GUI components + def save(self) -> None: + self._cache.save() + def set_games(self, games: abc.Iterable[Game]) -> None: self.games.set_games(games) self.settings.set_games(games) - def prevent_close(self): - self._closed.clear() - - async def wait_until_closed(self): - # wait until the user closes the window - await self._closed.wait() + def display_drop( + self, drop: TimedDrop, *, countdown: bool = True, subone: bool = False + ) -> None: + self.progress.display(drop, countdown=countdown, subone=subone) # main tab + self.inv.update_drop(drop) # inventory + self.tray.update_title(drop) # tray def print(self, *args, **kwargs): # print to our custom output @@ -1393,6 +1516,7 @@ class GUIManager: if __name__ == "__main__": # Everything below is for debug purposes only from types import SimpleNamespace + from datetime import datetime, timedelta, timezone class StrNamespace(SimpleNamespace): def __str__(self): @@ -1453,6 +1577,12 @@ if __name__ == "__main__": id="0", campaign=SimpleNamespace( name=campaign_name, + id="campaign", + active=False, + upcoming=True, + image_url="https://static-cdn.jtvnw.net/ttv-boxart/460630-285x380.jpg", + allowed_channels=[], + ends_at=datetime.now(timezone.utc) + timedelta(days=5, hours=6, minutes=23), timed_drops={}, claimed_drops=cd, total_drops=td, @@ -1460,6 +1590,12 @@ if __name__ == "__main__": progress=(cd * tm + cm) / (td * tm), remaining_minutes=(td - cd) * tm - cm, ), + image_url=( + "https://static-cdn.jtvnw.net/twitch-drops-assets-prod/" + "BENEFIT-81ab5665-b2f4-4179-96e6-74da5a82da28.jpeg" + ), + is_claimed=False, + preconditions=True, rewards_text=lambda: rewards, progress=cm/tm, current_minutes=cm, @@ -1467,6 +1603,7 @@ if __name__ == "__main__": remaining_minutes=tm-cm, ) mock.campaign.timed_drops["0"] = mock + mock.campaign.drops = mock.campaign.timed_drops.values() return mock async def main(exit_event: asyncio.Event): @@ -1539,9 +1676,11 @@ if __name__ == "__main__": # gui.tray.minimize() await asyncio.sleep(1) gui.tray.notify("Bounty Coins (3/7)", "Mined Drop") - # Drop progress + # Inventory overview drop = create_drop("Wardrobe Cleaning", "Fancy Pants", 2, 7, 239, 240) - gui.progress.display(drop) + await gui.inv.add_campaign(drop.campaign) + # Drop progress + gui.display_drop(drop) await asyncio.sleep(63) drop.current_minutes = 240 drop.remaining_minutes = 0 @@ -1551,14 +1690,21 @@ if __name__ == "__main__": campaign.progress = 3/7 campaign.claimed_drops = 3 campaign.remaining_drops = 4 - gui.progress.display(drop) + gui.display_drop(drop) await asyncio.sleep(10) drop.current_minutes = 0 drop.remaining_minutes = 240 drop.progress = 0.0 - gui.progress.display(drop) + gui.display_drop(drop) + + def main_exit(task: asyncio.Task[None]) -> None: + if task.exception() is not None: + exit_event.set() loop = asyncio.get_event_loop() exit_event = asyncio.Event() - loop.create_task(main(exit_event)) + main_task = loop.create_task(main(exit_event)) + main_task.add_done_callback(main_exit) loop.run_until_complete(exit_event.wait()) + if main_task.done(): + loop.run_until_complete(main_task) diff --git a/inventory.py b/inventory.py index 7ea5a18..cb2bcd1 100644 --- a/inventory.py +++ b/inventory.py @@ -1,19 +1,27 @@ from __future__ import annotations +import re from typing import TYPE_CHECKING from functools import cached_property from datetime import datetime, timezone from channel import Channel -from constants import GQL_OPERATIONS +from constants import GQL_OPERATIONS, URLType from utils import timestamp, invalidate_cache, Game if TYPE_CHECKING: from collections import abc from twitch import Twitch + from gui import GUIManager from constants import JsonType - from gui import CampaignProgress + + +DIMS_PATTERN = re.compile(r'-\d+x\d+(?=\.(?:jpg|png|gif)$)', re.I) + + +def remove_dimensions(url: URLType) -> URLType: + return URLType(DIMS_PATTERN.sub('', url)) class BaseDrop: @@ -25,6 +33,8 @@ class BaseDrop: self.name: str = data["name"] self.campaign: DropsCampaign = campaign self.rewards: list[str] = [b["benefit"]["name"] for b in data["benefitEdges"]] + # we use the first benefit's image specifically here + self.image_url: URLType = data["benefitEdges"][0]["benefit"]["imageAssetURL"] self.starts_at: datetime = timestamp(data["startAt"]) self.ends_at: datetime = timestamp(data["endAt"]) self.claim_id: str | None = None @@ -134,7 +144,7 @@ class TimedDrop(BaseDrop): self, campaign: DropsCampaign, data: JsonType, claimed_benefits: dict[str, datetime] ): super().__init__(campaign, data, claimed_benefits) - self._gui_progress: CampaignProgress = self._twitch.gui.progress + self._manager: GUIManager = self._twitch.gui self.current_minutes: int = 0 if "self" in data: self.current_minutes = data["self"]["currentMinutesWatched"] @@ -179,7 +189,7 @@ class TimedDrop(BaseDrop): self._on_minutes_changed() def display(self, *, countdown: bool = True, subone: bool = False): - self._gui_progress.display(self, countdown=countdown, subone=subone) + self._manager.display_drop(self, countdown=countdown, subone=subone) def bump_minutes(self): if self.current_minutes < self.required_minutes: @@ -193,6 +203,9 @@ class DropsCampaign: self.id: str = data["id"] self.name: str = data["name"] self.game: Game = Game(data["game"]) + # campaign's image actually comes from the game object + # we use regex to get rid of the dimensions part (ex. ".../game_id-285x380.jpg") + self.image_url: URLType = remove_dimensions(data["game"]["boxArtURL"]) self.starts_at: datetime = timestamp(data["startAt"]) self.ends_at: datetime = timestamp(data["endAt"]) allowed: JsonType = data["allow"] diff --git a/main.py b/main.py index 6e34922..a58d567 100644 --- a/main.py +++ b/main.py @@ -12,6 +12,24 @@ import tkinter as tk from tkinter import messagebox from typing import IO, NoReturn +# pre-import 3rd party libraries, handling missing ones with an exception +try: + import aiohttp # noqa +except ModuleNotFoundError as exc: + raise ImportError("You have to run 'pip install aiohttp' first") from exc +try: + import websockets # noqa +except ModuleNotFoundError as exc: + raise ImportError("You have to run 'pip install websockets' first") from exc +try: + import pystray # noqa +except ModuleNotFoundError as exc: + raise ImportError("You have to run 'pip install pystray' first") from exc +try: + import PIL # noqa +except ModuleNotFoundError as exc: + raise ImportError("You have to run 'pip install pillow' first") from exc + from twitch import Twitch from settings import Settings from version import __version__ diff --git a/settings.py b/settings.py index 157f3ad..e5bda72 100644 --- a/settings.py +++ b/settings.py @@ -1,10 +1,9 @@ from __future__ import annotations -import json -from enum import Enum from typing import Any, TypedDict, TYPE_CHECKING -from constants import JsonType, SETTINGS_PATH +from constants import SETTINGS_PATH +from utils import json_load, json_save if TYPE_CHECKING: from main import ParsedArgs @@ -18,9 +17,6 @@ class SettingsFile(TypedDict): autostart_tray: bool -serialize_env: dict[str, type] = { - "set": set, -} default_settings: SettingsFile = { "priority": [], "exclude": set(), @@ -30,26 +26,6 @@ default_settings: SettingsFile = { } -def serialize(obj: Any) -> Any: - if isinstance(obj, (set, Enum)): - if isinstance(obj, set): - d = list(obj) - elif isinstance(obj, Enum): - d = obj.value - return { - "__type": type(obj).__name__, - "data": d, - } - raise TypeError(obj) - - -def deserialize(obj: JsonType) -> Any: - if "__type" in obj: - t = eval(obj["__type"], None, serialize_env) - return t(obj["data"]) - return obj - - class Settings: # from args log: bool @@ -67,10 +43,7 @@ class Settings: autostart_tray: bool def __init__(self, args: ParsedArgs): - self._settings: SettingsFile = default_settings.copy() - if SETTINGS_PATH.exists(): - with open(SETTINGS_PATH, 'r') as file: - self._settings.update(json.load(file, object_hook=deserialize)) + self._settings: SettingsFile = json_load(SETTINGS_PATH, default_settings) self._args: ParsedArgs = args # default logic of reading settings is to check args first, then the settings file @@ -78,7 +51,7 @@ class Settings: if hasattr(self._args, name): return getattr(self._args, name) elif name in self._settings: - return self._settings[name] # type: ignore[misc] + return self._settings[name] # type: ignore[literal-required] return getattr(super(), name) def __setattr__(self, name: str, value: Any, /) -> None: @@ -86,7 +59,7 @@ class Settings: # passthrough return super().__setattr__(name, value) elif name in self._settings: - self._settings[name] = value # type: ignore[misc] + self._settings[name] = value # type: ignore[literal-required] return raise TypeError(f"{name} is missing a custom setter") @@ -94,5 +67,4 @@ class Settings: raise RuntimeError("settings can't be deleted") def save(self) -> None: - with open(SETTINGS_PATH, 'w') as file: - json.dump(self._settings, file, default=serialize, sort_keys=True, indent=4) + json_save(SETTINGS_PATH, self._settings) diff --git a/twitch.py b/twitch.py index f1a10de..d742435 100644 --- a/twitch.py +++ b/twitch.py @@ -10,10 +10,7 @@ from collections import abc, OrderedDict from contextlib import suppress, asynccontextmanager from typing import Final, NoReturn, cast, TYPE_CHECKING -try: - import aiohttp -except ModuleNotFoundError as exc: - raise ImportError("You have to run 'pip install aiohttp' first") from exc +import aiohttp from gui import GUIManager from channel import Channel @@ -102,11 +99,13 @@ class Twitch: if self._session is not None: cookie_jar = cast(aiohttp.CookieJar, self._session.cookie_jar) cookie_jar.save(COOKIES_PATH) - await self._session.close() + session = self._session self._session = None + await session.close() await self.websocket.stop() - # save settings + # save important files self.settings.save() + self.gui.save() # wait at least one full second + whatever it takes to complete the closing # this allows aiohttp to safely close the session await asyncio.sleep(start_time + 0.5 - time()) @@ -834,28 +833,27 @@ class Twitch: return response_json async def fetch_campaign( - self, campaign_id: str, claimed_benefits: dict[str, datetime] + self, + campaign_id: str, + available_data: JsonType, + claimed_benefits: dict[str, datetime], ) -> DropsCampaign: response = await self.gql_request( GQL_OPERATIONS["CampaignDetails"].with_variables( {"channelLogin": str(self._user_id), "dropID": campaign_id} ) ) - return DropsCampaign(self, response["data"]["user"]["dropCampaign"], claimed_benefits) + campaign_data: JsonType = response["data"]["user"]["dropCampaign"] + # NOTE: we use available_data to add a couple of fields missing from the existing details, + # most notably: game boxart + campaign_data["game"]["boxArtURL"] = available_data["game"]["boxArtURL"] + return DropsCampaign(self, campaign_data, claimed_benefits) async def fetch_inventory(self) -> None: - # fetch all available campaign IDs, that are currently ACTIVE and account is connected - response = await self.gql_request(GQL_OPERATIONS["Campaigns"]) - data = response["data"]["currentUser"]["dropCampaigns"] or [] - applicable_statuses = ("ACTIVE", "UPCOMING") - available_campaigns: set[str] = set( - c["id"] for c in data - if c["status"] in applicable_statuses and c["self"]["isAccountConnected"] - ) # fetch in-progress campaigns (inventory) response = await self.gql_request(GQL_OPERATIONS["Inventory"]) - inventory = response["data"]["currentUser"]["inventory"] - ongoing_campaigns = inventory["dropCampaignsInProgress"] or [] + inventory: JsonType = response["data"]["currentUser"]["inventory"] + ongoing_campaigns: list[JsonType] = inventory["dropCampaignsInProgress"] or [] # this contains claimed benefit edge IDs, not drop IDs claimed_benefits: dict[str, datetime] = { b["id"]: timestamp(b["lastAwardedAt"]) for b in inventory["gameEventDrops"] @@ -864,22 +862,35 @@ class Twitch: DropsCampaign(self, campaign_data, claimed_benefits) for campaign_data in ongoing_campaigns ] - # filter out in-progress campaigns from all available campaigns, - # since we already have all information needed for them - for campaign in campaigns: - available_campaigns.discard(campaign.id) + # fetch all available campaigns data + response = await self.gql_request(GQL_OPERATIONS["Campaigns"]) + available_list: list[JsonType] = response["data"]["currentUser"]["dropCampaigns"] or [] + applicable_statuses = ("ACTIVE", "UPCOMING") + existing_campaigns: set[str] = set(c.id for c in campaigns) + available_campaigns: dict[str, JsonType] = { + c["id"]: c + for c in available_list + if ( + c["status"] in applicable_statuses # that are currently ACTIVE + and c["self"]["isAccountConnected"] # and account is connected + and c["id"] not in existing_campaigns # and they aren't in the inventory already + ) + } # add campaigns that remained, that can be earned but are not in-progress yet - for campaign_id in available_campaigns: - campaign = await self.fetch_campaign(campaign_id, claimed_benefits) + for campaign_id, available_data in available_campaigns.items(): + campaign = await self.fetch_campaign(campaign_id, available_data, claimed_benefits) if any(drop.can_earn() for drop in campaign.drops): campaigns.append(campaign) campaigns.sort(key=lambda c: c.ends_at) self.inventory.clear() + self.gui.inv.clear() for campaign in campaigns: game = campaign.game if game not in self.inventory: self.inventory[game] = [] + await self.gui.inv.add_campaign(campaign) self.inventory[game].append(campaign) + self.gui.save() def get_drop(self, drop_id: str) -> TimedDrop | None: """ diff --git a/utils.py b/utils.py index 5eb274f..d7fa8ac 100644 --- a/utils.py +++ b/utils.py @@ -1,16 +1,20 @@ from __future__ import annotations import sys +import json import random import string import asyncio import logging +from enum import Enum from pathlib import Path from functools import wraps from contextlib import suppress from datetime import datetime, timezone from collections import abc, OrderedDict -from typing import Any, Literal, MutableSet, Generic, TypeVar, cast, TYPE_CHECKING +from typing import ( + Any, Literal, MutableSet, Callable, Generic, Mapping, TypeVar, cast, TYPE_CHECKING +) from constants import WORKING_DIR, JsonType @@ -26,8 +30,13 @@ else: _T = TypeVar("_T") # type _D = TypeVar("_D") # default _P = ParamSpec("_P") # params +_JSON_T = TypeVar("_JSON_T", bound=Mapping[Any, Any]) logger = logging.getLogger("TwitchDrops") NONCE_CHARS = string.ascii_letters + string.digits +serialize_env: dict[str, Callable[[Any], object]] = { + "set": set, + "datetime": lambda d: datetime.fromtimestamp(d, timezone.utc), +} def resource_path(relative_path: Path | str) -> Path: @@ -79,6 +88,46 @@ def invalidate_cache(instance, *attrnames): delattr(instance, name) +def _serialize(obj: Any) -> Any: + # convert data + d: int | str | float | list[Any] | JsonType + if isinstance(obj, set): + d = list(obj) + elif isinstance(obj, Enum): + d = obj.value + elif isinstance(obj, datetime): + if obj.tzinfo is None: + # assume naive objects are UTC + obj = obj.replace(tzinfo=timezone.utc) + d = obj.timestamp() + else: + raise TypeError(obj) + # store with type + return { + "__type": type(obj).__name__, + "data": d, + } + + +def _deserialize(obj: JsonType) -> Any: + if "__type" in obj: + return serialize_env[obj["__type"]](obj["data"]) + return obj + + +def json_load(path: Path, defaults: _JSON_T) -> _JSON_T: + loaded: JsonType = dict(defaults) + if path.exists(): + with open(path, 'r') as file: + loaded.update(json.load(file, object_hook=_deserialize)) + return cast(_JSON_T, loaded) + + +def json_save(path: Path, contents: Mapping[Any, Any]) -> None: + with open(path, 'w') as file: + json.dump(contents, file, default=_serialize, sort_keys=True, indent=4) + + class OrderedSet(MutableSet[_T]): """ Implementation of a set that preserves insertion order, diff --git a/websocket.py b/websocket.py index 8eeb029..ea73c8d 100644 --- a/websocket.py +++ b/websocket.py @@ -7,11 +7,8 @@ from time import time from contextlib import suppress from typing import TYPE_CHECKING -try: - from websockets.exceptions import ConnectionClosed, ConnectionClosedOK - from websockets.client import WebSocketClientProtocol, connect as websocket_connect -except ModuleNotFoundError as exc: - raise ImportError("You have to run 'pip install websockets' first") from exc +from websockets.exceptions import ConnectionClosed, ConnectionClosedOK +from websockets.client import WebSocketClientProtocol, connect as websocket_connect from exceptions import MinerException from utils import task_wrapper, create_nonce, AwaitableValue