Implement the inventory tab

This commit is contained in:
DevilXD
2022-03-27 14:03:11 +02:00
parent 7beb10df8d
commit f7b0a5f2cc
11 changed files with 426 additions and 111 deletions

1
.gitignore vendored
View File

@@ -11,5 +11,6 @@ __pycache__
/*.tmp
/Twitch Drops Miner
# Dev files
/cache
cookies.jar
settings.json

105
cache.py Normal file
View File

@@ -0,0 +1,105 @@
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta, timezone
import io
from typing import Dict, TypedDict, NewType, TYPE_CHECKING
from utils import json_load, json_save
from constants import URLType, CACHE_PATH, CACHE_DB
import aiohttp
from PIL import Image as Image_module
from PIL.ImageTk import PhotoImage
if TYPE_CHECKING:
from gui import GUIManager
from PIL.Image import Image
from typing_extensions import TypeAlias
ImageHash = NewType("ImageHash", str)
ImageSize: TypeAlias = "tuple[int, int]"
class ExpiringHash(TypedDict):
hash: ImageHash
expires: datetime
Hashes = Dict[URLType, ExpiringHash]
default_database: Hashes = {}
class ImageCache:
LIFETIME = timedelta(days=7)
def __init__(self, manager: GUIManager) -> None:
self._root = manager._root
CACHE_PATH.mkdir(parents=True, exist_ok=True)
self._hashes: Hashes = json_load(CACHE_DB, default_database)
self._images: dict[ImageHash, Image] = {}
self._photos: dict[tuple[ImageHash, ImageSize], PhotoImage] = {}
self._lock = asyncio.Lock()
# cleanup the URLs
hash_counts: dict[ImageHash, int] = {}
now = datetime.now(timezone.utc)
for url, hash_dict in list(self._hashes.items()):
img_hash = hash_dict["hash"]
if img_hash not in hash_counts:
hash_counts[img_hash] = 0
if now >= hash_dict["expires"]:
del self._hashes[url]
else:
hash_counts[img_hash] += 1
for img_hash, count in hash_counts.items():
if count == 0:
# hashes come with an extension already
for file in CACHE_PATH.glob(img_hash):
file.unlink()
def save(self) -> None:
json_save(CACHE_DB, self._hashes)
def _new_expires(self) -> datetime:
return datetime.now(timezone.utc) + self.LIFETIME
def _hash(self, image: Image) -> ImageHash:
pixel_data = list(image.resize((10, 10), Image_module.ANTIALIAS).convert('L').getdata())
avg_pixel = sum(pixel_data) / len(pixel_data)
bits = ''.join('1' if px >= avg_pixel else '0' for px in pixel_data)
return ImageHash(f"{int(bits, 2):x}.png")
async def get(self, url: URLType, size: ImageSize | None = None) -> PhotoImage:
async with self._lock:
image: Image | None = None
if url in self._hashes:
img_hash = self._hashes[url]["hash"]
self._hashes[url]["expires"] = self._new_expires()
if img_hash in self._images:
image = self._images[img_hash]
else:
try:
self._images[img_hash] = image = Image_module.open(CACHE_PATH / img_hash)
except FileNotFoundError:
pass
if image is None:
async with aiohttp.request("GET", url) as response:
image = Image_module.open(io.BytesIO(await response.read()))
img_hash = self._hash(image)
self._images[img_hash] = image
image.save(CACHE_PATH / img_hash)
self._hashes[url] = {
"hash": img_hash,
"expires": self._new_expires()
}
if size is None:
size = image.size
photo_key = (img_hash, size)
if photo_key in self._photos:
return self._photos[photo_key]
if image.size != size:
image = image.resize(size, Image_module.ADAPTIVE)
self._photos[photo_key] = photo = PhotoImage(master=self._root, image=image)
return photo

View File

@@ -10,7 +10,7 @@ from typing import Any, SupportsInt, TYPE_CHECKING
from utils import Game, invalidate_cache
from exceptions import MinerException, RequestException
from constants import BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, DROPS_ENABLED_TAG
from constants import BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, DROPS_ENABLED_TAG, URLType
if TYPE_CHECKING:
from twitch import Twitch
@@ -79,7 +79,7 @@ class Channel:
self.id: int = int(id)
self._login: str = login
self._display_name: str | None = display_name
self._spade_url: str | None = None
self._spade_url: URLType | None = None
self.points: int | None = None
self._stream: Stream | None = None
self._pending_stream_up: asyncio.Task[Any] | None = None
@@ -141,8 +141,8 @@ class Channel:
return self._login
@property
def url(self) -> str:
return f"{BASE_URL}/{self._login}"
def url(self) -> URLType:
return URLType(f"{BASE_URL}/{self._login}")
@property
def iid(self) -> str:
@@ -206,7 +206,7 @@ class Channel:
self._pending_stream_up = None
self._gui_channels.remove(self)
async def get_spade_url(self) -> str:
async def get_spade_url(self) -> URLType:
"""
To get this monstrous thing, you have to walk a chain of requests.
Streamer page (HTML) --parse-> Streamer Settings (JavaScript) --parse-> Spade URL
@@ -228,7 +228,7 @@ class Channel:
)
if not match:
raise MinerException("Error while spade_url extraction: step #2")
return match.group(1)
return URLType(match.group(1))
async def get_stream(self) -> Stream | None:
response: JsonType | None = await self._twitch.gql_request(

View File

@@ -6,7 +6,7 @@ from copy import copy
from pathlib import Path
from enum import Enum, auto
from datetime import timedelta
from typing import Any, Dict, Literal, TYPE_CHECKING
from typing import Any, Dict, Literal, NewType, TYPE_CHECKING
from typing_extensions import TypeAlias
@@ -21,10 +21,13 @@ SELF_PATH = Path(sys.argv[0])
WORKING_DIR = SELF_PATH.absolute().parent
# Other Paths
LOG_PATH = Path(WORKING_DIR, "log.txt")
CACHE_PATH = Path(WORKING_DIR, "cache")
CACHE_DB = Path(CACHE_PATH, "mapping.json")
COOKIES_PATH = Path(WORKING_DIR, "cookies.jar")
SETTINGS_PATH = Path(WORKING_DIR, "settings.json")
# Typing
JsonType = Dict[str, Any]
URLType = NewType("URLType", str)
TopicProcess: TypeAlias = "abc.Callable[[int, JsonType], Any]"
# Values
MAX_WEBSOCKETS = 8

218
gui.py
View File

@@ -10,22 +10,19 @@ from collections import abc, namedtuple, OrderedDict
from tkinter import Tk, ttk, StringVar, DoubleVar, IntVar
from typing import Any, TypedDict, NoReturn, TYPE_CHECKING
try:
import pystray
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install pystray' first") from exc
import pystray
from PIL import Image as Image_module
from utils import resource_path
from cache import ImageCache
from exceptions import ExitRequest
from utils import resource_path, Game
from registry import RegistryKey, ValueType
from constants import SELF_PATH, FORMATTER, WS_TOPICS_LIMIT, MAX_WEBSOCKETS, WINDOW_TITLE, State
if TYPE_CHECKING:
from pathlib import Path
from twitch import Twitch
from channel import Channel
from inventory import Game, TimedDrop
from inventory import DropsCampaign, TimedDrop
digits = ceil(log10(WS_TOPICS_LIMIT))
@@ -33,15 +30,6 @@ WS_FONT = ("Courier New", 10)
LARGE_FONT = (..., 12)
class _ICOImage:
def __init__(self, path: Path | str):
with open(path, 'rb') as file:
self._data = file.read()
def save(self, file, format):
file.write(self._data)
class _TKOutputHandler(logging.Handler):
def __init__(self, output: GUIManager):
super().__init__()
@@ -609,8 +597,6 @@ class CampaignProgress:
vars_campaign["percentage"].set(
f"{campaign.progress:6.1%} ({campaign.claimed_drops}/{campaign.total_drops})"
)
# tray
self._manager.tray.display_progress(drop)
self.stop_timer()
if countdown:
# restart our seconds update timer
@@ -927,7 +913,7 @@ class TrayIcon:
)
self.icon = pystray.Icon(
"twitch_miner",
_ICOImage(resource_path("pickaxe.ico")),
Image_module.open(resource_path("pickaxe.ico")),
self.get_title(drop),
menu,
)
@@ -963,7 +949,7 @@ class TrayIcon:
asyncio.create_task(notifier())
def display_progress(self, drop: TimedDrop):
def update_title(self, drop: TimedDrop):
if self.icon is not None:
self.icon.title = self.get_title(drop)
@@ -984,6 +970,126 @@ class Notebook:
self._nb.add(widget, text=name, **kwargs)
class InventoryOverview:
def __init__(self, manager: GUIManager, master: ttk.Widget):
self._cache = manager._cache
master.rowconfigure(0, weight=1)
master.columnconfigure(0, weight=1)
self._canvas = tk.Canvas(master, scrollregion=(0, 0, 0, 0))
self._canvas.grid(column=0, row=0, sticky="nsew")
xscroll = ttk.Scrollbar(master, orient="horizontal", command=self._canvas.xview)
xscroll.grid(column=0, row=1, sticky="ew")
yscroll = ttk.Scrollbar(master, orient="vertical", command=self._canvas.yview)
yscroll.grid(column=1, row=0, sticky="ns")
self._canvas.configure(xscrollcommand=xscroll.set, yscrollcommand=yscroll.set)
self._canvas.bind("<Configure>", self._canvas_update)
self._main_frame = ttk.Frame(self._canvas)
self._canvas.bind(
"<Enter>", lambda e: self._canvas.bind_all("<MouseWheel>", self._on_mousewheel)
)
self._canvas.bind("<Leave>", lambda e: self._canvas.unbind_all("<MouseWheel>"))
self._canvas.create_window(0, 0, anchor="nw", window=self._main_frame)
self._campaigns: list[DropsCampaign] = []
self._drops: dict[str, ttk.Label] = {}
def _canvas_update(self, event: tk.Event[tk.Canvas]):
self._canvas.configure(scrollregion=self._canvas.bbox("all"))
def _on_mousewheel(self, event: tk.Event[tk.Misc]):
delta = -1 if event.delta > 0 else 1
state: int = event.state if isinstance(event.state, int) else 0
if state & 1:
scroll = self._canvas.xview_scroll
else:
scroll = self._canvas.yview_scroll
scroll(delta, "units")
async def add_campaign(self, campaign: DropsCampaign) -> None:
campaign_frame = ttk.Frame(self._main_frame, relief="ridge", borderwidth=1, padding=4)
campaign_frame.grid(column=0, row=len(self._campaigns), sticky="nsew", pady=3)
self._campaigns.append(campaign)
campaign_frame.rowconfigure(3, weight=1)
campaign_frame.columnconfigure(1, weight=1)
campaign_frame.columnconfigure(3, weight=10000)
ttk.Label(
campaign_frame, text=campaign.name, takefocus=False, width=45
).grid(column=0, row=0, columnspan=2, sticky="w")
if campaign.active:
status_text: str = "Active ✔"
status_color: tk._Color = "green"
elif campaign.upcoming:
status_text = "Upcoming ⏳"
status_color = "goldenrod"
else:
status_text = "Expired ❌"
status_color = "red"
ttk.Label(
campaign_frame, text=status_text, takefocus=False, foreground=status_color
).grid(column=1, row=1, sticky="w", padx=4)
ttk.Label(
campaign_frame,
text=f"Ends: {campaign.ends_at.astimezone().replace(microsecond=0, tzinfo=None)}",
takefocus=False
).grid(column=1, row=2, sticky="w", padx=4)
acl = campaign.allowed_channels
if acl:
if len(acl) <= 5:
allowed_text: str = '\n'.join(ch.name for ch in acl)
else:
allowed_text = '\n'.join(ch.name for ch in acl[:4])
allowed_text += f"\nand {len(acl) - 4} more..."
else:
allowed_text = "All"
ttk.Label(
campaign_frame, text=f"Allowed channels:\n{allowed_text}", takefocus=False
).grid(column=1, row=3, sticky="nw", padx=4)
campaign_image = await self._cache.get(campaign.image_url, size=(96, 128))
ttk.Label(campaign_frame, image=campaign_image).grid(column=0, row=1, rowspan=3)
ttk.Separator(
campaign_frame, orient="vertical", takefocus=False
).grid(column=2, row=0, rowspan=4, sticky="ns")
drops_row = ttk.Frame(campaign_frame)
drops_row.grid(column=3, row=0, rowspan=4, sticky="nsew", padx=4)
drops_row.rowconfigure(0, weight=1)
for i, drop in enumerate(campaign.drops):
drop_frame = ttk.Frame(drops_row, relief="ridge", borderwidth=1, padding=5)
drop_frame.grid(column=i, row=0, padx=4)
drop_image = await self._cache.get(drop.image_url, (80, 80))
ttk.Label(
drop_frame, text=drop.rewards_text(), image=drop_image, compound="bottom"
).grid(column=0, row=0)
progress_text, progress_color = self.get_progress(drop)
self._drops[drop.id] = label = ttk.Label(
drop_frame, text=progress_text, foreground=progress_color
)
label.grid(column=0, row=1)
def clear(self) -> None:
for child in self._main_frame.winfo_children():
child.destroy()
self._campaigns.clear()
def get_progress(self, drop: TimedDrop) -> tuple[str, tk._Color]:
progress_text: str = ''
progress_color: tk._Color = ''
if drop.is_claimed:
progress_text = "Claimed ✔"
progress_color = "green"
elif drop.can_claim:
progress_text = "Ready to claim ⏳"
progress_color = "goldenrod"
elif drop.preconditions:
progress_text = f"{drop.progress:3.1%} of {drop.required_minutes} minutes"
return (progress_text, progress_color)
def update_drop(self, drop: TimedDrop) -> None:
label = self._drops.get(drop.id)
if label is None:
return
progress_text, progress_color = self.get_progress(drop)
label.config(text=progress_text, foreground=progress_color)
class _SettingsVars(TypedDict):
tray: IntVar
autostart: IntVar
@@ -1106,7 +1212,7 @@ class SettingsPanel:
self._settings.autostart_tray = tray
if enabled:
# NOTE: we need double quotes in case the path contains spaces
self_path = f'"{SELF_PATH.absolute().resolve()!s}"'
self_path = f'"{SELF_PATH.resolve()!s}"'
if tray:
self_path += " --tray"
with RegistryKey("HKCU/Software/Microsoft/Windows/CurrentVersion/Run") as key:
@@ -1227,11 +1333,13 @@ class GUIManager:
self._root = root = Tk()
# withdraw immediately to prevent the window from flashing
self._root.withdraw()
root.resizable(False, True)
# root.resizable(False, True)
root.iconbitmap(resource_path("pickaxe.ico")) # window icon
root.title(WINDOW_TITLE) # window title
root.protocol("WM_DELETE_WINDOW", self.close) # hook the X window closing button
root.bind_all("<KeyPress-Escape>", self.unfocus) # pressing ESC unfocuses selection
# Image cache for displaying images
self._cache = ImageCache(self)
# style adjustements
self._style = style = ttk.Style(root)
@@ -1283,13 +1391,17 @@ class GUIManager:
self.games = GameSelector(self, main_frame)
self.output = ConsoleOutput(self, main_frame)
self.channels = ChannelList(self, main_frame)
# Inventory tab
inv_frame = ttk.Frame(root_frame, padding=8)
self.inv = InventoryOverview(self, inv_frame)
self.tabs.add_tab(inv_frame, name="Inventory")
# Settings tab
settings_frame = ttk.Frame(root_frame, padding=8)
self.settings = SettingsPanel(self, settings_frame)
self.tabs.add_tab(settings_frame, name="Settings")
# clamp minimum window height (update first, so that geometry calculates the size)
# clamp minimum window size (update geometry first)
root.update_idletasks()
root.minsize(width=0, height=root.winfo_reqheight())
root.minsize(width=root.winfo_reqwidth(), height=root.winfo_reqheight())
# register logging handler
self._handler = _TKOutputHandler(self)
self._handler.setFormatter(FORMATTER)
@@ -1324,6 +1436,13 @@ class GUIManager:
def close_requested(self) -> bool:
return self._closed.is_set()
async def wait_until_closed(self):
# wait until the user closes the window
await self._closed.wait()
def prevent_close(self):
self._closed.clear()
def start(self):
if self._poll_task is None:
self._poll_task = asyncio.create_task(self._poll())
@@ -1374,16 +1493,20 @@ class GUIManager:
self.channels.clear_selection()
self.settings.clear_selection()
# these are here to interface with underlaying GUI components
def save(self) -> None:
self._cache.save()
def set_games(self, games: abc.Iterable[Game]) -> None:
self.games.set_games(games)
self.settings.set_games(games)
def prevent_close(self):
self._closed.clear()
async def wait_until_closed(self):
# wait until the user closes the window
await self._closed.wait()
def display_drop(
self, drop: TimedDrop, *, countdown: bool = True, subone: bool = False
) -> None:
self.progress.display(drop, countdown=countdown, subone=subone) # main tab
self.inv.update_drop(drop) # inventory
self.tray.update_title(drop) # tray
def print(self, *args, **kwargs):
# print to our custom output
@@ -1393,6 +1516,7 @@ class GUIManager:
if __name__ == "__main__":
# Everything below is for debug purposes only
from types import SimpleNamespace
from datetime import datetime, timedelta, timezone
class StrNamespace(SimpleNamespace):
def __str__(self):
@@ -1453,6 +1577,12 @@ if __name__ == "__main__":
id="0",
campaign=SimpleNamespace(
name=campaign_name,
id="campaign",
active=False,
upcoming=True,
image_url="https://static-cdn.jtvnw.net/ttv-boxart/460630-285x380.jpg",
allowed_channels=[],
ends_at=datetime.now(timezone.utc) + timedelta(days=5, hours=6, minutes=23),
timed_drops={},
claimed_drops=cd,
total_drops=td,
@@ -1460,6 +1590,12 @@ if __name__ == "__main__":
progress=(cd * tm + cm) / (td * tm),
remaining_minutes=(td - cd) * tm - cm,
),
image_url=(
"https://static-cdn.jtvnw.net/twitch-drops-assets-prod/"
"BENEFIT-81ab5665-b2f4-4179-96e6-74da5a82da28.jpeg"
),
is_claimed=False,
preconditions=True,
rewards_text=lambda: rewards,
progress=cm/tm,
current_minutes=cm,
@@ -1467,6 +1603,7 @@ if __name__ == "__main__":
remaining_minutes=tm-cm,
)
mock.campaign.timed_drops["0"] = mock
mock.campaign.drops = mock.campaign.timed_drops.values()
return mock
async def main(exit_event: asyncio.Event):
@@ -1539,9 +1676,11 @@ if __name__ == "__main__":
# gui.tray.minimize()
await asyncio.sleep(1)
gui.tray.notify("Bounty Coins (3/7)", "Mined Drop")
# Drop progress
# Inventory overview
drop = create_drop("Wardrobe Cleaning", "Fancy Pants", 2, 7, 239, 240)
gui.progress.display(drop)
await gui.inv.add_campaign(drop.campaign)
# Drop progress
gui.display_drop(drop)
await asyncio.sleep(63)
drop.current_minutes = 240
drop.remaining_minutes = 0
@@ -1551,14 +1690,21 @@ if __name__ == "__main__":
campaign.progress = 3/7
campaign.claimed_drops = 3
campaign.remaining_drops = 4
gui.progress.display(drop)
gui.display_drop(drop)
await asyncio.sleep(10)
drop.current_minutes = 0
drop.remaining_minutes = 240
drop.progress = 0.0
gui.progress.display(drop)
gui.display_drop(drop)
def main_exit(task: asyncio.Task[None]) -> None:
if task.exception() is not None:
exit_event.set()
loop = asyncio.get_event_loop()
exit_event = asyncio.Event()
loop.create_task(main(exit_event))
main_task = loop.create_task(main(exit_event))
main_task.add_done_callback(main_exit)
loop.run_until_complete(exit_event.wait())
if main_task.done():
loop.run_until_complete(main_task)

View File

@@ -1,19 +1,27 @@
from __future__ import annotations
import re
from typing import TYPE_CHECKING
from functools import cached_property
from datetime import datetime, timezone
from channel import Channel
from constants import GQL_OPERATIONS
from constants import GQL_OPERATIONS, URLType
from utils import timestamp, invalidate_cache, Game
if TYPE_CHECKING:
from collections import abc
from twitch import Twitch
from gui import GUIManager
from constants import JsonType
from gui import CampaignProgress
DIMS_PATTERN = re.compile(r'-\d+x\d+(?=\.(?:jpg|png|gif)$)', re.I)
def remove_dimensions(url: URLType) -> URLType:
return URLType(DIMS_PATTERN.sub('', url))
class BaseDrop:
@@ -25,6 +33,8 @@ class BaseDrop:
self.name: str = data["name"]
self.campaign: DropsCampaign = campaign
self.rewards: list[str] = [b["benefit"]["name"] for b in data["benefitEdges"]]
# we use the first benefit's image specifically here
self.image_url: URLType = data["benefitEdges"][0]["benefit"]["imageAssetURL"]
self.starts_at: datetime = timestamp(data["startAt"])
self.ends_at: datetime = timestamp(data["endAt"])
self.claim_id: str | None = None
@@ -134,7 +144,7 @@ class TimedDrop(BaseDrop):
self, campaign: DropsCampaign, data: JsonType, claimed_benefits: dict[str, datetime]
):
super().__init__(campaign, data, claimed_benefits)
self._gui_progress: CampaignProgress = self._twitch.gui.progress
self._manager: GUIManager = self._twitch.gui
self.current_minutes: int = 0
if "self" in data:
self.current_minutes = data["self"]["currentMinutesWatched"]
@@ -179,7 +189,7 @@ class TimedDrop(BaseDrop):
self._on_minutes_changed()
def display(self, *, countdown: bool = True, subone: bool = False):
self._gui_progress.display(self, countdown=countdown, subone=subone)
self._manager.display_drop(self, countdown=countdown, subone=subone)
def bump_minutes(self):
if self.current_minutes < self.required_minutes:
@@ -193,6 +203,9 @@ class DropsCampaign:
self.id: str = data["id"]
self.name: str = data["name"]
self.game: Game = Game(data["game"])
# campaign's image actually comes from the game object
# we use regex to get rid of the dimensions part (ex. ".../game_id-285x380.jpg")
self.image_url: URLType = remove_dimensions(data["game"]["boxArtURL"])
self.starts_at: datetime = timestamp(data["startAt"])
self.ends_at: datetime = timestamp(data["endAt"])
allowed: JsonType = data["allow"]

18
main.py
View File

@@ -12,6 +12,24 @@ import tkinter as tk
from tkinter import messagebox
from typing import IO, NoReturn
# pre-import 3rd party libraries, handling missing ones with an exception
try:
import aiohttp # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install aiohttp' first") from exc
try:
import websockets # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install websockets' first") from exc
try:
import pystray # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install pystray' first") from exc
try:
import PIL # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install pillow' first") from exc
from twitch import Twitch
from settings import Settings
from version import __version__

View File

@@ -1,10 +1,9 @@
from __future__ import annotations
import json
from enum import Enum
from typing import Any, TypedDict, TYPE_CHECKING
from constants import JsonType, SETTINGS_PATH
from constants import SETTINGS_PATH
from utils import json_load, json_save
if TYPE_CHECKING:
from main import ParsedArgs
@@ -18,9 +17,6 @@ class SettingsFile(TypedDict):
autostart_tray: bool
serialize_env: dict[str, type] = {
"set": set,
}
default_settings: SettingsFile = {
"priority": [],
"exclude": set(),
@@ -30,26 +26,6 @@ default_settings: SettingsFile = {
}
def serialize(obj: Any) -> Any:
if isinstance(obj, (set, Enum)):
if isinstance(obj, set):
d = list(obj)
elif isinstance(obj, Enum):
d = obj.value
return {
"__type": type(obj).__name__,
"data": d,
}
raise TypeError(obj)
def deserialize(obj: JsonType) -> Any:
if "__type" in obj:
t = eval(obj["__type"], None, serialize_env)
return t(obj["data"])
return obj
class Settings:
# from args
log: bool
@@ -67,10 +43,7 @@ class Settings:
autostart_tray: bool
def __init__(self, args: ParsedArgs):
self._settings: SettingsFile = default_settings.copy()
if SETTINGS_PATH.exists():
with open(SETTINGS_PATH, 'r') as file:
self._settings.update(json.load(file, object_hook=deserialize))
self._settings: SettingsFile = json_load(SETTINGS_PATH, default_settings)
self._args: ParsedArgs = args
# default logic of reading settings is to check args first, then the settings file
@@ -78,7 +51,7 @@ class Settings:
if hasattr(self._args, name):
return getattr(self._args, name)
elif name in self._settings:
return self._settings[name] # type: ignore[misc]
return self._settings[name] # type: ignore[literal-required]
return getattr(super(), name)
def __setattr__(self, name: str, value: Any, /) -> None:
@@ -86,7 +59,7 @@ class Settings:
# passthrough
return super().__setattr__(name, value)
elif name in self._settings:
self._settings[name] = value # type: ignore[misc]
self._settings[name] = value # type: ignore[literal-required]
return
raise TypeError(f"{name} is missing a custom setter")
@@ -94,5 +67,4 @@ class Settings:
raise RuntimeError("settings can't be deleted")
def save(self) -> None:
with open(SETTINGS_PATH, 'w') as file:
json.dump(self._settings, file, default=serialize, sort_keys=True, indent=4)
json_save(SETTINGS_PATH, self._settings)

View File

@@ -10,10 +10,7 @@ from collections import abc, OrderedDict
from contextlib import suppress, asynccontextmanager
from typing import Final, NoReturn, cast, TYPE_CHECKING
try:
import aiohttp
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install aiohttp' first") from exc
import aiohttp
from gui import GUIManager
from channel import Channel
@@ -102,11 +99,13 @@ class Twitch:
if self._session is not None:
cookie_jar = cast(aiohttp.CookieJar, self._session.cookie_jar)
cookie_jar.save(COOKIES_PATH)
await self._session.close()
session = self._session
self._session = None
await session.close()
await self.websocket.stop()
# save settings
# save important files
self.settings.save()
self.gui.save()
# wait at least one full second + whatever it takes to complete the closing
# this allows aiohttp to safely close the session
await asyncio.sleep(start_time + 0.5 - time())
@@ -834,28 +833,27 @@ class Twitch:
return response_json
async def fetch_campaign(
self, campaign_id: str, claimed_benefits: dict[str, datetime]
self,
campaign_id: str,
available_data: JsonType,
claimed_benefits: dict[str, datetime],
) -> DropsCampaign:
response = await self.gql_request(
GQL_OPERATIONS["CampaignDetails"].with_variables(
{"channelLogin": str(self._user_id), "dropID": campaign_id}
)
)
return DropsCampaign(self, response["data"]["user"]["dropCampaign"], claimed_benefits)
campaign_data: JsonType = response["data"]["user"]["dropCampaign"]
# NOTE: we use available_data to add a couple of fields missing from the existing details,
# most notably: game boxart
campaign_data["game"]["boxArtURL"] = available_data["game"]["boxArtURL"]
return DropsCampaign(self, campaign_data, claimed_benefits)
async def fetch_inventory(self) -> None:
# fetch all available campaign IDs, that are currently ACTIVE and account is connected
response = await self.gql_request(GQL_OPERATIONS["Campaigns"])
data = response["data"]["currentUser"]["dropCampaigns"] or []
applicable_statuses = ("ACTIVE", "UPCOMING")
available_campaigns: set[str] = set(
c["id"] for c in data
if c["status"] in applicable_statuses and c["self"]["isAccountConnected"]
)
# fetch in-progress campaigns (inventory)
response = await self.gql_request(GQL_OPERATIONS["Inventory"])
inventory = response["data"]["currentUser"]["inventory"]
ongoing_campaigns = inventory["dropCampaignsInProgress"] or []
inventory: JsonType = response["data"]["currentUser"]["inventory"]
ongoing_campaigns: list[JsonType] = inventory["dropCampaignsInProgress"] or []
# this contains claimed benefit edge IDs, not drop IDs
claimed_benefits: dict[str, datetime] = {
b["id"]: timestamp(b["lastAwardedAt"]) for b in inventory["gameEventDrops"]
@@ -864,22 +862,35 @@ class Twitch:
DropsCampaign(self, campaign_data, claimed_benefits)
for campaign_data in ongoing_campaigns
]
# filter out in-progress campaigns from all available campaigns,
# since we already have all information needed for them
for campaign in campaigns:
available_campaigns.discard(campaign.id)
# fetch all available campaigns data
response = await self.gql_request(GQL_OPERATIONS["Campaigns"])
available_list: list[JsonType] = response["data"]["currentUser"]["dropCampaigns"] or []
applicable_statuses = ("ACTIVE", "UPCOMING")
existing_campaigns: set[str] = set(c.id for c in campaigns)
available_campaigns: dict[str, JsonType] = {
c["id"]: c
for c in available_list
if (
c["status"] in applicable_statuses # that are currently ACTIVE
and c["self"]["isAccountConnected"] # and account is connected
and c["id"] not in existing_campaigns # and they aren't in the inventory already
)
}
# add campaigns that remained, that can be earned but are not in-progress yet
for campaign_id in available_campaigns:
campaign = await self.fetch_campaign(campaign_id, claimed_benefits)
for campaign_id, available_data in available_campaigns.items():
campaign = await self.fetch_campaign(campaign_id, available_data, claimed_benefits)
if any(drop.can_earn() for drop in campaign.drops):
campaigns.append(campaign)
campaigns.sort(key=lambda c: c.ends_at)
self.inventory.clear()
self.gui.inv.clear()
for campaign in campaigns:
game = campaign.game
if game not in self.inventory:
self.inventory[game] = []
await self.gui.inv.add_campaign(campaign)
self.inventory[game].append(campaign)
self.gui.save()
def get_drop(self, drop_id: str) -> TimedDrop | None:
"""

View File

@@ -1,16 +1,20 @@
from __future__ import annotations
import sys
import json
import random
import string
import asyncio
import logging
from enum import Enum
from pathlib import Path
from functools import wraps
from contextlib import suppress
from datetime import datetime, timezone
from collections import abc, OrderedDict
from typing import Any, Literal, MutableSet, Generic, TypeVar, cast, TYPE_CHECKING
from typing import (
Any, Literal, MutableSet, Callable, Generic, Mapping, TypeVar, cast, TYPE_CHECKING
)
from constants import WORKING_DIR, JsonType
@@ -26,8 +30,13 @@ else:
_T = TypeVar("_T") # type
_D = TypeVar("_D") # default
_P = ParamSpec("_P") # params
_JSON_T = TypeVar("_JSON_T", bound=Mapping[Any, Any])
logger = logging.getLogger("TwitchDrops")
NONCE_CHARS = string.ascii_letters + string.digits
serialize_env: dict[str, Callable[[Any], object]] = {
"set": set,
"datetime": lambda d: datetime.fromtimestamp(d, timezone.utc),
}
def resource_path(relative_path: Path | str) -> Path:
@@ -79,6 +88,46 @@ def invalidate_cache(instance, *attrnames):
delattr(instance, name)
def _serialize(obj: Any) -> Any:
# convert data
d: int | str | float | list[Any] | JsonType
if isinstance(obj, set):
d = list(obj)
elif isinstance(obj, Enum):
d = obj.value
elif isinstance(obj, datetime):
if obj.tzinfo is None:
# assume naive objects are UTC
obj = obj.replace(tzinfo=timezone.utc)
d = obj.timestamp()
else:
raise TypeError(obj)
# store with type
return {
"__type": type(obj).__name__,
"data": d,
}
def _deserialize(obj: JsonType) -> Any:
if "__type" in obj:
return serialize_env[obj["__type"]](obj["data"])
return obj
def json_load(path: Path, defaults: _JSON_T) -> _JSON_T:
loaded: JsonType = dict(defaults)
if path.exists():
with open(path, 'r') as file:
loaded.update(json.load(file, object_hook=_deserialize))
return cast(_JSON_T, loaded)
def json_save(path: Path, contents: Mapping[Any, Any]) -> None:
with open(path, 'w') as file:
json.dump(contents, file, default=_serialize, sort_keys=True, indent=4)
class OrderedSet(MutableSet[_T]):
"""
Implementation of a set that preserves insertion order,

View File

@@ -7,11 +7,8 @@ from time import time
from contextlib import suppress
from typing import TYPE_CHECKING
try:
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK
from websockets.client import WebSocketClientProtocol, connect as websocket_connect
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install websockets' first") from exc
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK
from websockets.client import WebSocketClientProtocol, connect as websocket_connect
from exceptions import MinerException
from utils import task_wrapper, create_nonce, AwaitableValue