Implement settings and saving them to the settings file

This commit is contained in:
DevilXD
2022-03-16 19:26:39 +01:00
parent 82b999995f
commit 5a3a540b45
7 changed files with 741 additions and 179 deletions

3
.gitignore vendored
View File

@@ -11,4 +11,5 @@ __pycache__
/*.tmp
/Twitch Drops Miner
# Dev files
cookies*
cookies.jar
settings.json

591
gui.py
View File

@@ -1,12 +1,15 @@
from __future__ import annotations
import sys
import asyncio
import logging
import tkinter as tk
from pathlib import Path
from math import log10, ceil
from functools import partial
from tkinter.font import Font
from tkinter import Tk, ttk, StringVar, DoubleVar
from collections import abc, namedtuple, OrderedDict
from tkinter import Tk, ttk, StringVar, DoubleVar, IntVar
from typing import Any, TypedDict, NoReturn, TYPE_CHECKING
try:
@@ -15,6 +18,7 @@ except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install pystray' first") from exc
from utils import resource_path
from registry import RegistryKey, ValueType
from constants import FORMATTER, WS_TOPICS_LIMIT, MAX_WEBSOCKETS, WINDOW_TITLE, State
if TYPE_CHECKING:
@@ -25,6 +29,7 @@ if TYPE_CHECKING:
digits = ceil(log10(WS_TOPICS_LIMIT))
WS_FONT = ("Courier New", 10)
LARGE_FONT = (..., 12)
class _ICOImage:
@@ -49,10 +54,10 @@ class PlaceholderEntry(ttk.Entry):
def __init__(
self,
master: ttk.Widget,
*args,
*args: Any,
placeholder: str,
placeholdercolor: str = "grey60",
**kwargs,
**kwargs: Any,
):
super().__init__(master, *args, **kwargs)
self._show: str = kwargs.get("show", '')
@@ -61,19 +66,24 @@ class PlaceholderEntry(ttk.Entry):
self._ph_text: str = placeholder
self.bind("<FocusIn>", self._focus_in)
self.bind("<FocusOut>", self._focus_out)
self._ph: bool = False
self._focus_out(None)
if isinstance(self, ttk.Combobox):
# only bind this for comboboxes
self.bind("<<ComboboxSelected>>", self._combobox_select)
# start with a placeholder
self._ph: bool = True
super().config(foreground=self._ph_color, show='')
super().insert(0, self._ph_text)
def _focus_in(self, event):
def _focus_in(self, event: tk.Event[PlaceholderEntry]) -> None:
"""
On focus in, if we've had a placeholder, clear the box and set normal text colour and show.
"""
if self._ph:
self._ph = False
self.delete(0, "end")
self.config(foreground=self._text_color, show=self._show)
super().config(foreground=self._text_color, show=self._show)
def _focus_out(self, event):
def _focus_out(self, event: tk.Event[PlaceholderEntry]) -> None:
"""
On focus out, if we're empty, insert a placeholder,
set placeholder text color and make sure it's shown.
@@ -81,40 +91,142 @@ class PlaceholderEntry(ttk.Entry):
"""
if not super().get():
self._ph = True
self.config(foreground=self._ph_color, show='')
self.insert(0, self._ph_text)
super().config(foreground=self._ph_color, show='')
super().insert(0, self._ph_text)
def _store_option(self, options: dict[str, Any], name: str, attr: str):
def _combobox_select(self, event: tk.Event[PlaceholderEntry]):
# combobox clears and inserts the selected value internally, bypassing the insert method
# disable the placeholder flag and set the color here, so _focus_in doesn't clear the entry
self._ph = False
super().config(foreground=self._text_color, show=self._show)
def _store_option(
self, options: dict[str, object], name: str, attr: str, *, remove: bool = False
) -> None:
if name in options:
setattr(self, attr, options[name])
if remove:
value = options.pop(name)
else:
value = options[name]
setattr(self, attr, value)
def configure(self, *args, **kwargs):
if args:
options = args[0]
def configure(self, *args: Any, **kwargs: Any) -> Any:
options = {}
if args and args[0] is not None:
options.update(args[0])
if kwargs:
options = kwargs
options.update(kwargs)
self._store_option(options, "show", "_show")
self._store_option(options, "placeholder", "_ph_text")
self._store_option(options, "placeholder", "_ph_text", remove=True)
self._store_option(options, "foreground", "_text_color")
self._store_option(options, "placeholdercolor", "_ph_color")
super().configure(*args, *kwargs)
self._store_option(options, "placeholdercolor", "_ph_color", remove=True)
return super().configure(*args, **kwargs)
def get(self):
def config(self, *args: Any, **kwargs: Any) -> Any:
# because 'config = configure' makes mypy complain
self.configure(*args, **kwargs)
def get(self) -> str:
if self._ph:
return ''
return super().get()
def clear(self):
def insert(self, index: tk._EntryIndex, content: str) -> None:
# when inserting into the entry externally, disable the placeholder flag
if self._ph:
self._ph = False
self.delete(0, "end")
super().config(foreground=self._text_color, show=self._show)
return super().insert(index, content)
def clear(self) -> None:
self.delete(0, "end")
self._ph = True
self.config(foreground=self._ph_color, show='')
self.insert(0, self._ph_text)
super().config(foreground=self._ph_color, show='')
super().insert(0, self._ph_text)
def enable(self):
super().configure(state="normal")
def disable(self):
super().configure(state="disabled")
class PlaceholderCombobox(PlaceholderEntry, ttk.Combobox):
pass
class PaddedListbox(tk.Listbox):
def __init__(self, master: ttk.Widget, *args, padding: tk._Padding = (0, 0, 0, 0), **kwargs):
# we place the listbox inside a frame with the same background
# this means we need to forward the 'grid' method to the frame, not the listbox
self._frame = tk.Frame(master)
self._frame.rowconfigure(0, weight=1)
self._frame.columnconfigure(0, weight=1)
super().__init__(self._frame)
# mimic default listbox style with sunken relief and borderwidth of 1
if "relief" not in kwargs:
kwargs["relief"] = "sunken"
if "borderwidth" not in kwargs:
kwargs["borderwidth"] = 1
self.configure(*args, padding=padding, **kwargs)
def grid(self, *args, **kwargs):
return self._frame.grid(*args, **kwargs)
def grid_remove(self) -> None:
return self._frame.grid_remove()
def grid_info(self) -> tk._GridInfo:
return self._frame.grid_info()
def grid_forget(self) -> None:
return self._frame.grid_forget()
def configure(self, *args: Any, **kwargs: Any) -> Any:
options = {}
if args and args[0] is not None:
options.update(args[0])
if kwargs:
options.update(kwargs)
# NOTE on processed options:
# • relief is applied to the frame only
# • background is copied, so that both listbox and frame change color
# • borderwidth is applied to the frame only
# bg is folded into background for easier processing
if "bg" in options:
options["background"] = options.pop("bg")
frame_options = {}
if "relief" in options:
frame_options["relief"] = options.pop("relief")
if "background" in options:
frame_options["background"] = options["background"] # copy
if "borderwidth" in options:
frame_options["borderwidth"] = options.pop("borderwidth")
self._frame.configure(frame_options)
# update padding
if "padding" in options:
padding: tk._Padding = options.pop("padding")
padx1: tk._ScreenUnits
padx2: tk._ScreenUnits
pady1: tk._ScreenUnits
pady2: tk._ScreenUnits
if not isinstance(padding, tuple) or len(padding) == 1:
if isinstance(padding, tuple):
padding = padding[0]
padx1 = padx2 = pady1 = pady2 = padding
elif len(padding) == 2:
padx1 = padx2 = padding[0]
pady1 = pady2 = padding[1] # type: ignore
elif len(padding) == 3:
padx1, padx2 = padding[0:2] # type: ignore
pady1 = pady2 = padding[2] # type: ignore
else:
padx1, padx2, pady1, pady2 = padding # type: ignore
super().grid(column=0, row=0, padx=(padx1, padx2), pady=(pady1, pady2), sticky="nsew")
else:
super().grid(column=0, row=0, sticky="nsew")
# listbox uses flat relief to blend in with the inside of the frame
options["relief"] = "flat"
return super().configure(options)
def config(self, *args: Any, **kwargs: Any) -> Any:
# because 'config = configure' makes mypy complain
self.configure(*args, **kwargs)
class _WSEntry(TypedDict):
@@ -124,8 +236,8 @@ class _WSEntry(TypedDict):
class WebsocketStatus:
def __init__(self, manager: GUIManager, master: ttk.Widget):
self._status_var = StringVar()
self._topics_var = StringVar()
self._status_var = StringVar(master)
self._topics_var = StringVar(master)
frame = ttk.LabelFrame(master, text="Websocket Status", padding=(4, 0, 4, 4))
frame.grid(column=0, row=0, sticky="nsew", padx=2)
ttk.Label(
@@ -188,7 +300,7 @@ LoginData = namedtuple("LoginData", ["username", "password", "token"])
class LoginForm:
def __init__(self, manager: GUIManager, master: ttk.Widget):
self._manager = manager
self._var = StringVar()
self._var = StringVar(master)
frame = ttk.LabelFrame(master, text="Login Form", padding=(4, 0, 4, 4))
frame.grid(column=1, row=0, sticky="nsew", padx=2)
frame.columnconfigure(0, weight=2)
@@ -237,40 +349,39 @@ class LoginForm:
class GameSelector:
def __init__(self, manager: GUIManager, master: ttk.Widget):
self._manager = manager
self._var = StringVar()
self._settings = manager._twitch.settings
self._var = StringVar(master)
frame = ttk.LabelFrame(master, text="Game Selector", padding=(4, 0, 4, 4))
frame.grid(column=1, row=1, sticky="nsew", padx=2)
frame.rowconfigure(0, weight=1)
frame.columnconfigure(0, weight=1)
self._list = tk.Listbox(
self._list = PaddedListbox(
frame,
height=5,
selectmode="single",
padding=(1, 0),
activestyle="none",
exportselection=False,
selectmode="single",
highlightthickness=0,
exportselection=False,
)
self._list.pack(fill="both", expand=True)
self._selection: str | None = self._manager._twitch.options.game
self._games: OrderedDict[str, Game] = OrderedDict()
self._excluded: set[int] = set()
self._list.grid(column=0, row=0, sticky="nsew")
self._selection: str | None = None
self._games: OrderedDict[str, tuple[int, Game]] = OrderedDict()
self._excluded_indices: set[int] = set()
self._list.bind("<<ListboxSelect>>", self._on_select)
def set_games(self, games: abc.Iterable[Game]):
self._games.clear()
self._games.update((str(g), g) for g in games)
self._games.update((str(game), (i, game)) for i, game in enumerate(games))
self._list.delete(0, "end")
self._list.insert("end", *self._games.keys())
self._list.config(width=0) # autoadjust listbox width
# process excluded games and relink the selection
self._excluded.clear()
# gray-out excluded games and relink the selection
self._excluded_indices.clear()
self._exclude_sync()
selected_index: int | None = None
exclude = self._manager._twitch.options.exclude
for i, game_name in enumerate(self._list.get(0, "end")):
if game_name in exclude:
self._excluded.add(i)
self._list.itemconfig(i, foreground="gray60")
elif game_name == self._selection:
selected_index = i
if self._selection is not None and self._selection in self._games:
selected_index = self._games[self._selection][0]
self._list.selection_clear(0, "end")
if selected_index is not None:
# reselect the currently selected item
@@ -279,13 +390,38 @@ class GameSelector:
# the game we've had selected isn't there anymore - clear selection
self._selection = None
def _on_select(self, event):
def _exclude_update(self, game_name: str, *, index: int | None = None) -> None:
# called from the settings tab or _update below
if index is None:
try:
index = self._games[game_name][0]
except KeyError:
# user added an excluded game that isn't on the list - just return
return
if (
game_name in self._settings.exclude
or self._settings.priority_only
and game_name not in self._settings.priority
):
if index not in self._excluded_indices:
self._excluded_indices.add(index)
self._list.itemconfig(index, foreground="gray60")
elif index in self._excluded_indices:
self._excluded_indices.discard(index)
self._list.itemconfig(index, foreground='')
def _exclude_sync(self) -> None:
# called when priority_only state changes
for game_name, (i, _) in self._games.items():
self._exclude_update(game_name, index=i)
def _on_select(self, event: tk.Event[PaddedListbox]) -> None:
current: tuple[int, ...] = self._list.curselection()
if not current:
# can happen when the user clicks on an empty list
return
idx: int = current[0]
if idx in self._excluded:
if idx in self._excluded_indices:
# user clicked on an excluded game - reselect the previous one if possible
self._list.selection_clear(0, "end")
if self._selection is not None:
@@ -301,19 +437,27 @@ class GameSelector:
self._selection = new_selection
self._manager._twitch.change_state(State.GAME_SELECT)
def get_selection(self) -> Game | None:
if self._selection is None:
return None
return self._games[self._selection]
def set_first(self) -> Game | None:
# select and return the first non-excluded game from the list
def get_next(self) -> Game | None:
if self._selection is not None:
return self._games[self._selection][1]
# select and return the first prioritized game from the list
self._list.selection_clear(0, "end")
for game_name in self._settings.priority:
if game_name in self._games:
# it's a prioritized game - easy choice
idx, game = self._games[game_name]
self._selection = game_name
self._list.selection_set(idx)
return game
# if priority_only is enabled, we end here
if self._settings.priority_only:
return None
# with priorities out of the way, select and return the first non-excluded game instead
for i, game_name in enumerate(self._list.get(0, "end")):
if i not in self._excluded:
if i not in self._excluded_indices:
self._selection = game_name
self._list.selection_set(i)
return self._games[game_name]
return self._games[game_name][1]
return None
@@ -343,16 +487,16 @@ class CampaignProgress:
self._manager = manager
self._vars: _ProgressVars = {
"campaign": {
"name": StringVar(), # campaign name
"progress": DoubleVar(), # controls the progress bar
"percentage": StringVar(), # percentage display string
"remaining": StringVar(), # time remaining string
"name": StringVar(master), # campaign name
"progress": DoubleVar(master), # controls the progress bar
"percentage": StringVar(master), # percentage display string
"remaining": StringVar(master), # time remaining string
},
"drop": {
"rewards": StringVar(), # drop rewards
"progress": DoubleVar(), # as above
"percentage": StringVar(), # as above
"remaining": StringVar(), # as above
"rewards": StringVar(master), # drop rewards
"progress": DoubleVar(master), # as above
"percentage": StringVar(master), # as above
"remaining": StringVar(master), # as above
},
}
self._frame = frame = ttk.LabelFrame(
@@ -459,8 +603,7 @@ class CampaignProgress:
f"{campaign.progress:6.1%} ({campaign.claimed_drops}/{campaign.total_drops})"
)
# tray
tray = self._manager.tray
tray.display_progress(drop)
self._manager.tray.display_progress(drop)
self.stop_timer()
if countdown:
# restart our seconds update timer
@@ -748,7 +891,7 @@ class TrayIcon:
self.icon: pystray.Icon | None = None
self._button = ttk.Button(master, command=self.minimize, text="Minimize to Tray")
self._button.grid(column=0, row=0, sticky="ne")
if manager._twitch.options.tray:
if manager._twitch.settings.tray:
# start hidden in tray
self._manager._root.after_idle(self.minimize)
@@ -823,24 +966,10 @@ class TrayIcon:
class Notebook:
def __init__(self, manager: GUIManager, master: ttk.Widget):
self._style = manager._style
# removes "Notebook.focus" from the Tab layout tree to avoid ugly dotted line on selection
# we target "Notebook.padding" since "focus" follows it, then fold the layout children
original = self._style.layout("TNotebook.Tab")
layout_list = original
while True:
element, layout = layout_list[0]
layout_list = layout["children"]
if element == "Notebook.padding":
layout["children"] = layout_list[0][1]["children"]
break
self._style.layout("TNotebook.Tab", original)
# Add padding to the tab names
self._style.theme_settings(
self._style.theme_use(), {"TNotebook.Tab": {"configure": {"padding": [8, 4]}}}
)
self._nb = ttk.Notebook(master)
self._nb.grid(column=0, row=0, sticky="nsew")
# prevent entries from being selected after switching tabs
self._nb.bind("<<NotebookTabChanged>>", lambda event: manager._root.focus_set())
master.rowconfigure(0, weight=1)
master.columnconfigure(0, weight=1)
@@ -851,6 +980,240 @@ class Notebook:
self._nb.add(widget, text=name, **kwargs)
class _SettingsVars(TypedDict):
tray: IntVar
autostart: IntVar
priority_only: IntVar
class SettingsPanel:
AUTOSTART_NAME: str = "TwitchDropsMiner"
def __init__(self, manager: GUIManager, master: ttk.Widget):
self._settings = manager._twitch.settings
self._game_selector = manager.games
self._vars: _SettingsVars = {
"tray": IntVar(master, self._settings.autostart_tray),
"autostart": IntVar(master, self._settings.autostart),
"priority_only": IntVar(master, self._settings.priority_only),
}
master.rowconfigure(0, weight=1)
master.columnconfigure(0, weight=1)
# use a frame to center the content within the tab
center_frame = ttk.Frame(master)
center_frame.grid(column=0, row=0)
# General section
general_frame = ttk.LabelFrame(center_frame, padding=(4, 0, 4, 4), text="General")
general_frame.grid(column=0, row=0, sticky="nsew")
# use another frame to center the options within the section
# NOTE: this can be adjusted or removed later on if more options were to be added
general_frame.rowconfigure(0, weight=1)
general_frame.columnconfigure(0, weight=1)
center_frame2 = ttk.Frame(general_frame)
center_frame2.grid(column=0, row=0)
ttk.Label(center_frame2, text="Autostart: ").grid(column=0, row=0, sticky="e")
ttk.Checkbutton(
center_frame2, variable=self._vars["autostart"], command=self.update_autostart
).grid(column=1, row=0)
ttk.Label(center_frame2, text="Autostart into tray: ").grid(column=0, row=1, sticky="e")
ttk.Checkbutton(
center_frame2, variable=self._vars["tray"], command=self.update_autostart
).grid(column=1, row=1)
ttk.Label(center_frame2, text="Priority only: ").grid(column=0, row=2, sticky="e")
ttk.Checkbutton(
center_frame2, variable=self._vars["priority_only"], command=self.priority_only
).grid(column=1, row=2)
# Priority section
priority_frame = ttk.LabelFrame(center_frame, padding=(4, 0, 4, 4), text="Priority")
priority_frame.grid(column=1, row=0, sticky="nsew")
self._priority_entry = PlaceholderCombobox(
priority_frame, placeholder="Game name", width=30
)
self._priority_entry.grid(column=0, row=0, sticky="ew")
priority_frame.columnconfigure(0, weight=1)
ttk.Button(
priority_frame, text="+", command=self.priority_add, width=2, style="Large.TButton"
).grid(column=1, row=0)
self._priority_list = PaddedListbox(
priority_frame,
height=10,
padding=(1, 0),
activestyle="none",
selectmode="single",
highlightthickness=0,
exportselection=False,
)
self._priority_list.grid(column=0, row=1, rowspan=3, sticky="nsew")
self._priority_list.insert("end", *self._settings.priority)
ttk.Button(
priority_frame,
width=2,
text="",
style="Large.TButton",
command=partial(self.priority_move, True),
).grid(column=1, row=1, sticky="ns")
priority_frame.rowconfigure(1, weight=1)
ttk.Button(
priority_frame,
width=2,
text="",
style="Large.TButton",
command=partial(self.priority_move, False),
).grid(column=1, row=2, sticky="ns")
priority_frame.rowconfigure(2, weight=1)
ttk.Button(
priority_frame, text="", command=self.priority_delete, width=2, style="Large.TButton"
).grid(column=1, row=3, sticky="ns")
priority_frame.rowconfigure(3, weight=1)
# Exclude section
exclude_frame = ttk.LabelFrame(center_frame, padding=(4, 0, 4, 4), text="Exclude")
exclude_frame.grid(column=2, row=0, sticky="nsew")
self._exclude_entry = PlaceholderCombobox(exclude_frame, placeholder="Game name", width=26)
self._exclude_entry.grid(column=0, row=0, sticky="ew")
ttk.Button(
exclude_frame, text="+", command=self.exclude_add, width=2, style="Large.TButton"
).grid(column=1, row=0)
self._exclude_list = PaddedListbox(
exclude_frame,
height=10,
padding=(1, 0),
activestyle="none",
selectmode="single",
highlightthickness=0,
exportselection=False,
)
self._exclude_list.grid(column=0, row=1, columnspan=2, sticky="nsew")
exclude_frame.rowconfigure(1, weight=1)
# insert them alphabetically
self._exclude_list.insert("end", *sorted(self._settings.exclude))
ttk.Button(
exclude_frame, text="", command=self.exclude_delete, width=2, style="Large.TButton"
).grid(column=0, row=2, columnspan=2, sticky="ew")
self.priority_only() # update exclude section state
def clear_selection(self) -> None:
self._priority_list.selection_clear(0, "end")
self._exclude_list.selection_clear(0, "end")
def update_autostart(self) -> None:
enabled = bool(self._vars["autostart"].get())
tray = bool(self._vars["tray"].get())
self._settings.autostart = enabled
self._settings.autostart_tray = tray
if enabled:
self_path = str(Path(sys.argv[0]).resolve())
if tray:
self_path += " --tray"
with RegistryKey("HKCU/Software/Microsoft/Windows/CurrentVersion/Run") as key:
key.set(self.AUTOSTART_NAME, ValueType.REG_SZ, self_path)
else:
with RegistryKey("HKCU/Software/Microsoft/Windows/CurrentVersion/Run") as key:
key.delete(self.AUTOSTART_NAME, silent=True)
def set_games(self, games: abc.Iterable[Game]) -> None:
games_list = list(map(str, games))
self._exclude_entry.config(values=games_list)
self._priority_entry.config(values=games_list)
def priority_add(self) -> None:
game_name: str = self._priority_entry.get()
if not game_name:
# prevent adding empty strings
return
self._priority_entry.clear()
# add it preventing duplicates
try:
existing_idx: int = self._settings.priority.index(game_name)
except ValueError:
# not there, add it
self._priority_list.insert("end", game_name)
self._priority_list.see("end")
self._settings.priority.append(game_name)
self._game_selector._exclude_update(game_name)
else:
# already there, set the selection on it
self._priority_list.selection_set(existing_idx)
self._priority_list.see(existing_idx)
def _priority_idx(self) -> int | None:
selection: tuple[int, ...] = self._priority_list.curselection()
if not selection:
return None
return selection[0]
def priority_move(self, up: bool) -> None:
idx: int | None = self._priority_idx()
if idx is None:
return
if up and idx == 0 or not up and idx == self._priority_list.size() - 1:
return
swap_idx: int = idx - 1 if up else idx + 1
item: str = self._priority_list.get(idx)
self._priority_list.delete(idx)
self._priority_list.insert(swap_idx, item)
# reselect the item and scroll the list if needed
self._priority_list.selection_set(swap_idx)
self._priority_list.see(swap_idx)
p = self._settings.priority
p[idx], p[swap_idx] = p[swap_idx], p[idx]
def priority_delete(self) -> None:
idx: int | None = self._priority_idx()
if idx is None:
return
# NOTE: get the item before it's removed from the list
item: str = self._priority_list.get(idx)
self._priority_list.delete(idx)
del self._settings.priority[idx]
self._game_selector._exclude_update(item)
def priority_only(self) -> None:
self._settings.priority_only = bool(self._vars["priority_only"].get())
self._game_selector._exclude_sync()
def exclude_add(self) -> None:
game_name: str = self._exclude_entry.get()
if not game_name:
# prevent adding empty strings
return
self._exclude_entry.clear()
e = self._settings.exclude
if game_name not in e:
e.add(game_name)
# insert it alphabetically
for i, item in enumerate(self._exclude_list.get(0, "end")):
if game_name < item:
self._exclude_list.insert(i, game_name)
self._exclude_list.see(i)
break
else:
self._exclude_list.insert("end", game_name)
self._exclude_list.see("end")
self._game_selector._exclude_update(game_name)
else:
# it was already there, select it
for i, item in enumerate(self._exclude_list.get(0, "end")):
if item == game_name:
existing_idx = i
break
else:
# something went horribly wrong and it's not there after all - just return
return
self._exclude_list.selection_set(existing_idx)
self._exclude_list.see(existing_idx)
def exclude_delete(self) -> None:
selection: tuple[int, ...] = self._exclude_list.curselection()
if not selection:
return None
idx: int = selection[0]
item: str = self._exclude_list.get(idx)
if item in self._settings.exclude:
self._settings.exclude.discard(item)
self._exclude_list.delete(idx)
self._game_selector._exclude_update(item)
class GUIManager:
def __init__(self, twitch: Twitch):
self._twitch: Twitch = twitch
@@ -862,14 +1225,36 @@ class GUIManager:
root.resizable(False, True)
root.iconbitmap(resource_path("pickaxe.ico")) # window icon
root.title(WINDOW_TITLE) # window title
root.protocol("WM_DELETE_WINDOW", self.close)
root.bind_all("<KeyPress-Escape>", self.unfocus)
root.protocol("WM_DELETE_WINDOW", self.close) # hook the X window closing button
root.bind_all("<KeyPress-Escape>", self.unfocus) # pressing ESC unfocuses selection
# style adjustements
self._style = ttk.Style(root)
# fix treeview's background color from tags not working (also see '_fixed_map')
self._style.map(
"Treeview",
foreground=self._fixed_map("foreground"),
background=self._fixed_map("background"),
)
# remove Notebook.focus from the Notebook.Tab layout tree to avoid an ugly dotted line
# on tab selection. We fold the Notebook.focus children into Notebook.padding children.
original = self._style.layout("TNotebook.Tab")
sublayout = original[0][1]["children"][0][1]
sublayout["children"] = sublayout["children"][0][1]["children"]
self._style.layout("TNotebook.Tab", original)
# add padding to the tab names
self._style.configure("TNotebook.Tab", padding=[8, 4])
# remove Checkbutton.focus dotted line from checkbuttons
self._style.configure("TCheckbutton", padding=0)
original = self._style.layout("TCheckbutton")
sublayout = original[0][1]["children"]
sublayout[1] = sublayout[1][1]["children"][0]
del original[0][1]["children"][1]
self._style.layout("TCheckbutton", original)
# adds a style with a larger font for buttons
self._style.configure("Large.TButton", font=LARGE_FONT)
# end of style changes
root_frame = ttk.Frame(root, padding=8)
root_frame.grid(column=0, row=0, sticky="nsew")
root.rowconfigure(0, weight=1)
@@ -889,15 +1274,8 @@ class GUIManager:
self.channels = ChannelList(self, main_frame)
# Settings tab
settings_frame = ttk.Frame(root_frame, padding=8)
settings_frame.rowconfigure(0, weight=1)
settings_frame.columnconfigure(0, weight=1)
self.settings = SettingsPanel(self, settings_frame)
self.tabs.add_tab(settings_frame, name="Settings")
ttk.Label(
settings_frame,
font=(..., 20),
anchor="center",
text="Work In Progress",
).grid(column=0, row=0, sticky="nsew")
# clamp minimum window height (update first, so that geometry calculates the size)
root.update_idletasks()
root.minsize(width=0, height=root.winfo_reqheight())
@@ -906,7 +1284,7 @@ class GUIManager:
self._handler.setFormatter(FORMATTER)
logging.getLogger("TwitchDrops").addHandler(self._handler)
# show the window when ready
if not self._twitch.options.tray:
if not self._twitch.settings.tray:
self._root.deiconify()
# https://stackoverflow.com/questions/56329342/tkinter-treeview-background-tag-not-working
@@ -980,6 +1358,11 @@ class GUIManager:
# support pressing ESC to unfocus
self._root.focus_set()
self.channels.clear_selection()
self.settings.clear_selection()
def set_games(self, games: abc.Iterable[Game]) -> None:
self.games.set_games(games)
self.settings.set_games(games)
def prevent_close(self):
self._closed.clear()
@@ -995,7 +1378,6 @@ class GUIManager:
if __name__ == "__main__":
# Everything below is for debug purposes only
from functools import partial
from types import SimpleNamespace
class StrNamespace(SimpleNamespace):
@@ -1076,7 +1458,14 @@ if __name__ == "__main__":
async def main(exit_event: asyncio.Event):
# Initialize GUI debug
mock = SimpleNamespace(
options=SimpleNamespace(game=None, tray=False, exclude={"Lit Game"}), channels={}
settings=SimpleNamespace(
tray=False,
priority=[],
autostart=False,
priority_only=False,
autostart_tray=False,
exclude={"Lit Game"},
)
)
mock.change_state = lambda state: mock.gui.print(f"State change: {state.value}")
mock.state_change = lambda state: partial(mock.change_state, state)
@@ -1088,8 +1477,8 @@ if __name__ == "__main__":
gui._poll_task.add_done_callback(lambda t: exit_event.set())
# Login form
gui.login.update("Login required", None)
# Game selector
gui.games.set_games([
# Game selector and settings panel games
gui.set_games([
create_game(420690, "Lit Game"),
create_game(123456, "Best Game"),
create_game(654321, "My Game Very Long Name"),

78
main.py
View File

@@ -9,78 +9,42 @@ import logging
import argparse
import traceback
import tkinter as tk
from copy import copy
from pathlib import Path
from tkinter import messagebox
from typing import Generic, NoReturn
from typing import IO, NoReturn
from utils import _T
from twitch import Twitch
from settings import Settings
from version import __version__
from exceptions import CaptchaRequired
from constants import FORMATTER, LOG_PATH, WINDOW_TITLE
# we need a dummy invisible window for the parser
root = tk.Tk()
root.overrideredirect(True)
root.withdraw()
root.update()
class Parser(argparse.ArgumentParser):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._message: io.StringIO = io.StringIO()
def _print_message(self, message: str, *args, **kwargs) -> None:
def _print_message(self, message: str, file: IO[str] | None = None) -> None:
self._message.write(message)
# print(message, file=self._message)
def exit(self, *args, **kwargs) -> NoReturn:
def exit(self, status: int = 0, message: str | None = None) -> NoReturn:
try:
super().exit(*args, **kwargs)
super().exit(status, message)
finally:
messagebox.showerror("Argument Parser Error", self._message.getvalue())
class SetCollectAction(argparse.Action, Generic[_T]):
def __init__(
self,
option_strings,
dest,
*,
const: _T | None = None,
nargs: int | str | None = None,
default: set[_T] | None = None,
**kwargs,
) -> None:
if nargs is not None and nargs in ('?', '*') or isinstance(nargs, int) and nargs <= 0:
raise ValueError("'nargs' has to be '+' or an integer greater than zero")
if default is None:
default = set()
elif not isinstance(default, set):
raise TypeError("'default' has to be of 'set' type")
super().__init__(option_strings, dest, nargs, const, default, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
items: set[_T] = getattr(namespace, self.dest, self.default)
items = copy(items)
for value in values:
items.add(value)
setattr(namespace, self.dest, items)
class ParsedArgs(argparse.Namespace):
_verbose: int
_debug_ws: bool
_debug_gql: bool
log: bool
tray: bool
game: str | None
exclude: set[str]
no_run_check: bool
# TODO: replace int with union of literal values once typeshed updates
@property
def logging_level(self) -> int:
return {
@@ -113,8 +77,12 @@ class ParsedArgs(argparse.Namespace):
# handle input parameters
# NOTE: due to using pythonw to run the main script, CLI help via '-h' and generally any
# console output is not available. The input arguments still work though.
# NOTE: parser output is shown via message box
# we also need a dummy invisible window for the parser
root = tk.Tk()
root.overrideredirect(True)
root.withdraw()
root.update()
parser = Parser(
Path(sys.argv[0]).name,
description="A program that allows you to mine timed drops on Twitch.",
@@ -123,43 +91,45 @@ parser.add_argument("--version", action="version", version=f"v{__version__}")
parser.add_argument("-v", dest="_verbose", action="count", default=0)
parser.add_argument("--tray", action="store_true")
parser.add_argument("--log", action="store_true")
parser.add_argument("-g", "--game", default=None)
parser.add_argument("--exclude", action=SetCollectAction, nargs='+', metavar="GAME")
# undocumented debug args
parser.add_argument(
"--no-run-check", dest="no_run_check", action="store_true", help=argparse.SUPPRESS
)
parser.add_argument("--debug-ws", dest="_debug_ws", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--debug-gql", dest="_debug_gql", action="store_true", help=argparse.SUPPRESS)
options: ParsedArgs = parser.parse_args(namespace=ParsedArgs())
args = parser.parse_args(namespace=ParsedArgs())
# dummy window isn't needed anymore
root.destroy()
# load settings
settings = Settings(args)
# check if we're not already running
try:
exists = ctypes.windll.user32.FindWindowW(None, WINDOW_TITLE)
except AttributeError:
# we're not on Windows - continue
exists = False
if exists and not options.no_run_check:
if exists and not settings.no_run_check:
# already running - exit
sys.exit()
# handle logging stuff
if options.logging_level > logging.DEBUG:
if settings.logging_level > logging.DEBUG:
# redirect the root logger into a NullHandler, effectively ignoring all logging calls
# that aren't ours. This always runs, unless the main logging level is DEBUG or lower.
logging.getLogger().addHandler(logging.NullHandler())
logger = logging.getLogger("TwitchDrops")
logger.setLevel(options.logging_level)
if options.log:
logger.setLevel(settings.logging_level)
if settings.log:
handler = logging.FileHandler(LOG_PATH)
handler.setFormatter(FORMATTER)
logger.addHandler(handler)
logging.getLogger("TwitchDrops.gql").setLevel(options.debug_gql)
logging.getLogger("TwitchDrops.websocket").setLevel(options.debug_ws)
logging.getLogger("TwitchDrops.gql").setLevel(settings.debug_gql)
logging.getLogger("TwitchDrops.websocket").setLevel(settings.debug_ws)
# client run
loop = asyncio.get_event_loop()
client = Twitch(options)
client = Twitch(settings)
signal.signal(signal.SIGINT, lambda *_: client.close())
signal.signal(signal.SIGTERM, lambda *_: client.close())
try:

View File

@@ -4,10 +4,6 @@
Available command line arguments:
• -g <name>; --game <name>
On application start, begin mining immedately for the `<name>` game. Case sensitive.
• --exclude <name> [<name> ...]
Exclude the `<name>` game(s) from being mined. Case sensitive.
• --tray
Start application as minimised into tray.
• -v
@@ -19,5 +15,4 @@ Available command line arguments:
• --version
Show application version information
Note: Arguments containing spaces (such as game names) need to be encased in double quotes.
For example: "Game Name".
Note: Additional settings are available within the application GUI.

111
registry.py Normal file
View File

@@ -0,0 +1,111 @@
from __future__ import annotations
import winreg as reg
from typing import Any
from enum import Enum, Flag
from collections.abc import Generator
class RegistryError(Exception):
pass
class ValueNotExists(RegistryError):
pass
class Access(Flag):
KEY_READ = reg.KEY_READ
KEY_WRITE = reg.KEY_WRITE
KEY_NOTIFY = reg.KEY_NOTIFY
KEY_EXECUTE = reg.KEY_EXECUTE
KEY_SET_VALUE = reg.KEY_SET_VALUE
KEY_ALL_ACCESS = reg.KEY_ALL_ACCESS
KEY_CREATE_LINK = reg.KEY_CREATE_LINK
KEY_QUERY_VALUE = reg.KEY_QUERY_VALUE
KEY_CREATE_SUB_KEY = reg.KEY_CREATE_SUB_KEY
KEY_ENUMERATE_SUB_KEYS = reg.KEY_ENUMERATE_SUB_KEYS
class MainKey(Enum):
HKU = reg.HKEY_USERS
HKCR = reg.HKEY_CLASSES_ROOT
HKCU = reg.HKEY_CURRENT_USER
HKLM = reg.HKEY_LOCAL_MACHINE
HKEY_USERS = reg.HKEY_USERS
HKEY_CLASSES_ROOT = reg.HKEY_CLASSES_ROOT
HKEY_CURRENT_USER = reg.HKEY_CURRENT_USER
HKEY_LOCAL_MACHINE = reg.HKEY_LOCAL_MACHINE
HKEY_CURRENT_CONFIG = reg.HKEY_CURRENT_CONFIG
HKEY_PERFORMANCE_DATA = reg.HKEY_PERFORMANCE_DATA
class ValueType(Enum):
REG_SZ = reg.REG_SZ
REG_NONE = reg.REG_NONE
REG_LINK = reg.REG_LINK
REG_DWORD = reg.REG_DWORD
REG_QWORD = reg.REG_QWORD
REG_BINARY = reg.REG_BINARY
REG_MULTI_SZ = reg.REG_MULTI_SZ
REG_EXPAND_SZ = reg.REG_EXPAND_SZ
REG_RESOURCE_LIST = reg.REG_RESOURCE_LIST
REG_DWORD_BIG_ENDIAN = reg.REG_DWORD_BIG_ENDIAN
REG_DWORD_LITTLE_ENDIAN = reg.REG_DWORD_LITTLE_ENDIAN
REG_QWORD_LITTLE_ENDIAN = reg.REG_QWORD_LITTLE_ENDIAN
REG_FULL_RESOURCE_DESCRIPTOR = reg.REG_FULL_RESOURCE_DESCRIPTOR
REG_RESOURCE_REQUIREMENTS_LIST = reg.REG_RESOURCE_REQUIREMENTS_LIST
class RegistryKey:
def __init__(self, path: str):
main_key, _, path = path.replace('/', '\\').partition('\\')
self.main_key = MainKey[main_key]
self.path = path
self._handle = reg.OpenKey(
self.main_key.value, path, access=(Access.KEY_QUERY_VALUE | Access.KEY_SET_VALUE).value
)
def __enter__(self) -> RegistryKey:
return self
def __exit__(self, exc_type, exc, tb):
self._handle.Close()
def get(self, name: str) -> tuple[ValueType, Any]:
try:
value, value_type = reg.QueryValueEx(self._handle, name)
except FileNotFoundError:
# TODO: consider returning None for missing values
raise ValueNotExists(name)
return (ValueType(value_type), value)
def set(self, name: str, value_type: ValueType, value: Any) -> bool:
reg.SetValueEx(self._handle, name, 0, value_type.value, value)
return True # TODO: return False if the set operation fails
def delete(self, name: str, *, silent: bool = False) -> bool:
try:
reg.DeleteValue(self._handle, name)
except FileNotFoundError:
if not silent:
raise ValueNotExists(name)
return False
return True
def values(self) -> Generator[tuple[str, ValueType, Any], None, None]:
len_keys, len_values, last_modified = reg.QueryInfoKey(self._handle)
for i in range(len_values):
try:
name, value, value_type = reg.EnumValue(self._handle, i)
yield name, ValueType(value_type), value
except OSError:
return
if __name__ == "__main__":
with RegistryKey("HKCU/Software/Microsoft/Windows/CurrentVersion/Run") as key:
# key.get("test")
# key.set("test", ValueType.REG_SZ, "test\\path")
for name, value_type, value in key.values():
print(name, value_type, value)

102
settings.py Normal file
View File

@@ -0,0 +1,102 @@
from __future__ import annotations
import json
from enum import Enum
from pathlib import Path
from typing import Any, TypedDict, TYPE_CHECKING
from constants import JsonType, SETTINGS_PATH
if TYPE_CHECKING:
from main import ParsedArgs
PATH = Path(SETTINGS_PATH)
class SettingsFile(TypedDict):
autostart: bool
exclude: set[str]
priority: list[str]
priority_only: bool
autostart_tray: bool
serialize_env: dict[str, type] = {
"set": set,
}
default_settings: SettingsFile = {
"priority": [],
"exclude": set(),
"autostart": False,
"priority_only": True,
"autostart_tray": False,
}
def serialize(obj: Any) -> Any:
if isinstance(obj, (set, Enum)):
if isinstance(obj, set):
d = list(obj)
elif isinstance(obj, Enum):
d = obj.value
return {
"__type": type(obj).__name__,
"data": d,
}
raise TypeError(obj)
def deserialize(obj: JsonType) -> Any:
if "__type" in obj:
t = eval(obj["__type"], None, serialize_env)
return t(obj["data"])
return obj
class Settings:
# from args
log: bool
tray: bool
no_run_check: bool
# args properties
debug_ws: int
debug_gql: int
logging_level: int
# from settings file
autostart: bool
exclude: set[str]
priority: list[str]
priority_only: bool
autostart_tray: bool
def __init__(self, args: ParsedArgs):
self._settings: SettingsFile = default_settings.copy()
if PATH.exists():
with open(PATH, 'r') as file:
self._settings.update(json.load(file, object_hook=deserialize))
self._args: ParsedArgs = args
# default logic of reading settings is to check args first, then the settings file
def __getattr__(self, name: str, /) -> Any:
if hasattr(self._args, name):
return getattr(self._args, name)
elif name in self._settings:
return self._settings[name] # type: ignore[misc]
return getattr(super(), name)
def __setattr__(self, name: str, value: Any, /) -> None:
if name in ("_settings", "_args"):
# passthrough
return super().__setattr__(name, value)
elif name in self._settings:
self._settings[name] = value # type: ignore[misc]
return
raise TypeError(f"{name} is missing a custom setter")
def __delattr__(self, name: str, /) -> None:
raise RuntimeError("settings can't be deleted")
def save(self) -> None:
with open(PATH, 'w') as file:
json.dump(self._settings, file, default=serialize, sort_keys=True, indent=4)

View File

@@ -41,7 +41,7 @@ if TYPE_CHECKING:
from utils import Game
from gui import LoginForm
from main import ParsedArgs
from settings import Settings
from inventory import TimedDrop
from constants import JsonType, GQLOperation
@@ -57,8 +57,8 @@ def viewers_key(channel: Channel) -> int:
class Twitch:
def __init__(self, options: ParsedArgs):
self.options = options
def __init__(self, settings: Settings):
self.settings: Settings = settings
# State management
self._state: State = State.IDLE
self._state_change = asyncio.Event()
@@ -99,12 +99,14 @@ class Twitch:
if self._watching_task is not None:
self._watching_task.cancel()
self._watching_task = None
# close session and stop websocket
# close session, save cookies and stop websocket
if self._session is not None:
self._session.cookie_jar.save(COOKIES_PATH) # type: ignore
await self._session.close()
self._session = None
await self.websocket.stop()
# save settings
self.settings.save()
# wait at least one full second + whatever it takes to complete the closing
# this allows aiohttp to safely close the session
await asyncio.sleep(start_time + 0.5 - time())
@@ -175,7 +177,6 @@ class Twitch:
WebsocketTopic("User", "Drops", self._user_id, self.process_drops),
WebsocketTopic("User", "CommunityPoints", self._user_id, self.process_points),
])
first_select: bool = True
full_cleanup: bool = False
channels: Final[OrderedDict[int, Channel]] = self.channels
self.change_state(State.INVENTORY_FETCH)
@@ -211,21 +212,14 @@ class Twitch:
return
# only start the websocket after we confirm there are drops to mine
await self.websocket.start()
self.gui.games.set_games(games)
self.gui.set_games(games)
self.change_state(State.GAME_SELECT)
elif self._state is State.GAME_SELECT:
new_game: Game | None = self.gui.games.get_selection()
if new_game is None:
if first_select:
# on first select, let the user make the choice
first_select = False
else:
new_game = self.gui.games.set_first()
if new_game is not None:
# restart the watch loop immediately on new game selected
self.restart_watching()
# signal channel cleanup that we're removing everything
new_game: Game | None = self.gui.games.get_next()
if new_game != self.game:
if new_game is not None:
# restart the watch loop immediately on new game selected
self.restart_watching()
self.game = new_game
full_cleanup = True
self.change_state(State.CHANNELS_CLEANUP)