diff --git a/channel.py b/channel.py index 55f436c..375a027 100644 --- a/channel.py +++ b/channel.py @@ -6,15 +6,16 @@ import asyncio import logging from base64 import b64encode from functools import cached_property -from typing import Any, Optional, List, SupportsInt, TYPE_CHECKING +from typing import Any, SupportsInt, TYPE_CHECKING from utils import Game, invalidate_cache from exceptions import MinerException, RequestException -from constants import JsonType, BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, DROPS_ENABLED_TAG +from constants import BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, DROPS_ENABLED_TAG if TYPE_CHECKING: from twitch import Twitch from gui import ChannelList + from constants import JsonType logger = logging.getLogger("TwitchDrops") @@ -26,16 +27,16 @@ class Stream: channel: Channel, *, id: SupportsInt, - game: Optional[JsonType], + game: JsonType | None, viewers: int, title: str, - tags: List[JsonType], + tags: list[JsonType], ): self.channel: Channel = channel self.broadcast_id = int(id) self.viewers: int = viewers self.drops_enabled: bool = any(t["id"] == DROPS_ENABLED_TAG for t in tags) - self.game: Optional[Game] = Game(game) if game else None + self.game: Game | None = Game(game) if game else None self.title: str = title @classmethod @@ -70,18 +71,18 @@ class Channel: *, id: SupportsInt, login: str, - display_name: Optional[str] = None, + display_name: str | None = None, priority: bool = False, ): self._twitch: Twitch = twitch self._gui_channels: ChannelList = twitch.gui.channels self.id: int = int(id) self._login: str = login - self._display_name: Optional[str] = display_name - self._spade_url: Optional[str] = None - self.points: Optional[int] = None - self._stream: Optional[Stream] = None - self._pending_stream_up: Optional[asyncio.Task[Any]] = None + self._display_name: str | None = display_name + self._spade_url: str | None = None + self.points: int | None = None + self._stream: Stream | None = None + self._pending_stream_up: asyncio.Task[Any] | None = None # Priority channels are: # • considered first when switching channels # • if we're watching a non-priority channel, a priority channel going up triggers a switch @@ -174,13 +175,13 @@ class Channel: return self._pending_stream_up is not None @property - def game(self) -> Optional[Game]: + def game(self) -> Game | None: if self._stream is not None and self._stream.game is not None: return self._stream.game return None @property - def viewers(self) -> Optional[int]: + def viewers(self) -> int | None: if self._stream is not None: return self._stream.viewers return None @@ -229,13 +230,13 @@ class Channel: raise MinerException("Error while spade_url extraction: step #2") return match.group(1) - async def get_stream(self) -> Optional[Stream]: - response: Optional[JsonType] = await self._twitch.gql_request( + async def get_stream(self) -> Stream | None: + response: JsonType | None = await self._twitch.gql_request( GQL_OPERATIONS["GetStreamInfo"].with_variables({"channel": self._login}) ) if not response: return None - stream_data: Optional[JsonType] = response["data"]["user"] + stream_data: JsonType | None = response["data"]["user"] if not stream_data: return None # fill in channel_id and display name diff --git a/constants.py b/constants.py index 74b4368..9d688de 100644 --- a/constants.py +++ b/constants.py @@ -2,16 +2,19 @@ from __future__ import annotations import logging from copy import copy +from collections import abc from enum import Enum, auto from datetime import timedelta -from typing import Any, Optional, Dict, Literal, Callable +from typing import Any, Literal + +from typing_extensions import TypeAlias from version import __version__ # Typing -JsonType = Dict[str, Any] -TopicProcess = Callable[[int, JsonType], Any] +JsonType: TypeAlias = "dict[str, Any]" +TopicProcess = abc.Callable[[int, JsonType], Any] # Values MAX_WEBSOCKETS = 8 WS_TOPICS_LIMIT = 50 @@ -58,7 +61,7 @@ class State(Enum): class GQLOperation(JsonType): - def __init__(self, name: str, sha256: str, *, variables: Optional[JsonType] = None): + def __init__(self, name: str, sha256: str, *, variables: JsonType | None = None): super().__init__( operationName=name, extensions={ @@ -81,7 +84,7 @@ class GQLOperation(JsonType): return modified -GQL_OPERATIONS: Dict[str, GQLOperation] = { +GQL_OPERATIONS: dict[str, GQLOperation] = { # returns stream information for a particular channel "GetStreamInfo": GQLOperation( "VideoPlayerStreamInfoOverlayChannel", @@ -188,7 +191,9 @@ class WebsocketTopic: self._process: TopicProcess = process @classmethod - def as_str(cls, category: Literal["User", "Channel"], topic_name: str, target_id: int) -> str: + def as_str( + cls, category: Literal["User", "Channel"], topic_name: str, target_id: int + ) -> str: return f"{WEBSOCKET_TOPICS[category][topic_name]}.{target_id}" def __call__(self, message: JsonType): @@ -211,7 +216,7 @@ class WebsocketTopic: return hash((self.__class__.__name__, self._id)) -WEBSOCKET_TOPICS: Dict[str, Dict[str, str]] = { +WEBSOCKET_TOPICS: dict[str, dict[str, str]] = { "User": { # Using user_id "Drops": "user-drop-events", "CommunityPoints": "community-points-user-v1", diff --git a/gui.py b/gui.py index c6babaf..c8c16df 100644 --- a/gui.py +++ b/gui.py @@ -9,9 +9,7 @@ from math import log10, ceil from tkinter.font import Font from collections import namedtuple, OrderedDict from tkinter import Tk, ttk, StringVar, DoubleVar -from typing import ( - Any, Optional, List, Dict, Set, Tuple, TypedDict, Iterable, NoReturn, TYPE_CHECKING -) +from typing import Any, TypedDict, NoReturn, TYPE_CHECKING try: import pystray @@ -23,6 +21,7 @@ from constants import FORMATTER, WS_TOPICS_LIMIT, MAX_WEBSOCKETS, WINDOW_TITLE, if TYPE_CHECKING: from twitch import Twitch from channel import Channel + from collections import abc from inventory import Game, TimedDrop @@ -93,7 +92,7 @@ class PlaceholderEntry(ttk.Entry): self.config(foreground=self._ph_color, show='') self.insert(0, self._ph_text) - def _store_option(self, options: Dict[str, Any], attr: str, name: str): + def _store_option(self, options: dict[str, Any], attr: str, name: str): value = options.get(name) if value is not None: setattr(self, attr, value) @@ -157,10 +156,10 @@ class WebsocketStatus: justify="right", font=WS_FONT, ).grid(column=2, row=0) - self._items: Dict[int, Optional[_WSEntry]] = {i: None for i in range(MAX_WEBSOCKETS)} + self._items: dict[int, _WSEntry | None] = {i: None for i in range(MAX_WEBSOCKETS)} self._update() - def update(self, idx: int, status: Optional[str] = None, topics: Optional[int] = None): + def update(self, idx: int, status: str | None = None, topics: int | None = None): if status is None and topics is None: raise TypeError("You need to provide at least one of: status, topics") entry = self._items.get(idx) @@ -178,8 +177,8 @@ class WebsocketStatus: self._update() def _update(self): - status_lines: List[str] = [] - topic_lines: List[str] = [] + status_lines: list[str] = [] + topic_lines: list[str] = [] for idx in range(MAX_WEBSOCKETS): item = self._items.get(idx) if item is None: @@ -236,7 +235,7 @@ class LoginForm: data = LoginData(self._login_entry.get(), self._pass_entry.get(), self._token_entry.get()) return data - def update(self, status: str, user_id: Optional[int]): + def update(self, status: str, user_id: int | None): if user_id is not None: user_str = str(user_id) else: @@ -260,12 +259,12 @@ class GameSelector: highlightthickness=0, ) self._list.pack(fill="both", expand=True) - self._selection: Optional[str] = self._manager._twitch.options.game + self._selection: str | None = self._manager._twitch.options.game self._games: OrderedDict[str, Game] = OrderedDict() - self._excluded: Set[int] = set() + self._excluded: set[int] = set() self._list.bind("<>", self._on_select) - def set_games(self, games: Iterable[Game]): + def set_games(self, games: abc.Iterable[Game]): self._games.clear() self._games.update((str(g), g) for g in games) self._list.delete(0, "end") @@ -273,7 +272,7 @@ class GameSelector: self._list.config(width=0) # autoadjust listbox width # process excluded games and relink the selection self._excluded.clear() - selected_index: Optional[int] = None + selected_index: int | None = None exclude = self._manager._twitch.options.exclude for i, game_name in enumerate(self._list.get(0, "end")): if game_name in exclude: @@ -290,7 +289,7 @@ class GameSelector: self._selection = None def _on_select(self, event): - current: Tuple[int, ...] = self._list.curselection() + current: tuple[int, ...] = self._list.curselection() if not current: # can happen when the user clicks on an empty list return @@ -311,12 +310,12 @@ class GameSelector: self._selection = new_selection self._manager._twitch.change_state(State.GAME_SELECT) - def get_selection(self) -> Optional[Game]: + def get_selection(self) -> Game | None: if self._selection is None: return None return self._games[self._selection] - def set_first(self) -> Optional[Game]: + def set_first(self) -> Game | None: # select and return the first non-excluded game from the list self._list.selection_clear(0, "end") for i, game_name in enumerate(self._list.get(0, "end")): @@ -402,12 +401,12 @@ class CampaignProgress: maximum=1, variable=self._vars["drop"]["progress"], ).grid(column=0, row=10, columnspan=2) - self._drop: Optional[TimedDrop] = None - self._timer_task: Optional[asyncio.Task[None]] = None + self._drop: TimedDrop | None = None + self._timer_task: asyncio.Task[None] | None = None self._update_time(0) @staticmethod - def _divmod(minutes: int, seconds: int) -> Tuple[int, int]: + def _divmod(minutes: int, seconds: int) -> tuple[int, int]: if seconds < 60 and minutes > 0: minutes -= 1 hours, minutes = divmod(minutes, 60) @@ -560,7 +559,7 @@ class ChannelList: table.grid(column=0, row=1, sticky="nsew") scroll.grid(column=1, row=1, sticky="ns") self._font = Font(frame, manager._style.lookup("Treeview", "font")) - self._const_width: Set[str] = set() + self._const_width: set[str] = set() table.tag_configure("watching", background="gray70") table.bind("", self._disable_column_resize) table.bind("<>", self._selected) @@ -572,7 +571,7 @@ class ChannelList: self._add_column("viewers", "Viewers", width_template="1234567") self._add_column("points", "Points", width_template="1234567") self._add_column("priority", "❗", width_template="✔") - self._channel_map: Dict[str, Channel] = {} + self._channel_map: dict[str, Channel] = {} def _add_column( self, @@ -580,13 +579,13 @@ class ChannelList: name: str, *, anchor: tk._Anchor = "center", - width: Optional[int] = None, - width_template: Optional[str] = None, + width: int | None = None, + width_template: str | None = None, ): table = self._table # we need to save the column settings and headings before modifying the columns... - columns: Tuple[str, ...] = table.cget("columns") or () - column_settings: Dict[str, Tuple[str, tk._Anchor, int, int]] = {} + columns: tuple[str, ...] = table.cget("columns") or () + column_settings: dict[str, tuple[str, tk._Anchor, int, int]] = {} for s_cid in columns: s_column = table.column(s_cid) assert s_column is not None @@ -665,8 +664,8 @@ class ChannelList: self._table.set(iid, column, value) self._adjust_width(column, value) - def _insert(self, iid: str, values: Dict[str, str]): - to_insert: List[str] = [] + def _insert(self, iid: str, values: dict[str, str]): + to_insert: list[str] = [] for cid in self._table.cget("columns"): value = values[cid] to_insert.append(value) @@ -683,7 +682,7 @@ class ChannelList: self._table.item(iid, tags="watching") self._table.see(iid) - def get_selection(self) -> Optional[Channel]: + def get_selection(self) -> Channel | None: if not self._channel_map: return None selection = self._table.selection() @@ -755,7 +754,7 @@ class TrayIcon: def __init__(self, manager: GUIManager, master: ttk.Widget): self._manager = manager - self.icon: Optional[pystray.Icon] = None + self.icon: pystray.Icon | None = None self._button = ttk.Button(master, command=self.minimize, text="Minimize to Tray") self._button.grid(column=0, row=0, sticky="e") if manager._twitch.options.tray: @@ -765,7 +764,7 @@ class TrayIcon: def is_tray(self) -> bool: return self.icon is not None - def get_title(self, drop: Optional[TimedDrop]) -> str: + def get_title(self, drop: TimedDrop | None) -> str: if drop is None: return self.TITLE return ( @@ -815,7 +814,7 @@ class TrayIcon: self.stop() self._manager._root.deiconify() - def notify(self, message: str, title: Optional[str] = None, duration: float = 10): + def notify(self, message: str, title: str | None = None, duration: float = 10): if self.icon is not None: icon = self.icon @@ -834,7 +833,7 @@ class TrayIcon: class GUIManager: def __init__(self, twitch: Twitch): self._twitch: Twitch = twitch - self._poll_task: Optional[asyncio.Task[NoReturn]] = None + self._poll_task: asyncio.Task[NoReturn] | None = None self._closed = asyncio.Event() self._root = root = Tk() # withdraw immediately to prevent the window from flashing @@ -978,7 +977,7 @@ if __name__ == "__main__": def create_channel( name: str, status: int, - game: Optional[str], + game: str | None, drops: bool, viewers: int, points: int, @@ -991,7 +990,7 @@ if __name__ == "__main__": else: pending = False if game is not None: - game_obj: Optional[StrNamespace] = create_game(0, game) + game_obj: StrNamespace | None = create_game(0, game) else: game_obj = None global iid diff --git a/inventory.py b/inventory.py index 5daca42..1dd81c4 100644 --- a/inventory.py +++ b/inventory.py @@ -1,30 +1,32 @@ from __future__ import annotations +from typing import TYPE_CHECKING from functools import cached_property from datetime import datetime, timezone -from typing import Optional, List, Dict, Iterable, TYPE_CHECKING -from channel import Channel -from constants import GQL_OPERATIONS, JsonType +from constants import GQL_OPERATIONS from utils import timestamp, invalidate_cache, Game if TYPE_CHECKING: from twitch import Twitch + from channel import Channel + from collections import abc + from constants import JsonType from gui import CampaignProgress class BaseDrop: def __init__( - self, campaign: DropsCampaign, data: JsonType, claimed_benefits: Dict[str, datetime] + self, campaign: DropsCampaign, data: JsonType, claimed_benefits: dict[str, datetime] ): self._twitch: Twitch = campaign._twitch self.id: str = data["id"] self.name: str = data["name"] self.campaign: DropsCampaign = campaign - self.rewards: List[str] = [b["benefit"]["name"] for b in data["benefitEdges"]] + self.rewards: list[str] = [b["benefit"]["name"] for b in data["benefitEdges"]] self.starts_at: datetime = timestamp(data["startAt"]) self.ends_at: datetime = timestamp(data["endAt"]) - self.claim_id: Optional[str] = None + self.claim_id: str | None = None self.is_claimed: bool = False if "self" in data: self.claim_id = data["self"]["dropInstanceID"] @@ -46,7 +48,7 @@ class BaseDrop: and all(self.starts_at <= dt < self.ends_at for dt in dts) ): self.is_claimed = True - self._precondition_drops: List[str] = [d["id"] for d in (data["preconditionDrops"] or [])] + self._precondition_drops: list[str] = [d["id"] for d in (data["preconditionDrops"] or [])] def __repr__(self) -> str: if self.is_claimed: @@ -62,7 +64,7 @@ class BaseDrop: campaign = self.campaign return all(campaign.timed_drops[pid].is_claimed for pid in self._precondition_drops) - def can_earn(self, channel: Optional[Channel] = None) -> bool: + def can_earn(self, channel: Channel | None = None) -> bool: return ( self.preconditions # preconditions are met and not self.is_claimed # drop isn't already claimed @@ -128,7 +130,7 @@ class BaseDrop: class TimedDrop(BaseDrop): def __init__( - self, campaign: DropsCampaign, data: JsonType, claimed_benefits: Dict[str, datetime] + self, campaign: DropsCampaign, data: JsonType, claimed_benefits: dict[str, datetime] ): super().__init__(campaign, data, claimed_benefits) self._gui_progress: CampaignProgress = self._twitch.gui.progress @@ -185,19 +187,19 @@ class TimedDrop(BaseDrop): class DropsCampaign: - def __init__(self, twitch: Twitch, data: JsonType, claimed_benefits: Dict[str, datetime]): + def __init__(self, twitch: Twitch, data: JsonType, claimed_benefits: dict[str, datetime]): self._twitch: Twitch = twitch self.id: str = data["id"] self.name: str = data["name"] self.game: Game = Game(data["game"]) self.starts_at: datetime = timestamp(data["startAt"]) self.ends_at: datetime = timestamp(data["endAt"]) - allowed = data["allow"] - self.allowed_channels: List[Channel] = ( + allowed: JsonType = data["allow"] + self.allowed_channels: list[Channel] = ( [Channel.from_acl(twitch, channel_data) for channel_data in allowed["channels"]] if allowed["channels"] and allowed.get("isEnabled", True) else [] ) - self.timed_drops: Dict[str, TimedDrop] = { + self.timed_drops: dict[str, TimedDrop] = { drop_data["id"]: TimedDrop(self, drop_data, claimed_benefits) for drop_data in data["timeBasedDrops"] } @@ -206,7 +208,7 @@ class DropsCampaign: return f"Campaign({self.name}({self.game!s}), {self.claimed_drops}/{self.total_drops})" @property - def drops(self) -> Iterable[TimedDrop]: + def drops(self) -> abc.Iterable[TimedDrop]: return self.timed_drops.values() @property @@ -249,5 +251,5 @@ class DropsCampaign: def _on_minutes_changed(self) -> None: invalidate_cache(self, "progress", "remaining_minutes") - def get_drop(self, drop_id: str) -> Optional[TimedDrop]: + def get_drop(self, drop_id: str) -> TimedDrop | None: return self.timed_drops.get(drop_id) diff --git a/main.py b/main.py index bf97ab6..092ac21 100644 --- a/main.py +++ b/main.py @@ -12,7 +12,7 @@ import tkinter as tk from copy import copy from pathlib import Path from tkinter import messagebox -from typing import Optional, Union, Set, NoReturn, Generic +from typing import Generic, NoReturn from utils import _T from twitch import Twitch @@ -21,7 +21,7 @@ from exceptions import CaptchaRequired from constants import FORMATTER, LOG_PATH, WINDOW_TITLE -# we need an dummy invisible window for the parser +# we need a dummy invisible window for the parser root = tk.Tk() root.overrideredirect(True) root.withdraw() @@ -49,9 +49,9 @@ class SetCollectAction(argparse.Action, Generic[_T]): option_strings, dest, *, - nargs: Optional[Union[int, str]] = None, - const: Optional[_T] = None, - default: Optional[Set[_T]] = None, + const: _T | None = None, + nargs: int | str | None = None, + default: set[_T] | None = None, **kwargs, ) -> None: if nargs is not None and nargs in ('?', '*') or isinstance(nargs, int) and nargs <= 0: @@ -63,7 +63,7 @@ class SetCollectAction(argparse.Action, Generic[_T]): super().__init__(option_strings, dest, nargs, const, default, **kwargs) def __call__(self, parser, namespace, values, option_string=None): - items: Set[_T] = getattr(namespace, self.dest, self.default) + items: set[_T] = getattr(namespace, self.dest, self.default) items = copy(items) for value in values: items.add(value) @@ -76,9 +76,9 @@ class ParsedArgs(argparse.Namespace): _debug_gql: bool log: bool tray: bool - exclude: Set[str] + game: str | None + exclude: set[str] no_run_check: bool - game: Optional[str] @property def logging_level(self) -> int: diff --git a/twitch.py b/twitch.py index b59861d..3952e05 100644 --- a/twitch.py +++ b/twitch.py @@ -6,22 +6,10 @@ import logging from yarl import URL from time import time from itertools import chain -from datetime import datetime from functools import partial +from typing import TYPE_CHECKING +from collections import OrderedDict from contextlib import suppress, asynccontextmanager -from typing import ( - Optional, - Union, - List, - Dict, - Set, - OrderedDict, - Callable, - AsyncIterator, - Final, - cast, - TYPE_CHECKING, -) try: import aiohttp @@ -31,9 +19,9 @@ except ModuleNotFoundError as exc: from gui import GUIManager from channel import Channel from websocket import WebsocketPool -from inventory import DropsCampaign, TimedDrop +from inventory import DropsCampaign +from utils import task_wrapper, timestamp, AwaitableValue, OrderedSet from exceptions import RequestException, LoginException, CaptchaRequired -from utils import task_wrapper, timestamp, Game, AwaitableValue, OrderedSet from constants import ( GQL_URL, AUTH_URL, @@ -45,15 +33,20 @@ from constants import ( WATCH_INTERVAL, WS_TOPICS_LIMIT, DROPS_ENABLED_TAG, - JsonType, State, - GQLOperation, WebsocketTopic, ) if TYPE_CHECKING: + from collections import abc + from datetime import datetime + from typing import Final, cast + + from utils import Game from gui import LoginForm from main import ParsedArgs + from inventory import TimedDrop + from constants import JsonType, GQLOperation logger = logging.getLogger("TwitchDrops") @@ -73,21 +66,21 @@ class Twitch: # State management self._state: State = State.IDLE self._state_change = asyncio.Event() - self.game: Optional[Game] = None - self.inventory: Dict[Game, List[DropsCampaign]] = {} + self.game: Game | None = None + self.inventory: dict[Game, list[DropsCampaign]] = {} # GUI self.gui = GUIManager(self) # Cookies, session and auth - self._session: Optional[aiohttp.ClientSession] = None - self._access_token: Optional[str] = None - self._user_id: Optional[int] = None + self._session: aiohttp.ClientSession | None = None + self._access_token: str | None = None + self._user_id: int | None = None self._is_logged_in = asyncio.Event() # Storing and watching channels self.channels: OrderedDict[int, Channel] = OrderedDict() self.watching_channel: AwaitableValue[Channel] = AwaitableValue() - self._watching_task: Optional[asyncio.Task[None]] = None + self._watching_task: asyncio.Task[None] | None = None self._watching_restart = asyncio.Event() - self._drop_update: Optional[asyncio.Future[bool]] = None + self._drop_update: asyncio.Future[bool] | None = None # Websocket self.websocket = WebsocketPool(self) @@ -127,7 +120,7 @@ class Twitch: self._state = state self._state_change.set() - def state_change(self, state: State) -> Callable[[], None]: + def state_change(self, state: State) -> abc.Callable[[], None]: # this is identical to change_state, but defers the call # perfect for GUI usage return partial(self.change_state, state) @@ -192,7 +185,7 @@ class Twitch: self.change_state(State.GAMES_UPDATE) elif self._state is State.GAMES_UPDATE: # Figure out which games to watch, and claim the drops we can - games: List[Game] = [] + games: list[Game] = [] for game, campaigns in self.inventory.items(): add_game = False for campaign in campaigns: @@ -233,7 +226,7 @@ class Twitch: elif self._state is State.CHANNELS_CLEANUP: if self.game is None or full_cleanup: # no game selected or we're doing full cleanup: remove everything - to_remove: List[Channel] = list(channels.values()) + to_remove: list[Channel] = list(channels.values()) else: # remove all channels that: to_remove = [ @@ -291,7 +284,7 @@ class Twitch: # sort them descending by viewers, # then by priority so that prioritized ones are first # NOTE: We can drop OrderedSet now because there's no more channels being added - ordered_channels: List[Channel] = sorted( + ordered_channels: list[Channel] = sorted( new_channels, key=viewers_key, reverse=True ) ordered_channels.sort(key=lambda ch: ch.priority, reverse=True) @@ -348,7 +341,7 @@ class Twitch: else: # Change into the selected channel, stay in the watching channel, # or select a new channel that meets the required conditions - priority_channels: List[Channel] = [] + priority_channels: list[Channel] = [] selected_channel = self.gui.channels.get_selection() if selected_channel is not None: self.gui.channels.clear_selection() @@ -409,7 +402,7 @@ class Twitch: if not use_active: # we need to use GQL to get the current progress context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"]) - drop_data: Optional[JsonType] = ( + drop_data: JsonType | None = ( context["data"]["currentUser"]["dropCurrentSession"] ) if drop_data is not None: @@ -535,7 +528,7 @@ class Twitch: if msg_type not in ("drop-progress", "drop-claim"): return drop_id: str = message["data"]["drop_id"] - drop: Optional[TimedDrop] = self.get_drop(drop_id) + drop: TimedDrop | None = self.get_drop(drop_id) if msg_type == "drop-claim": if drop is None: logger.error( @@ -561,7 +554,7 @@ class Twitch: await asyncio.sleep(4) for attempt in range(8): context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"]) - drop_data: Optional[JsonType] = ( + drop_data: JsonType | None = ( context["data"]["currentUser"]["dropCurrentSession"] ) if drop_data is None or drop_data["dropID"] != drop.id: @@ -636,7 +629,7 @@ class Twitch: msg_type = message["type"] if msg_type == "points-earned": data: JsonType = message["data"] - channel: Optional[Channel] = self.channels.get(int(data["channel_id"])) + channel: Channel | None = self.channels.get(int(data["channel_id"])) points: int = data["point_gain"]["total_points"] balance: int = data["balance"]["balance"] if channel is not None: @@ -780,13 +773,13 @@ class Twitch: @asynccontextmanager async def request( self, method: str, url: str, *, attempts: int = 5, **kwargs - ) -> AsyncIterator[aiohttp.ClientResponse]: + ) -> abc.AsyncIterator[aiohttp.ClientResponse]: if self._session is None: await self.initialize() session = self._session assert session is not None method = method.upper() - cause: Optional[Exception] = None + cause: Exception | None = None for attempt in range(attempts): logger.debug(f"Request: ({method=}, {url=}, {attempts=}, {kwargs=})") try: @@ -815,7 +808,7 @@ class Twitch: return response_json async def fetch_campaign( - self, campaign_id: str, claimed_benefits: Dict[str, datetime] + self, campaign_id: str, claimed_benefits: dict[str, datetime] ) -> DropsCampaign: response = await self.gql_request( GQL_OPERATIONS["CampaignDetails"].with_variables( @@ -829,7 +822,7 @@ class Twitch: response = await self.gql_request(GQL_OPERATIONS["Campaigns"]) data = response["data"]["currentUser"]["dropCampaigns"] or [] applicable_statuses = ("ACTIVE", "UPCOMING") - available_campaigns: Set[str] = set( + available_campaigns: set[str] = set( c["id"] for c in data if c["status"] in applicable_statuses and c["self"]["isAccountConnected"] ) @@ -838,10 +831,10 @@ class Twitch: inventory = response["data"]["currentUser"]["inventory"] ongoing_campaigns = inventory["dropCampaignsInProgress"] or [] # this contains claimed benefit edge IDs, not drop IDs - claimed_benefits: Dict[str, datetime] = { + claimed_benefits: dict[str, datetime] = { b["id"]: timestamp(b["lastAwardedAt"]) for b in inventory["gameEventDrops"] } - campaigns: List[DropsCampaign] = [ + campaigns: list[DropsCampaign] = [ DropsCampaign(self, campaign_data, claimed_benefits) for campaign_data in ongoing_campaigns ] @@ -862,7 +855,7 @@ class Twitch: self.inventory[game] = [] self.inventory[game].append(campaign) - def get_drop(self, drop_id: str) -> Optional[TimedDrop]: + def get_drop(self, drop_id: str) -> TimedDrop | None: """ Returns a drop from the inventory, based on it's ID. """ @@ -877,7 +870,7 @@ class Twitch: return drop return None - def get_active_drop(self, channel: Optional[Channel] = None) -> Optional[TimedDrop]: + def get_active_drop(self, channel: Channel | None = None) -> TimedDrop | None: if self.game is None: return None watching_channel = self.watching_channel.get_with_default(channel) @@ -895,7 +888,7 @@ class Twitch: return drops[0] return None - async def get_live_streams(self) -> List[Channel]: + async def get_live_streams(self) -> list[Channel]: if self.game is None: return [] limit = 30 @@ -914,7 +907,7 @@ class Twitch: for stream_channel_data in response["data"]["game"]["streams"]["edges"] ] - async def claim_points(self, channel_id: Union[str, int], claim_id: str) -> None: + async def claim_points(self, channel_id: str | int, claim_id: str) -> None: await self.gql_request( GQL_OPERATIONS["ClaimCommunityPoints"].with_variables( {"input": {"channelID": str(channel_id), "claimID": claim_id}} diff --git a/utils.py b/utils.py index 0583d49..abdb097 100644 --- a/utils.py +++ b/utils.py @@ -6,23 +6,9 @@ import asyncio import logging from functools import wraps from contextlib import suppress -from collections import OrderedDict from datetime import datetime, timezone -from typing import ( - Any, - Literal, - Union, - List, - MutableSet, - Callable, - Iterable, - Iterator, - Coroutine, - Generic, - TypeVar, - TYPE_CHECKING, - cast, -) +from collections import abc, OrderedDict +from typing import Any, Literal, Generic, TypeVar, cast, TYPE_CHECKING from constants import JsonType @@ -50,13 +36,13 @@ def create_nonce(length: int = 30) -> str: return ''.join(random.choices(NONCE_CHARS, k=length)) -def deduplicate(iterable: Iterable[_T]) -> List[_T]: +def deduplicate(iterable: abc.Iterable[_T]) -> list[_T]: return list(OrderedDict.fromkeys(iterable).keys()) def task_wrapper( - afunc: Callable[_P, Coroutine[Any, Any, _T]] -) -> Callable[_P, Coroutine[Any, Any, _T]]: + afunc: abc.Callable[_P, abc.Coroutine[Any, Any, _T]] +) -> abc.Callable[_P, abc.Coroutine[Any, Any, _T]]: @wraps(afunc) async def wrapper(*args: _P.args, **kwargs: _P.kwargs): try: @@ -76,12 +62,12 @@ def invalidate_cache(instance, *attrnames): delattr(instance, name) -class OrderedSet(MutableSet[_T]): +class OrderedSet(abc.MutableSet[_T]): """ Implementation of a set that preserves insertion order, based on OrderedDict with values set to None. """ - def __init__(self, iterable: Iterable[_T] = [], /): + def __init__(self, iterable: abc.Iterable[_T] = [], /): self._items: OrderedDict[_T, None] = OrderedDict((item, None) for item in iterable) def __repr__(self) -> str: @@ -90,7 +76,7 @@ class OrderedSet(MutableSet[_T]): def __contains__(self, item: object, /) -> bool: return item in self._items - def __iter__(self) -> Iterator[_T]: + def __iter__(self) -> abc.Iterator[_T]: return iter(self._items) def __len__(self) -> int: @@ -103,13 +89,13 @@ class OrderedSet(MutableSet[_T]): with suppress(KeyError): del self._items[item] - def update(self, *others: Iterable[_T]) -> None: + def update(self, *others: abc.Iterable[_T]) -> None: for it in others: for item in it: if item not in self._items: self._items[item] = None - def difference_update(self, *others: Iterable[_T]) -> None: + def difference_update(self, *others: abc.Iterable[_T]) -> None: for it in others: for item in it: if item in self._items: @@ -124,10 +110,10 @@ class AwaitableValue(Generic[_T]): def has_value(self) -> bool: return self._event.is_set() - def wait(self) -> Coroutine[Any, Any, Literal[True]]: - return cast(Coroutine[Any, Any, Literal[True]], self._event.wait()) + def wait(self) -> abc.Coroutine[Any, Any, Literal[True]]: + return cast(abc.Coroutine[Any, Any, Literal[True]], self._event.wait()) - def get_with_default(self, default: _D) -> Union[_T, _D]: + def get_with_default(self, default: _D) -> _T | _D: if self._event.is_set(): return self._value return default diff --git a/websocket.py b/websocket.py index 56af13c..6d24660 100644 --- a/websocket.py +++ b/websocket.py @@ -5,7 +5,7 @@ import asyncio import logging from time import time from contextlib import suppress -from typing import Optional, List, Dict, Set, Iterable, TYPE_CHECKING +from typing import TYPE_CHECKING try: from websockets.exceptions import ConnectionClosed, ConnectionClosedOK @@ -15,18 +15,12 @@ except ModuleNotFoundError as exc: from exceptions import MinerException from utils import task_wrapper, create_nonce, AwaitableValue -from constants import ( - JsonType, - WebsocketTopic, - WEBSOCKET_URL, - PING_INTERVAL, - PING_TIMEOUT, - MAX_WEBSOCKETS, - WS_TOPICS_LIMIT, -) +from constants import WEBSOCKET_URL, PING_INTERVAL, PING_TIMEOUT, MAX_WEBSOCKETS, WS_TOPICS_LIMIT if TYPE_CHECKING: from twitch import Twitch + from collections import abc + from constants import JsonType, WebsocketTopic logger = logging.getLogger("TwitchDrops") @@ -49,10 +43,10 @@ class Websocket: self._next_ping: float = time() self._max_pong: float = self._next_ping + PING_TIMEOUT.total_seconds() # main task, responsible for receiving messages, sending them, and websocket ping - self._handle_task: Optional[asyncio.Task[None]] = None + self._handle_task: asyncio.Task[None] | None = None # topics stuff - self.topics: Dict[str, WebsocketTopic] = {} - self._submitted: Set[WebsocketTopic] = set() + self.topics: dict[str, WebsocketTopic] = {} + self._submitted: set[WebsocketTopic] = set() # notify GUI self.set_status("Disconnected") @@ -63,7 +57,7 @@ class Websocket: def wait_until_connected(self): return self._ws.wait() - def set_status(self, status: Optional[str] = None, refresh_topics: bool = False): + def set_status(self, status: str | None = None, refresh_topics: bool = False): self._twitch.gui.websockets.update( self._idx, status=status, topics=(len(self.topics) if refresh_topics else None) ) @@ -174,7 +168,7 @@ class Websocket: # nothing to do return self._topics_changed.clear() - current: Set[WebsocketTopic] = set(self.topics.values()) + current: set[WebsocketTopic] = set(self.topics.values()) # handle removed topics removed = self._submitted.difference(current) if removed: @@ -206,7 +200,7 @@ class Websocket: ) self._submitted.update(added) - async def _gather_recv(self, messages: List[JsonType]): + async def _gather_recv(self, messages: list[JsonType]): """ Gather incoming messages over the timeout specified. Note that there's no return value - this modifies `messages` in-place. @@ -231,7 +225,7 @@ class Websocket: Handle receiving messages from the websocket. """ # listen over 0.5s for incoming messages - messages: List[JsonType] = [] + messages: list[JsonType] = [] with suppress(asyncio.TimeoutError): await asyncio.wait_for(self._gather_recv(messages), timeout=0.5) # process them @@ -251,7 +245,7 @@ class Websocket: else: ws_logger.warning(f"Websocket[{self._idx}] received unknown payload: {message}") - def add_topics(self, topics_set: Set[WebsocketTopic]): + def add_topics(self, topics_set: set[WebsocketTopic]): changed: bool = False while topics_set and len(self.topics) < WS_TOPICS_LIMIT: topic = topics_set.pop() @@ -261,7 +255,7 @@ class Websocket: self._topics_changed.set() self.set_status(refresh_topics=True) - def remove_topics(self, topics_set: Set[str]): + def remove_topics(self, topics_set: set[str]): existing = topics_set.intersection(self.topics.keys()) if not existing: # nothing to remove from here @@ -285,7 +279,7 @@ class WebsocketPool: def __init__(self, twitch: Twitch): self._twitch: Twitch = twitch self._running = asyncio.Event() - self.websockets: List[Websocket] = [] + self.websockets: list[Websocket] = [] @property def running(self) -> bool: @@ -307,7 +301,7 @@ class WebsocketPool: self._running.clear() await asyncio.gather(*(ws.stop() for ws in self.websockets)) - def add_topics(self, topics: Iterable[WebsocketTopic]): + def add_topics(self, topics: abc.Iterable[WebsocketTopic]): # ensure no topics end up duplicated topics_set = set(topics) if not topics_set: @@ -335,7 +329,7 @@ class WebsocketPool: # if we're here, there were leftover topics after filling up all websockets raise MinerException("Maximum topics limit has been reached") - def remove_topics(self, topics: Iterable[str]): + def remove_topics(self, topics: abc.Iterable[str]): topics_set = set(topics) if not topics_set: # nothing to remove @@ -344,7 +338,7 @@ class WebsocketPool: ws.remove_topics(topics_set) # count up all the topics - if we happen to have more websockets connected than needed, # stop the last one and recycle topics from it - repeat until we have enough - recycled_topics: List[WebsocketTopic] = [] + recycled_topics: list[WebsocketTopic] = [] while True: count = sum(len(ws.topics) for ws in self.websockets) if count <= (len(self.websockets) - 1) * WS_TOPICS_LIMIT: