diff --git a/.gitignore b/.gitignore index 4d563a4..dc7926b 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,5 @@ __pycache__ /*.tmp /Twitch Drops Miner # Dev files -cookies* +cookies.jar +settings.json diff --git a/gui.py b/gui.py index 4cb7634..c836b03 100644 --- a/gui.py +++ b/gui.py @@ -1,12 +1,15 @@ from __future__ import annotations +import sys import asyncio import logging import tkinter as tk +from pathlib import Path from math import log10, ceil +from functools import partial from tkinter.font import Font -from tkinter import Tk, ttk, StringVar, DoubleVar from collections import abc, namedtuple, OrderedDict +from tkinter import Tk, ttk, StringVar, DoubleVar, IntVar from typing import Any, TypedDict, NoReturn, TYPE_CHECKING try: @@ -15,6 +18,7 @@ except ModuleNotFoundError as exc: raise ImportError("You have to run 'pip install pystray' first") from exc from utils import resource_path +from registry import RegistryKey, ValueType from constants import FORMATTER, WS_TOPICS_LIMIT, MAX_WEBSOCKETS, WINDOW_TITLE, State if TYPE_CHECKING: @@ -25,6 +29,7 @@ if TYPE_CHECKING: digits = ceil(log10(WS_TOPICS_LIMIT)) WS_FONT = ("Courier New", 10) +LARGE_FONT = (..., 12) class _ICOImage: @@ -49,10 +54,10 @@ class PlaceholderEntry(ttk.Entry): def __init__( self, master: ttk.Widget, - *args, + *args: Any, placeholder: str, placeholdercolor: str = "grey60", - **kwargs, + **kwargs: Any, ): super().__init__(master, *args, **kwargs) self._show: str = kwargs.get("show", '') @@ -61,19 +66,24 @@ class PlaceholderEntry(ttk.Entry): self._ph_text: str = placeholder self.bind("", self._focus_in) self.bind("", self._focus_out) - self._ph: bool = False - self._focus_out(None) + if isinstance(self, ttk.Combobox): + # only bind this for comboboxes + self.bind("<>", self._combobox_select) + # start with a placeholder + self._ph: bool = True + super().config(foreground=self._ph_color, show='') + super().insert(0, self._ph_text) - def _focus_in(self, event): + def _focus_in(self, event: tk.Event[PlaceholderEntry]) -> None: """ On focus in, if we've had a placeholder, clear the box and set normal text colour and show. """ if self._ph: self._ph = False self.delete(0, "end") - self.config(foreground=self._text_color, show=self._show) + super().config(foreground=self._text_color, show=self._show) - def _focus_out(self, event): + def _focus_out(self, event: tk.Event[PlaceholderEntry]) -> None: """ On focus out, if we're empty, insert a placeholder, set placeholder text color and make sure it's shown. @@ -81,40 +91,142 @@ class PlaceholderEntry(ttk.Entry): """ if not super().get(): self._ph = True - self.config(foreground=self._ph_color, show='') - self.insert(0, self._ph_text) + super().config(foreground=self._ph_color, show='') + super().insert(0, self._ph_text) - def _store_option(self, options: dict[str, Any], name: str, attr: str): + def _combobox_select(self, event: tk.Event[PlaceholderEntry]): + # combobox clears and inserts the selected value internally, bypassing the insert method + # disable the placeholder flag and set the color here, so _focus_in doesn't clear the entry + self._ph = False + super().config(foreground=self._text_color, show=self._show) + + def _store_option( + self, options: dict[str, object], name: str, attr: str, *, remove: bool = False + ) -> None: if name in options: - setattr(self, attr, options[name]) + if remove: + value = options.pop(name) + else: + value = options[name] + setattr(self, attr, value) - def configure(self, *args, **kwargs): - if args: - options = args[0] + def configure(self, *args: Any, **kwargs: Any) -> Any: + options = {} + if args and args[0] is not None: + options.update(args[0]) if kwargs: - options = kwargs + options.update(kwargs) self._store_option(options, "show", "_show") - self._store_option(options, "placeholder", "_ph_text") + self._store_option(options, "placeholder", "_ph_text", remove=True) self._store_option(options, "foreground", "_text_color") - self._store_option(options, "placeholdercolor", "_ph_color") - super().configure(*args, *kwargs) + self._store_option(options, "placeholdercolor", "_ph_color", remove=True) + return super().configure(*args, **kwargs) - def get(self): + def config(self, *args: Any, **kwargs: Any) -> Any: + # because 'config = configure' makes mypy complain + self.configure(*args, **kwargs) + + def get(self) -> str: if self._ph: return '' return super().get() - def clear(self): + def insert(self, index: tk._EntryIndex, content: str) -> None: + # when inserting into the entry externally, disable the placeholder flag + if self._ph: + self._ph = False + self.delete(0, "end") + super().config(foreground=self._text_color, show=self._show) + return super().insert(index, content) + + def clear(self) -> None: self.delete(0, "end") self._ph = True - self.config(foreground=self._ph_color, show='') - self.insert(0, self._ph_text) + super().config(foreground=self._ph_color, show='') + super().insert(0, self._ph_text) - def enable(self): - super().configure(state="normal") - def disable(self): - super().configure(state="disabled") +class PlaceholderCombobox(PlaceholderEntry, ttk.Combobox): + pass + + +class PaddedListbox(tk.Listbox): + def __init__(self, master: ttk.Widget, *args, padding: tk._Padding = (0, 0, 0, 0), **kwargs): + # we place the listbox inside a frame with the same background + # this means we need to forward the 'grid' method to the frame, not the listbox + self._frame = tk.Frame(master) + self._frame.rowconfigure(0, weight=1) + self._frame.columnconfigure(0, weight=1) + super().__init__(self._frame) + # mimic default listbox style with sunken relief and borderwidth of 1 + if "relief" not in kwargs: + kwargs["relief"] = "sunken" + if "borderwidth" not in kwargs: + kwargs["borderwidth"] = 1 + self.configure(*args, padding=padding, **kwargs) + + def grid(self, *args, **kwargs): + return self._frame.grid(*args, **kwargs) + + def grid_remove(self) -> None: + return self._frame.grid_remove() + + def grid_info(self) -> tk._GridInfo: + return self._frame.grid_info() + + def grid_forget(self) -> None: + return self._frame.grid_forget() + + def configure(self, *args: Any, **kwargs: Any) -> Any: + options = {} + if args and args[0] is not None: + options.update(args[0]) + if kwargs: + options.update(kwargs) + # NOTE on processed options: + # • relief is applied to the frame only + # • background is copied, so that both listbox and frame change color + # • borderwidth is applied to the frame only + # bg is folded into background for easier processing + if "bg" in options: + options["background"] = options.pop("bg") + frame_options = {} + if "relief" in options: + frame_options["relief"] = options.pop("relief") + if "background" in options: + frame_options["background"] = options["background"] # copy + if "borderwidth" in options: + frame_options["borderwidth"] = options.pop("borderwidth") + self._frame.configure(frame_options) + # update padding + if "padding" in options: + padding: tk._Padding = options.pop("padding") + padx1: tk._ScreenUnits + padx2: tk._ScreenUnits + pady1: tk._ScreenUnits + pady2: tk._ScreenUnits + if not isinstance(padding, tuple) or len(padding) == 1: + if isinstance(padding, tuple): + padding = padding[0] + padx1 = padx2 = pady1 = pady2 = padding + elif len(padding) == 2: + padx1 = padx2 = padding[0] + pady1 = pady2 = padding[1] # type: ignore + elif len(padding) == 3: + padx1, padx2 = padding[0:2] # type: ignore + pady1 = pady2 = padding[2] # type: ignore + else: + padx1, padx2, pady1, pady2 = padding # type: ignore + super().grid(column=0, row=0, padx=(padx1, padx2), pady=(pady1, pady2), sticky="nsew") + else: + super().grid(column=0, row=0, sticky="nsew") + # listbox uses flat relief to blend in with the inside of the frame + options["relief"] = "flat" + return super().configure(options) + + def config(self, *args: Any, **kwargs: Any) -> Any: + # because 'config = configure' makes mypy complain + self.configure(*args, **kwargs) class _WSEntry(TypedDict): @@ -124,8 +236,8 @@ class _WSEntry(TypedDict): class WebsocketStatus: def __init__(self, manager: GUIManager, master: ttk.Widget): - self._status_var = StringVar() - self._topics_var = StringVar() + self._status_var = StringVar(master) + self._topics_var = StringVar(master) frame = ttk.LabelFrame(master, text="Websocket Status", padding=(4, 0, 4, 4)) frame.grid(column=0, row=0, sticky="nsew", padx=2) ttk.Label( @@ -188,7 +300,7 @@ LoginData = namedtuple("LoginData", ["username", "password", "token"]) class LoginForm: def __init__(self, manager: GUIManager, master: ttk.Widget): self._manager = manager - self._var = StringVar() + self._var = StringVar(master) frame = ttk.LabelFrame(master, text="Login Form", padding=(4, 0, 4, 4)) frame.grid(column=1, row=0, sticky="nsew", padx=2) frame.columnconfigure(0, weight=2) @@ -237,40 +349,39 @@ class LoginForm: class GameSelector: def __init__(self, manager: GUIManager, master: ttk.Widget): self._manager = manager - self._var = StringVar() + self._settings = manager._twitch.settings + self._var = StringVar(master) frame = ttk.LabelFrame(master, text="Game Selector", padding=(4, 0, 4, 4)) frame.grid(column=1, row=1, sticky="nsew", padx=2) + frame.rowconfigure(0, weight=1) frame.columnconfigure(0, weight=1) - self._list = tk.Listbox( + self._list = PaddedListbox( frame, height=5, - selectmode="single", + padding=(1, 0), activestyle="none", - exportselection=False, + selectmode="single", highlightthickness=0, + exportselection=False, ) - self._list.pack(fill="both", expand=True) - self._selection: str | None = self._manager._twitch.options.game - self._games: OrderedDict[str, Game] = OrderedDict() - self._excluded: set[int] = set() + self._list.grid(column=0, row=0, sticky="nsew") + self._selection: str | None = None + self._games: OrderedDict[str, tuple[int, Game]] = OrderedDict() + self._excluded_indices: set[int] = set() self._list.bind("<>", self._on_select) def set_games(self, games: abc.Iterable[Game]): self._games.clear() - self._games.update((str(g), g) for g in games) + self._games.update((str(game), (i, game)) for i, game in enumerate(games)) self._list.delete(0, "end") self._list.insert("end", *self._games.keys()) self._list.config(width=0) # autoadjust listbox width - # process excluded games and relink the selection - self._excluded.clear() + # gray-out excluded games and relink the selection + self._excluded_indices.clear() + self._exclude_sync() selected_index: int | None = None - exclude = self._manager._twitch.options.exclude - for i, game_name in enumerate(self._list.get(0, "end")): - if game_name in exclude: - self._excluded.add(i) - self._list.itemconfig(i, foreground="gray60") - elif game_name == self._selection: - selected_index = i + if self._selection is not None and self._selection in self._games: + selected_index = self._games[self._selection][0] self._list.selection_clear(0, "end") if selected_index is not None: # reselect the currently selected item @@ -279,13 +390,38 @@ class GameSelector: # the game we've had selected isn't there anymore - clear selection self._selection = None - def _on_select(self, event): + def _exclude_update(self, game_name: str, *, index: int | None = None) -> None: + # called from the settings tab or _update below + if index is None: + try: + index = self._games[game_name][0] + except KeyError: + # user added an excluded game that isn't on the list - just return + return + if ( + game_name in self._settings.exclude + or self._settings.priority_only + and game_name not in self._settings.priority + ): + if index not in self._excluded_indices: + self._excluded_indices.add(index) + self._list.itemconfig(index, foreground="gray60") + elif index in self._excluded_indices: + self._excluded_indices.discard(index) + self._list.itemconfig(index, foreground='') + + def _exclude_sync(self) -> None: + # called when priority_only state changes + for game_name, (i, _) in self._games.items(): + self._exclude_update(game_name, index=i) + + def _on_select(self, event: tk.Event[PaddedListbox]) -> None: current: tuple[int, ...] = self._list.curselection() if not current: # can happen when the user clicks on an empty list return idx: int = current[0] - if idx in self._excluded: + if idx in self._excluded_indices: # user clicked on an excluded game - reselect the previous one if possible self._list.selection_clear(0, "end") if self._selection is not None: @@ -301,19 +437,27 @@ class GameSelector: self._selection = new_selection self._manager._twitch.change_state(State.GAME_SELECT) - def get_selection(self) -> Game | None: - if self._selection is None: - return None - return self._games[self._selection] - - def set_first(self) -> Game | None: - # select and return the first non-excluded game from the list + def get_next(self) -> Game | None: + if self._selection is not None: + return self._games[self._selection][1] + # select and return the first prioritized game from the list self._list.selection_clear(0, "end") + for game_name in self._settings.priority: + if game_name in self._games: + # it's a prioritized game - easy choice + idx, game = self._games[game_name] + self._selection = game_name + self._list.selection_set(idx) + return game + # if priority_only is enabled, we end here + if self._settings.priority_only: + return None + # with priorities out of the way, select and return the first non-excluded game instead for i, game_name in enumerate(self._list.get(0, "end")): - if i not in self._excluded: + if i not in self._excluded_indices: self._selection = game_name self._list.selection_set(i) - return self._games[game_name] + return self._games[game_name][1] return None @@ -343,16 +487,16 @@ class CampaignProgress: self._manager = manager self._vars: _ProgressVars = { "campaign": { - "name": StringVar(), # campaign name - "progress": DoubleVar(), # controls the progress bar - "percentage": StringVar(), # percentage display string - "remaining": StringVar(), # time remaining string + "name": StringVar(master), # campaign name + "progress": DoubleVar(master), # controls the progress bar + "percentage": StringVar(master), # percentage display string + "remaining": StringVar(master), # time remaining string }, "drop": { - "rewards": StringVar(), # drop rewards - "progress": DoubleVar(), # as above - "percentage": StringVar(), # as above - "remaining": StringVar(), # as above + "rewards": StringVar(master), # drop rewards + "progress": DoubleVar(master), # as above + "percentage": StringVar(master), # as above + "remaining": StringVar(master), # as above }, } self._frame = frame = ttk.LabelFrame( @@ -459,8 +603,7 @@ class CampaignProgress: f"{campaign.progress:6.1%} ({campaign.claimed_drops}/{campaign.total_drops})" ) # tray - tray = self._manager.tray - tray.display_progress(drop) + self._manager.tray.display_progress(drop) self.stop_timer() if countdown: # restart our seconds update timer @@ -748,7 +891,7 @@ class TrayIcon: self.icon: pystray.Icon | None = None self._button = ttk.Button(master, command=self.minimize, text="Minimize to Tray") self._button.grid(column=0, row=0, sticky="ne") - if manager._twitch.options.tray: + if manager._twitch.settings.tray: # start hidden in tray self._manager._root.after_idle(self.minimize) @@ -823,24 +966,10 @@ class TrayIcon: class Notebook: def __init__(self, manager: GUIManager, master: ttk.Widget): - self._style = manager._style - # removes "Notebook.focus" from the Tab layout tree to avoid ugly dotted line on selection - # we target "Notebook.padding" since "focus" follows it, then fold the layout children - original = self._style.layout("TNotebook.Tab") - layout_list = original - while True: - element, layout = layout_list[0] - layout_list = layout["children"] - if element == "Notebook.padding": - layout["children"] = layout_list[0][1]["children"] - break - self._style.layout("TNotebook.Tab", original) - # Add padding to the tab names - self._style.theme_settings( - self._style.theme_use(), {"TNotebook.Tab": {"configure": {"padding": [8, 4]}}} - ) self._nb = ttk.Notebook(master) self._nb.grid(column=0, row=0, sticky="nsew") + # prevent entries from being selected after switching tabs + self._nb.bind("<>", lambda event: manager._root.focus_set()) master.rowconfigure(0, weight=1) master.columnconfigure(0, weight=1) @@ -851,6 +980,240 @@ class Notebook: self._nb.add(widget, text=name, **kwargs) +class _SettingsVars(TypedDict): + tray: IntVar + autostart: IntVar + priority_only: IntVar + + +class SettingsPanel: + AUTOSTART_NAME: str = "TwitchDropsMiner" + + def __init__(self, manager: GUIManager, master: ttk.Widget): + self._settings = manager._twitch.settings + self._game_selector = manager.games + self._vars: _SettingsVars = { + "tray": IntVar(master, self._settings.autostart_tray), + "autostart": IntVar(master, self._settings.autostart), + "priority_only": IntVar(master, self._settings.priority_only), + } + master.rowconfigure(0, weight=1) + master.columnconfigure(0, weight=1) + # use a frame to center the content within the tab + center_frame = ttk.Frame(master) + center_frame.grid(column=0, row=0) + # General section + general_frame = ttk.LabelFrame(center_frame, padding=(4, 0, 4, 4), text="General") + general_frame.grid(column=0, row=0, sticky="nsew") + # use another frame to center the options within the section + # NOTE: this can be adjusted or removed later on if more options were to be added + general_frame.rowconfigure(0, weight=1) + general_frame.columnconfigure(0, weight=1) + center_frame2 = ttk.Frame(general_frame) + center_frame2.grid(column=0, row=0) + ttk.Label(center_frame2, text="Autostart: ").grid(column=0, row=0, sticky="e") + ttk.Checkbutton( + center_frame2, variable=self._vars["autostart"], command=self.update_autostart + ).grid(column=1, row=0) + ttk.Label(center_frame2, text="Autostart into tray: ").grid(column=0, row=1, sticky="e") + ttk.Checkbutton( + center_frame2, variable=self._vars["tray"], command=self.update_autostart + ).grid(column=1, row=1) + ttk.Label(center_frame2, text="Priority only: ").grid(column=0, row=2, sticky="e") + ttk.Checkbutton( + center_frame2, variable=self._vars["priority_only"], command=self.priority_only + ).grid(column=1, row=2) + # Priority section + priority_frame = ttk.LabelFrame(center_frame, padding=(4, 0, 4, 4), text="Priority") + priority_frame.grid(column=1, row=0, sticky="nsew") + self._priority_entry = PlaceholderCombobox( + priority_frame, placeholder="Game name", width=30 + ) + self._priority_entry.grid(column=0, row=0, sticky="ew") + priority_frame.columnconfigure(0, weight=1) + ttk.Button( + priority_frame, text="+", command=self.priority_add, width=2, style="Large.TButton" + ).grid(column=1, row=0) + self._priority_list = PaddedListbox( + priority_frame, + height=10, + padding=(1, 0), + activestyle="none", + selectmode="single", + highlightthickness=0, + exportselection=False, + ) + self._priority_list.grid(column=0, row=1, rowspan=3, sticky="nsew") + self._priority_list.insert("end", *self._settings.priority) + ttk.Button( + priority_frame, + width=2, + text="▲", + style="Large.TButton", + command=partial(self.priority_move, True), + ).grid(column=1, row=1, sticky="ns") + priority_frame.rowconfigure(1, weight=1) + ttk.Button( + priority_frame, + width=2, + text="▼", + style="Large.TButton", + command=partial(self.priority_move, False), + ).grid(column=1, row=2, sticky="ns") + priority_frame.rowconfigure(2, weight=1) + ttk.Button( + priority_frame, text="❌", command=self.priority_delete, width=2, style="Large.TButton" + ).grid(column=1, row=3, sticky="ns") + priority_frame.rowconfigure(3, weight=1) + # Exclude section + exclude_frame = ttk.LabelFrame(center_frame, padding=(4, 0, 4, 4), text="Exclude") + exclude_frame.grid(column=2, row=0, sticky="nsew") + self._exclude_entry = PlaceholderCombobox(exclude_frame, placeholder="Game name", width=26) + self._exclude_entry.grid(column=0, row=0, sticky="ew") + ttk.Button( + exclude_frame, text="+", command=self.exclude_add, width=2, style="Large.TButton" + ).grid(column=1, row=0) + self._exclude_list = PaddedListbox( + exclude_frame, + height=10, + padding=(1, 0), + activestyle="none", + selectmode="single", + highlightthickness=0, + exportselection=False, + ) + self._exclude_list.grid(column=0, row=1, columnspan=2, sticky="nsew") + exclude_frame.rowconfigure(1, weight=1) + # insert them alphabetically + self._exclude_list.insert("end", *sorted(self._settings.exclude)) + ttk.Button( + exclude_frame, text="❌", command=self.exclude_delete, width=2, style="Large.TButton" + ).grid(column=0, row=2, columnspan=2, sticky="ew") + self.priority_only() # update exclude section state + + def clear_selection(self) -> None: + self._priority_list.selection_clear(0, "end") + self._exclude_list.selection_clear(0, "end") + + def update_autostart(self) -> None: + enabled = bool(self._vars["autostart"].get()) + tray = bool(self._vars["tray"].get()) + self._settings.autostart = enabled + self._settings.autostart_tray = tray + if enabled: + self_path = str(Path(sys.argv[0]).resolve()) + if tray: + self_path += " --tray" + with RegistryKey("HKCU/Software/Microsoft/Windows/CurrentVersion/Run") as key: + key.set(self.AUTOSTART_NAME, ValueType.REG_SZ, self_path) + else: + with RegistryKey("HKCU/Software/Microsoft/Windows/CurrentVersion/Run") as key: + key.delete(self.AUTOSTART_NAME, silent=True) + + def set_games(self, games: abc.Iterable[Game]) -> None: + games_list = list(map(str, games)) + self._exclude_entry.config(values=games_list) + self._priority_entry.config(values=games_list) + + def priority_add(self) -> None: + game_name: str = self._priority_entry.get() + if not game_name: + # prevent adding empty strings + return + self._priority_entry.clear() + # add it preventing duplicates + try: + existing_idx: int = self._settings.priority.index(game_name) + except ValueError: + # not there, add it + self._priority_list.insert("end", game_name) + self._priority_list.see("end") + self._settings.priority.append(game_name) + self._game_selector._exclude_update(game_name) + else: + # already there, set the selection on it + self._priority_list.selection_set(existing_idx) + self._priority_list.see(existing_idx) + + def _priority_idx(self) -> int | None: + selection: tuple[int, ...] = self._priority_list.curselection() + if not selection: + return None + return selection[0] + + def priority_move(self, up: bool) -> None: + idx: int | None = self._priority_idx() + if idx is None: + return + if up and idx == 0 or not up and idx == self._priority_list.size() - 1: + return + swap_idx: int = idx - 1 if up else idx + 1 + item: str = self._priority_list.get(idx) + self._priority_list.delete(idx) + self._priority_list.insert(swap_idx, item) + # reselect the item and scroll the list if needed + self._priority_list.selection_set(swap_idx) + self._priority_list.see(swap_idx) + p = self._settings.priority + p[idx], p[swap_idx] = p[swap_idx], p[idx] + + def priority_delete(self) -> None: + idx: int | None = self._priority_idx() + if idx is None: + return + # NOTE: get the item before it's removed from the list + item: str = self._priority_list.get(idx) + self._priority_list.delete(idx) + del self._settings.priority[idx] + self._game_selector._exclude_update(item) + + def priority_only(self) -> None: + self._settings.priority_only = bool(self._vars["priority_only"].get()) + self._game_selector._exclude_sync() + + def exclude_add(self) -> None: + game_name: str = self._exclude_entry.get() + if not game_name: + # prevent adding empty strings + return + self._exclude_entry.clear() + e = self._settings.exclude + if game_name not in e: + e.add(game_name) + # insert it alphabetically + for i, item in enumerate(self._exclude_list.get(0, "end")): + if game_name < item: + self._exclude_list.insert(i, game_name) + self._exclude_list.see(i) + break + else: + self._exclude_list.insert("end", game_name) + self._exclude_list.see("end") + self._game_selector._exclude_update(game_name) + else: + # it was already there, select it + for i, item in enumerate(self._exclude_list.get(0, "end")): + if item == game_name: + existing_idx = i + break + else: + # something went horribly wrong and it's not there after all - just return + return + self._exclude_list.selection_set(existing_idx) + self._exclude_list.see(existing_idx) + + def exclude_delete(self) -> None: + selection: tuple[int, ...] = self._exclude_list.curselection() + if not selection: + return None + idx: int = selection[0] + item: str = self._exclude_list.get(idx) + if item in self._settings.exclude: + self._settings.exclude.discard(item) + self._exclude_list.delete(idx) + self._game_selector._exclude_update(item) + + class GUIManager: def __init__(self, twitch: Twitch): self._twitch: Twitch = twitch @@ -862,14 +1225,36 @@ class GUIManager: root.resizable(False, True) root.iconbitmap(resource_path("pickaxe.ico")) # window icon root.title(WINDOW_TITLE) # window title - root.protocol("WM_DELETE_WINDOW", self.close) - root.bind_all("", self.unfocus) + root.protocol("WM_DELETE_WINDOW", self.close) # hook the X window closing button + root.bind_all("", self.unfocus) # pressing ESC unfocuses selection + + # style adjustements self._style = ttk.Style(root) + # fix treeview's background color from tags not working (also see '_fixed_map') self._style.map( "Treeview", foreground=self._fixed_map("foreground"), background=self._fixed_map("background"), ) + # remove Notebook.focus from the Notebook.Tab layout tree to avoid an ugly dotted line + # on tab selection. We fold the Notebook.focus children into Notebook.padding children. + original = self._style.layout("TNotebook.Tab") + sublayout = original[0][1]["children"][0][1] + sublayout["children"] = sublayout["children"][0][1]["children"] + self._style.layout("TNotebook.Tab", original) + # add padding to the tab names + self._style.configure("TNotebook.Tab", padding=[8, 4]) + # remove Checkbutton.focus dotted line from checkbuttons + self._style.configure("TCheckbutton", padding=0) + original = self._style.layout("TCheckbutton") + sublayout = original[0][1]["children"] + sublayout[1] = sublayout[1][1]["children"][0] + del original[0][1]["children"][1] + self._style.layout("TCheckbutton", original) + # adds a style with a larger font for buttons + self._style.configure("Large.TButton", font=LARGE_FONT) + # end of style changes + root_frame = ttk.Frame(root, padding=8) root_frame.grid(column=0, row=0, sticky="nsew") root.rowconfigure(0, weight=1) @@ -889,15 +1274,8 @@ class GUIManager: self.channels = ChannelList(self, main_frame) # Settings tab settings_frame = ttk.Frame(root_frame, padding=8) - settings_frame.rowconfigure(0, weight=1) - settings_frame.columnconfigure(0, weight=1) + self.settings = SettingsPanel(self, settings_frame) self.tabs.add_tab(settings_frame, name="Settings") - ttk.Label( - settings_frame, - font=(..., 20), - anchor="center", - text="Work In Progress", - ).grid(column=0, row=0, sticky="nsew") # clamp minimum window height (update first, so that geometry calculates the size) root.update_idletasks() root.minsize(width=0, height=root.winfo_reqheight()) @@ -906,7 +1284,7 @@ class GUIManager: self._handler.setFormatter(FORMATTER) logging.getLogger("TwitchDrops").addHandler(self._handler) # show the window when ready - if not self._twitch.options.tray: + if not self._twitch.settings.tray: self._root.deiconify() # https://stackoverflow.com/questions/56329342/tkinter-treeview-background-tag-not-working @@ -980,6 +1358,11 @@ class GUIManager: # support pressing ESC to unfocus self._root.focus_set() self.channels.clear_selection() + self.settings.clear_selection() + + def set_games(self, games: abc.Iterable[Game]) -> None: + self.games.set_games(games) + self.settings.set_games(games) def prevent_close(self): self._closed.clear() @@ -995,7 +1378,6 @@ class GUIManager: if __name__ == "__main__": # Everything below is for debug purposes only - from functools import partial from types import SimpleNamespace class StrNamespace(SimpleNamespace): @@ -1076,7 +1458,14 @@ if __name__ == "__main__": async def main(exit_event: asyncio.Event): # Initialize GUI debug mock = SimpleNamespace( - options=SimpleNamespace(game=None, tray=False, exclude={"Lit Game"}), channels={} + settings=SimpleNamespace( + tray=False, + priority=[], + autostart=False, + priority_only=False, + autostart_tray=False, + exclude={"Lit Game"}, + ) ) mock.change_state = lambda state: mock.gui.print(f"State change: {state.value}") mock.state_change = lambda state: partial(mock.change_state, state) @@ -1088,8 +1477,8 @@ if __name__ == "__main__": gui._poll_task.add_done_callback(lambda t: exit_event.set()) # Login form gui.login.update("Login required", None) - # Game selector - gui.games.set_games([ + # Game selector and settings panel games + gui.set_games([ create_game(420690, "Lit Game"), create_game(123456, "Best Game"), create_game(654321, "My Game Very Long Name"), diff --git a/main.py b/main.py index f3180b7..33c03f6 100644 --- a/main.py +++ b/main.py @@ -9,78 +9,42 @@ import logging import argparse import traceback import tkinter as tk -from copy import copy from pathlib import Path from tkinter import messagebox -from typing import Generic, NoReturn +from typing import IO, NoReturn -from utils import _T from twitch import Twitch +from settings import Settings from version import __version__ from exceptions import CaptchaRequired from constants import FORMATTER, LOG_PATH, WINDOW_TITLE -# we need a dummy invisible window for the parser -root = tk.Tk() -root.overrideredirect(True) -root.withdraw() -root.update() - - class Parser(argparse.ArgumentParser): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self._message: io.StringIO = io.StringIO() - def _print_message(self, message: str, *args, **kwargs) -> None: + def _print_message(self, message: str, file: IO[str] | None = None) -> None: self._message.write(message) # print(message, file=self._message) - def exit(self, *args, **kwargs) -> NoReturn: + def exit(self, status: int = 0, message: str | None = None) -> NoReturn: try: - super().exit(*args, **kwargs) + super().exit(status, message) finally: messagebox.showerror("Argument Parser Error", self._message.getvalue()) -class SetCollectAction(argparse.Action, Generic[_T]): - def __init__( - self, - option_strings, - dest, - *, - const: _T | None = None, - nargs: int | str | None = None, - default: set[_T] | None = None, - **kwargs, - ) -> None: - if nargs is not None and nargs in ('?', '*') or isinstance(nargs, int) and nargs <= 0: - raise ValueError("'nargs' has to be '+' or an integer greater than zero") - if default is None: - default = set() - elif not isinstance(default, set): - raise TypeError("'default' has to be of 'set' type") - super().__init__(option_strings, dest, nargs, const, default, **kwargs) - - def __call__(self, parser, namespace, values, option_string=None): - items: set[_T] = getattr(namespace, self.dest, self.default) - items = copy(items) - for value in values: - items.add(value) - setattr(namespace, self.dest, items) - - class ParsedArgs(argparse.Namespace): _verbose: int _debug_ws: bool _debug_gql: bool log: bool tray: bool - game: str | None - exclude: set[str] no_run_check: bool + # TODO: replace int with union of literal values once typeshed updates @property def logging_level(self) -> int: return { @@ -113,8 +77,12 @@ class ParsedArgs(argparse.Namespace): # handle input parameters -# NOTE: due to using pythonw to run the main script, CLI help via '-h' and generally any -# console output is not available. The input arguments still work though. +# NOTE: parser output is shown via message box +# we also need a dummy invisible window for the parser +root = tk.Tk() +root.overrideredirect(True) +root.withdraw() +root.update() parser = Parser( Path(sys.argv[0]).name, description="A program that allows you to mine timed drops on Twitch.", @@ -123,43 +91,45 @@ parser.add_argument("--version", action="version", version=f"v{__version__}") parser.add_argument("-v", dest="_verbose", action="count", default=0) parser.add_argument("--tray", action="store_true") parser.add_argument("--log", action="store_true") -parser.add_argument("-g", "--game", default=None) -parser.add_argument("--exclude", action=SetCollectAction, nargs='+', metavar="GAME") # undocumented debug args parser.add_argument( "--no-run-check", dest="no_run_check", action="store_true", help=argparse.SUPPRESS ) parser.add_argument("--debug-ws", dest="_debug_ws", action="store_true", help=argparse.SUPPRESS) parser.add_argument("--debug-gql", dest="_debug_gql", action="store_true", help=argparse.SUPPRESS) -options: ParsedArgs = parser.parse_args(namespace=ParsedArgs()) +args = parser.parse_args(namespace=ParsedArgs()) # dummy window isn't needed anymore root.destroy() + +# load settings +settings = Settings(args) # check if we're not already running try: exists = ctypes.windll.user32.FindWindowW(None, WINDOW_TITLE) except AttributeError: # we're not on Windows - continue exists = False -if exists and not options.no_run_check: +if exists and not settings.no_run_check: # already running - exit sys.exit() + # handle logging stuff -if options.logging_level > logging.DEBUG: +if settings.logging_level > logging.DEBUG: # redirect the root logger into a NullHandler, effectively ignoring all logging calls # that aren't ours. This always runs, unless the main logging level is DEBUG or lower. logging.getLogger().addHandler(logging.NullHandler()) logger = logging.getLogger("TwitchDrops") -logger.setLevel(options.logging_level) -if options.log: +logger.setLevel(settings.logging_level) +if settings.log: handler = logging.FileHandler(LOG_PATH) handler.setFormatter(FORMATTER) logger.addHandler(handler) -logging.getLogger("TwitchDrops.gql").setLevel(options.debug_gql) -logging.getLogger("TwitchDrops.websocket").setLevel(options.debug_ws) +logging.getLogger("TwitchDrops.gql").setLevel(settings.debug_gql) +logging.getLogger("TwitchDrops.websocket").setLevel(settings.debug_ws) # client run loop = asyncio.get_event_loop() -client = Twitch(options) +client = Twitch(settings) signal.signal(signal.SIGINT, lambda *_: client.close()) signal.signal(signal.SIGTERM, lambda *_: client.close()) try: diff --git a/manual.txt b/manual.txt index 9859cb8..810fb31 100644 --- a/manual.txt +++ b/manual.txt @@ -4,10 +4,6 @@ Available command line arguments: -• -g ; --game - On application start, begin mining immedately for the `` game. Case sensitive. -• --exclude [ ...] - Exclude the `` game(s) from being mined. Case sensitive. • --tray Start application as minimised into tray. • -v @@ -19,5 +15,4 @@ Available command line arguments: • --version Show application version information -Note: Arguments containing spaces (such as game names) need to be encased in double quotes. -For example: "Game Name". +Note: Additional settings are available within the application GUI. diff --git a/registry.py b/registry.py new file mode 100644 index 0000000..b717b07 --- /dev/null +++ b/registry.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import winreg as reg +from typing import Any +from enum import Enum, Flag +from collections.abc import Generator + + +class RegistryError(Exception): + pass + + +class ValueNotExists(RegistryError): + pass + + +class Access(Flag): + KEY_READ = reg.KEY_READ + KEY_WRITE = reg.KEY_WRITE + KEY_NOTIFY = reg.KEY_NOTIFY + KEY_EXECUTE = reg.KEY_EXECUTE + KEY_SET_VALUE = reg.KEY_SET_VALUE + KEY_ALL_ACCESS = reg.KEY_ALL_ACCESS + KEY_CREATE_LINK = reg.KEY_CREATE_LINK + KEY_QUERY_VALUE = reg.KEY_QUERY_VALUE + KEY_CREATE_SUB_KEY = reg.KEY_CREATE_SUB_KEY + KEY_ENUMERATE_SUB_KEYS = reg.KEY_ENUMERATE_SUB_KEYS + + +class MainKey(Enum): + HKU = reg.HKEY_USERS + HKCR = reg.HKEY_CLASSES_ROOT + HKCU = reg.HKEY_CURRENT_USER + HKLM = reg.HKEY_LOCAL_MACHINE + HKEY_USERS = reg.HKEY_USERS + HKEY_CLASSES_ROOT = reg.HKEY_CLASSES_ROOT + HKEY_CURRENT_USER = reg.HKEY_CURRENT_USER + HKEY_LOCAL_MACHINE = reg.HKEY_LOCAL_MACHINE + HKEY_CURRENT_CONFIG = reg.HKEY_CURRENT_CONFIG + HKEY_PERFORMANCE_DATA = reg.HKEY_PERFORMANCE_DATA + + +class ValueType(Enum): + REG_SZ = reg.REG_SZ + REG_NONE = reg.REG_NONE + REG_LINK = reg.REG_LINK + REG_DWORD = reg.REG_DWORD + REG_QWORD = reg.REG_QWORD + REG_BINARY = reg.REG_BINARY + REG_MULTI_SZ = reg.REG_MULTI_SZ + REG_EXPAND_SZ = reg.REG_EXPAND_SZ + REG_RESOURCE_LIST = reg.REG_RESOURCE_LIST + REG_DWORD_BIG_ENDIAN = reg.REG_DWORD_BIG_ENDIAN + REG_DWORD_LITTLE_ENDIAN = reg.REG_DWORD_LITTLE_ENDIAN + REG_QWORD_LITTLE_ENDIAN = reg.REG_QWORD_LITTLE_ENDIAN + REG_FULL_RESOURCE_DESCRIPTOR = reg.REG_FULL_RESOURCE_DESCRIPTOR + REG_RESOURCE_REQUIREMENTS_LIST = reg.REG_RESOURCE_REQUIREMENTS_LIST + + +class RegistryKey: + def __init__(self, path: str): + main_key, _, path = path.replace('/', '\\').partition('\\') + self.main_key = MainKey[main_key] + self.path = path + self._handle = reg.OpenKey( + self.main_key.value, path, access=(Access.KEY_QUERY_VALUE | Access.KEY_SET_VALUE).value + ) + + def __enter__(self) -> RegistryKey: + return self + + def __exit__(self, exc_type, exc, tb): + self._handle.Close() + + def get(self, name: str) -> tuple[ValueType, Any]: + try: + value, value_type = reg.QueryValueEx(self._handle, name) + except FileNotFoundError: + # TODO: consider returning None for missing values + raise ValueNotExists(name) + return (ValueType(value_type), value) + + def set(self, name: str, value_type: ValueType, value: Any) -> bool: + reg.SetValueEx(self._handle, name, 0, value_type.value, value) + return True # TODO: return False if the set operation fails + + def delete(self, name: str, *, silent: bool = False) -> bool: + try: + reg.DeleteValue(self._handle, name) + except FileNotFoundError: + if not silent: + raise ValueNotExists(name) + return False + return True + + def values(self) -> Generator[tuple[str, ValueType, Any], None, None]: + len_keys, len_values, last_modified = reg.QueryInfoKey(self._handle) + for i in range(len_values): + try: + name, value, value_type = reg.EnumValue(self._handle, i) + yield name, ValueType(value_type), value + except OSError: + return + + +if __name__ == "__main__": + with RegistryKey("HKCU/Software/Microsoft/Windows/CurrentVersion/Run") as key: + # key.get("test") + # key.set("test", ValueType.REG_SZ, "test\\path") + for name, value_type, value in key.values(): + print(name, value_type, value) diff --git a/settings.py b/settings.py new file mode 100644 index 0000000..148b2ac --- /dev/null +++ b/settings.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import json +from enum import Enum +from pathlib import Path +from typing import Any, TypedDict, TYPE_CHECKING + +from constants import JsonType, SETTINGS_PATH + +if TYPE_CHECKING: + from main import ParsedArgs + + +PATH = Path(SETTINGS_PATH) + + +class SettingsFile(TypedDict): + autostart: bool + exclude: set[str] + priority: list[str] + priority_only: bool + autostart_tray: bool + + +serialize_env: dict[str, type] = { + "set": set, +} +default_settings: SettingsFile = { + "priority": [], + "exclude": set(), + "autostart": False, + "priority_only": True, + "autostart_tray": False, +} + + +def serialize(obj: Any) -> Any: + if isinstance(obj, (set, Enum)): + if isinstance(obj, set): + d = list(obj) + elif isinstance(obj, Enum): + d = obj.value + return { + "__type": type(obj).__name__, + "data": d, + } + raise TypeError(obj) + + +def deserialize(obj: JsonType) -> Any: + if "__type" in obj: + t = eval(obj["__type"], None, serialize_env) + return t(obj["data"]) + return obj + + +class Settings: + # from args + log: bool + tray: bool + no_run_check: bool + # args properties + debug_ws: int + debug_gql: int + logging_level: int + # from settings file + autostart: bool + exclude: set[str] + priority: list[str] + priority_only: bool + autostart_tray: bool + + def __init__(self, args: ParsedArgs): + self._settings: SettingsFile = default_settings.copy() + if PATH.exists(): + with open(PATH, 'r') as file: + self._settings.update(json.load(file, object_hook=deserialize)) + self._args: ParsedArgs = args + + # default logic of reading settings is to check args first, then the settings file + def __getattr__(self, name: str, /) -> Any: + if hasattr(self._args, name): + return getattr(self._args, name) + elif name in self._settings: + return self._settings[name] # type: ignore[misc] + return getattr(super(), name) + + def __setattr__(self, name: str, value: Any, /) -> None: + if name in ("_settings", "_args"): + # passthrough + return super().__setattr__(name, value) + elif name in self._settings: + self._settings[name] = value # type: ignore[misc] + return + raise TypeError(f"{name} is missing a custom setter") + + def __delattr__(self, name: str, /) -> None: + raise RuntimeError("settings can't be deleted") + + def save(self) -> None: + with open(PATH, 'w') as file: + json.dump(self._settings, file, default=serialize, sort_keys=True, indent=4) diff --git a/twitch.py b/twitch.py index 643c0ea..1a30b5f 100644 --- a/twitch.py +++ b/twitch.py @@ -41,7 +41,7 @@ if TYPE_CHECKING: from utils import Game from gui import LoginForm - from main import ParsedArgs + from settings import Settings from inventory import TimedDrop from constants import JsonType, GQLOperation @@ -57,8 +57,8 @@ def viewers_key(channel: Channel) -> int: class Twitch: - def __init__(self, options: ParsedArgs): - self.options = options + def __init__(self, settings: Settings): + self.settings: Settings = settings # State management self._state: State = State.IDLE self._state_change = asyncio.Event() @@ -99,12 +99,14 @@ class Twitch: if self._watching_task is not None: self._watching_task.cancel() self._watching_task = None - # close session and stop websocket + # close session, save cookies and stop websocket if self._session is not None: self._session.cookie_jar.save(COOKIES_PATH) # type: ignore await self._session.close() self._session = None await self.websocket.stop() + # save settings + self.settings.save() # wait at least one full second + whatever it takes to complete the closing # this allows aiohttp to safely close the session await asyncio.sleep(start_time + 0.5 - time()) @@ -175,7 +177,6 @@ class Twitch: WebsocketTopic("User", "Drops", self._user_id, self.process_drops), WebsocketTopic("User", "CommunityPoints", self._user_id, self.process_points), ]) - first_select: bool = True full_cleanup: bool = False channels: Final[OrderedDict[int, Channel]] = self.channels self.change_state(State.INVENTORY_FETCH) @@ -211,21 +212,14 @@ class Twitch: return # only start the websocket after we confirm there are drops to mine await self.websocket.start() - self.gui.games.set_games(games) + self.gui.set_games(games) self.change_state(State.GAME_SELECT) elif self._state is State.GAME_SELECT: - new_game: Game | None = self.gui.games.get_selection() - if new_game is None: - if first_select: - # on first select, let the user make the choice - first_select = False - else: - new_game = self.gui.games.set_first() - if new_game is not None: - # restart the watch loop immediately on new game selected - self.restart_watching() - # signal channel cleanup that we're removing everything + new_game: Game | None = self.gui.games.get_next() if new_game != self.game: + if new_game is not None: + # restart the watch loop immediately on new game selected + self.restart_watching() self.game = new_game full_cleanup = True self.change_state(State.CHANNELS_CLEANUP)