Huge GUI update

This commit is contained in:
DevilXD
2021-12-29 13:55:05 +01:00
parent bddbff7c35
commit c8d3b992d7
10 changed files with 1460 additions and 453 deletions

2
.gitignore vendored
View File

@@ -2,6 +2,6 @@ __pycache__
.mypy_cache
/build
/dist
/cookies.pickle
/cookies.jar
/settings.json
/*.spec

8
.vscode/launch.json vendored
View File

@@ -38,5 +38,13 @@
"console": "integratedTerminal",
"justMyCode": false
},
{
"name": "Debug GUI",
"type": "python",
"request": "launch",
"program": "gui.py",
"console": "integratedTerminal",
"justMyCode": false
},
]
}

View File

@@ -4,13 +4,16 @@ import re
import json
import asyncio
import logging
from time import time
from base64 import b64encode
from datetime import datetime, timezone
from typing import Any, Optional, TYPE_CHECKING
from inventory import Game
from exceptions import MinerException
from constants import JsonType, BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, DROPS_ENABLED_TAG
from constants import (
JsonType, BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, WATCH_INTERVAL, DROPS_ENABLED_TAG
)
if TYPE_CHECKING:
from twitch import Twitch
@@ -21,17 +24,17 @@ logger = logging.getLogger("TwitchDrops")
class Stream:
def __init__(self, channel: Channel, data: JsonType):
self._twitch = channel._twitch
self.channel = channel
self._twitch: Twitch = channel._twitch
self.channel: Channel = channel
stream = data["stream"]
self.broadcast_id = int(stream["id"])
self.viewer_count = stream["viewersCount"]
self.drops_enabled = any(tag["id"] == DROPS_ENABLED_TAG for tag in stream["tags"])
self.viewers: int = stream["viewersCount"]
self.drops_enabled: bool = any(tag["id"] == DROPS_ENABLED_TAG for tag in stream["tags"])
settings = data["broadcastSettings"]
self.game: Optional[Game] = None
if settings["game"] is not None:
self.game = Game(settings["game"])
self.title = settings["title"]
self.title: str = settings["title"]
self._timestamp = datetime.now(timezone.utc)
@classmethod
@@ -40,7 +43,7 @@ class Stream:
self._twitch = channel._twitch
self.channel = channel
self.broadcast_id = data["id"]
self.viewer_count = data["viewersCount"]
self.viewers = data["viewersCount"]
self.drops_enabled = any(tag["id"] == DROPS_ENABLED_TAG for tag in data["tags"])
self.game = Game(data["game"])
self.title = data["title"]
@@ -64,7 +67,8 @@ class Channel:
self.name: str = channel_name
self.url: str = f"{BASE_URL}/{channel_name}"
self._spade_url: Optional[str] = None
self.stream: Optional[Stream] = None
self.points: Optional[int] = None
self._stream: Optional[Stream] = None
self._pending_stream_up: Optional[asyncio.Task[Any]] = None
await self.get_stream()
@@ -77,10 +81,14 @@ class Channel:
self.name = channel["displayName"]
self.url = f"{BASE_URL}/{self.name}"
self._spade_url = None
self.stream = Stream.from_directory(self, data)
self.points = None
self._stream = Stream.from_directory(self, data)
self._pending_stream_up = None
return self
def __repr__(self) -> str:
return f"Channel({self.name}, {self.id})"
def __eq__(self, other: object):
if isinstance(other, self.__class__):
return self.id == other.id
@@ -89,12 +97,17 @@ class Channel:
def __hash__(self) -> int:
return hash((self.__class__.__name__, self.id))
@property
def iid(self) -> str:
# this is responsible for the ID/Key of the columns inside channel list
return str(self.id)
@property
def online(self) -> bool:
"""
Returns True if the streamer is online and is currently streaming, False otherwise.
"""
return self.stream is not None
return self._stream is not None
@property
def pending_online(self) -> bool:
@@ -105,6 +118,39 @@ class Channel:
"""
return self._pending_stream_up is not None
@property
def game(self) -> Optional[Game]:
if self._stream is not None and self._stream.game is not None:
return self._stream.game
return None
@property
def viewers(self) -> Optional[int]:
if self._stream is not None:
return self._stream.viewers
return None
@viewers.setter
def viewers(self, value: int):
if self._stream is not None:
self._stream.viewers = value
self.display()
@property
def drops_enabled(self) -> Optional[bool]:
if self._stream is not None:
return self._stream.drops_enabled
return None
def display(self):
self._twitch.gui.channels.display(self)
def remove(self):
if self._pending_stream_up is not None:
self._pending_stream_up.cancel()
self._pending_stream_up = None
self._twitch.gui.channels.remove(self)
async def get_spade_url(self) -> str:
"""
To get this monstrous thing, you have to walk a chain of requests.
@@ -137,10 +183,10 @@ class Channel:
stream_data = response["data"]["user"]
self.id = int(stream_data["id"]) # fill channel_id
if stream_data["stream"]:
self.stream = Stream(self, stream_data)
self._stream = Stream(self, stream_data)
else:
self.stream = None
return self.stream
self._stream = None
return self._stream
async def check_online(self) -> bool:
stream = await self.get_stream()
@@ -149,9 +195,11 @@ class Channel:
return True
async def _online_delay(self):
self.display()
await asyncio.sleep(ONLINE_DELAY.total_seconds())
await self.get_stream()
await self.check_online()
self._pending_stream_up = None
self.display()
def set_online(self):
if self.online or self.pending_online:
@@ -160,22 +208,42 @@ class Channel:
# stream-up is sent before the stream actually goes online, so just wait a bit
# and check if it's actually online by then
self._pending_stream_up = asyncio.create_task(self._online_delay())
# display is called from within the task
def set_offline(self):
# to be called externally, if we receive an event about this happening
if self._pending_stream_up is not None:
self._pending_stream_up.cancel()
self._pending_stream_up = None
self.stream = None
self._stream = None
self.display()
async def claim_bonus(self):
# this also fills out the 'points' attribute
response = await self._twitch.gql_request(
GQL_OPERATIONS["ChannelPointsContext"].with_variables({"channelLogin": self.name})
)
channel_data: JsonType = response["data"]["community"]["channel"]
self.points = channel_data["self"]["communityPoints"]["balance"]
claim_available: JsonType = (
channel_data["self"]["communityPoints"]["availableClaim"]
)
if claim_available:
await self._twitch.claim_points(channel_data["id"], claim_available["id"])
logger.info("Claimed bonus points")
else:
# calling claim_points is going to refresh the display, so if we're not calling it,
# we need to do it ourselves
self.display()
def _encode_payload(self):
assert self.stream is not None
assert self._stream is not None
payload = [
{
"event": "minute-watched",
"properties": {
"channel_id": self.id,
"broadcast_id": self.stream.broadcast_id,
"broadcast_id": self._stream.broadcast_id,
"player": "site",
"user_id": self._twitch._user_id,
}
@@ -184,17 +252,33 @@ class Channel:
json_event = json.dumps(payload, separators=(",", ":"))
return {"data": (b64encode(json_event.encode("utf8"))).decode("utf8")}
async def _send_watch(self):
async def _send_watch(self) -> bool:
"""
This uses the encoded payload on spade url to simulate watching the stream.
Optimally, send every 60 seconds to advance drops.
"""
if not self.online:
return
return False
if self._spade_url is None:
self._spade_url = await self.get_spade_url()
logger.debug(f"Sending minute-watched to {self.name}")
self._twitch._last_watch = time()
async with self._twitch._session.post(
self._spade_url, data=self._encode_payload()
) as response:
return response.status == 204
async def watch_loop(self):
# last_watch is a timestamp of the last time we've sent a watch payload
# We need this because watch_loop can be cancelled and rescheduled multiple times
# in quick succession, and apparently Twitch doesn't like that very much
interval = WATCH_INTERVAL.total_seconds()
await asyncio.sleep(self._twitch._last_watch + interval - time())
i = 0
while True:
await self._send_watch()
if i == 0:
# ensure every 30 minutes that we don't have unclaimed points bonus
await self.claim_bonus()
i = (i + 1) % 30
await asyncio.sleep(interval)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from copy import copy
from enum import Enum, auto
from datetime import timedelta
from typing import Any, Optional, Dict, Literal, Callable
@@ -23,15 +24,22 @@ USER_AGENT = (
)
# Paths
SETTINGS_PATH = "settings.json"
COOKIES_PATH = "cookies.pickle"
COOKIES_PATH = "cookies.jar"
# Intervals and Delays
PING_INTERVAL = timedelta(minutes=3)
PING_TIMEOUT = timedelta(seconds=10)
ONLINE_DELAY = timedelta(seconds=30)
ONLINE_DELAY = timedelta(seconds=60)
WATCH_INTERVAL = timedelta(seconds=58.5)
# Tags
DROPS_ENABLED_TAG = "c2542d6d-cd10-4532-919b-3d19f30a768b"
# Strings
TERMINATED_STR = "Application Terminated.\nClose the console window to exit the application."
class State(Enum):
INVENTORY_FETCH = auto()
GAME_SELECT = auto()
CHANNEL_FETCH = auto()
CHANNEL_CLEANUP = auto()
CHANNEL_SWITCH = auto()
class GQLOperation(JsonType):
@@ -83,6 +91,10 @@ GQL_OPERATIONS: Dict[str, GQLOperation] = {
"Inventory",
"e0765ebaa8e8eeb4043cc6dfeab3eac7f682ef5f724b81367e6e55c7aef2be4c",
),
"CurrentDrop": GQLOperation(
"DropCurrentSessionContext",
"2e4b3630b91552eb05b76a94b6850eb25fe42263b7cf6d06bee6d156dd247c1c",
),
"ViewerDropsDashboard": GQLOperation(
"ViewerDropsDashboard",
"c4d61d7b71d03b324914d3cf8ca0bc23fe25dacf54120cc954321b9704a3f4e2",

827
gui.py Normal file
View File

@@ -0,0 +1,827 @@
from __future__ import annotations
import asyncio
import logging
import tkinter as tk
from math import log10, ceil
from tkinter.font import Font
from collections import namedtuple, OrderedDict
from tkinter import Tk, ttk, StringVar, DoubleVar
from typing import Any, Optional, List, Dict, Set, TypedDict, Iterable, NoReturn, TYPE_CHECKING
from version import __version__
from constants import WS_TOPICS_LIMIT, MAX_WEBSOCKETS, State
if TYPE_CHECKING:
from twitch import Twitch
from channel import Channel
from inventory import Game, TimedDrop
digits = ceil(log10(WS_TOPICS_LIMIT))
WS_FONT = ("Courier New", 10)
class TKOutputHandler(logging.Handler):
def __init__(self, output: GUIManager):
super().__init__()
self._output = output
def emit(self, record):
self._output.print(self.format(record))
class PlaceholderEntry(ttk.Entry):
def __init__(
self,
master,
*args,
placeholder: str,
placeholdercolor: str = "grey60",
**kwargs,
):
super().__init__(master, *args, **kwargs)
self._show: str = kwargs.get("show", '')
self._text_color: str = kwargs.get("foreground", '')
self._ph_color: str = placeholdercolor
self._ph_text: str = placeholder
self.bind("<FocusIn>", self._focus_in)
self.bind("<FocusOut>", self._focus_out)
self._ph: bool = False
self._focus_out(None)
def _focus_in(self, event):
"""
On focus in, if we've had a placeholder, clear the box and set normal text colour and show.
"""
if self._ph:
self._ph = False
self.config(foreground=self._text_color, show=self._show)
self.delete(0, "end")
def _focus_out(self, event):
"""
On focus out, if we're empty, insert a placeholder,
set placeholder text color and make sure it's shown.
If we're not empty, leave the box as is.
"""
if not super().get():
self._ph = True
self.config(foreground=self._ph_color, show='')
self.insert(0, self._ph_text)
def _store_option(self, options: Dict[str, Any], attr: str, name: str):
value = options.get(name)
if value is not None:
setattr(self, attr, value)
def configure(self, *args, **kwargs):
if args:
options = args[0]
if kwargs:
options = kwargs
self._store_option(options, "_show", "show")
self._store_option(options, "_ph_text", "placeholder")
self._store_option(options, "_text_color", "foreground")
self._store_option(options, "_ph_color", "placeholdercolor")
super().configure(*args, *kwargs)
def get(self):
if self._ph:
return ''
return super().get()
def clear(self):
self.delete(0, "end")
self._ph = True
self.config(foreground=self._ph_color, show='')
self.insert(0, self._ph_text)
class _WSEntry(TypedDict):
status: str
topics: int
class WebsocketStatus:
def __init__(self, manager: GUIManager, master: tk.Misc):
self._status_var = StringVar()
self._topics_var = StringVar()
frame = ttk.LabelFrame(master, text="Websocket Status", padding=(4, 0, 4, 4))
frame.grid(column=0, row=0, sticky="nsew", padx=2)
ttk.Label(
frame,
text='\n'.join(f"Websocket #{i}:" for i in range(1, MAX_WEBSOCKETS + 1)),
font=WS_FONT,
).grid(column=0, row=0)
ttk.Label(
frame,
textvariable=self._status_var,
width=16,
justify="left",
font=WS_FONT,
).grid(column=1, row=0)
ttk.Label(
frame,
textvariable=self._topics_var,
width=(digits * 2 + 1),
justify="right",
font=WS_FONT,
).grid(column=2, row=0)
self._items: Dict[int, Optional[_WSEntry]] = {i: None for i in range(MAX_WEBSOCKETS)}
self._update()
def update(self, idx: int, status: Optional[str] = None, topics: Optional[int] = None):
if status is None and topics is None:
raise TypeError("You need to provide at least one of: status, topics")
entry = self._items.get(idx)
if entry is None:
entry = self._items[idx] = _WSEntry(status="Disconnected", topics=0)
if status is not None:
entry["status"] = status
if topics is not None:
entry["topics"] = topics
self._update()
def remove(self, idx: int):
if idx in self._items:
del self._items[idx]
def _update(self):
status_lines: List[str] = []
topic_lines: List[str] = []
for idx in range(MAX_WEBSOCKETS):
item = self._items.get(idx)
if item is None:
status_lines.append('')
topic_lines.append('')
else:
status_lines.append(item["status"])
topic_lines.append(f"{item['topics']:>{digits}}/{WS_TOPICS_LIMIT}")
self._status_var.set('\n'.join(status_lines))
self._topics_var.set('\n'.join(topic_lines))
LoginData = namedtuple("LoginData", ["username", "password", "token"])
class LoginForm:
def __init__(self, manager: GUIManager, master: tk.Misc):
self._manager = manager
self._var = StringVar()
frame = ttk.LabelFrame(master, text="Login Status", padding=(4, 0, 4, 4))
frame.grid(column=1, row=0, sticky="nsew", padx=2)
frame.columnconfigure(0, weight=2)
frame.columnconfigure(1, weight=1)
frame.rowconfigure(4, weight=1)
ttk.Label(frame, text=("Status:\nUser ID:")).grid(column=0, row=0)
ttk.Label(frame, textvariable=self._var, justify="center").grid(column=1, row=0)
self._login_entry = PlaceholderEntry(frame, placeholder="Login")
self._login_entry.grid(column=0, row=1, columnspan=2)
self._pass_entry = PlaceholderEntry(frame, placeholder="Password", show='')
self._pass_entry.grid(column=0, row=2, columnspan=2)
self._token_entry = PlaceholderEntry(frame, placeholder="2FA Code")
self._token_entry.grid(column=0, row=3, columnspan=2)
self._confirm = asyncio.Event()
self._button = ttk.Button(frame, text="Login", command=self._confirm.set, state="disabled")
self._button.grid(column=0, row=4, columnspan=2)
self.update("Logged out", None)
def clear(self, login: bool = False, password: bool = False, token: bool = False):
clear_all = not login and not password and not token
if login or clear_all:
self._login_entry.clear()
if password or clear_all:
self._pass_entry.clear()
if token or clear_all:
self._token_entry.clear()
async def ask_login(self) -> LoginData:
self._manager.print("Please log in.")
self._confirm.clear()
self._button.config(state="normal")
await self._confirm.wait()
self._button.config(state="disabled")
data = LoginData(self._login_entry.get(), self._pass_entry.get(), self._token_entry.get())
return data
def update(self, status: str, user_id: Optional[int]):
if user_id is not None:
user_str = str(user_id)
else:
user_str = "-"
self._var.set(f"{status}\n{user_str}")
class GameSelector:
def __init__(self, manager: GUIManager, master: tk.Misc):
self._manager = manager
self._var = StringVar()
frame = ttk.LabelFrame(master, text="Game Selector", padding=(4, 0, 4, 4))
frame.grid(column=1, row=1, sticky="nsew", padx=2)
frame.columnconfigure(0, weight=1)
self._list = tk.Listbox(
frame,
height=5,
selectmode="single",
activestyle="none",
exportselection=False,
highlightthickness=0,
)
self._list.pack(fill="both", expand=True)
self._selection: Optional[str] = self._manager._twitch._options.game
self._games: OrderedDict[str, Game] = OrderedDict()
self._list.bind("<<ListboxSelect>>", self._on_select)
@property
def selected(self) -> Optional[str]:
return self._selection
def set_games(self, games: Iterable[Game]):
self._games.clear()
self._games.update((str(g), g) for g in sorted(games, key=lambda g: g.name))
self._list.delete(0, "end")
self._list.insert("end", *self._games.keys())
self._list.config(width=0) # autoadjust listbox width
if self._selection is not None:
selected_index: Optional[int] = next(
(
i
for i, str_game in enumerate(self._games.keys())
if str_game == self._selection
),
None,
)
if selected_index is not None:
# reselect the currently selected item
self._list.selection_set(selected_index)
else:
# the game we've had selected isn't there anymore - clear selection
self._selection = None
def _on_select(self, event):
current = self._list.curselection()
if not current:
# can happen when the user clicks on an empty list
self._selection = None
else:
self._selection = self._list.get(current[0])
def get_selection(self) -> Game:
if self._selection is None:
if not self._games:
raise RuntimeError("No games to select from")
# select and return the first game from the list
self._list.selection_set(0)
first_game = next(iter(self._games.values()))
self._selection = str(first_game)
return first_game
return self._games[self._selection]
def get_next_selection(self) -> Optional[Game]:
current = self._list.curselection()
if not current:
return self.get_selection()
game_name = self._list.get(current[0]+1)
if game_name:
return self._games[game_name]
else:
# this was the last game on the list
return None
class _BaseVars(TypedDict):
progress: DoubleVar
percentage: StringVar
remaining: StringVar
minutes: int
class _CampaignVars(_BaseVars):
name: StringVar
class _DropVars(_BaseVars):
rewards: StringVar
class _ProgressVars(TypedDict):
campaign: _CampaignVars
drop: _DropVars
seconds: int
class CampaignProgress:
BAR_LENGTH = 240
def __init__(self, manager: GUIManager, master: tk.Misc):
self._vars: _ProgressVars = {
"campaign": {
"name": StringVar(), # campaign name
"progress": DoubleVar(), # controls the progress bar
"percentage": StringVar(), # percentage display string
"remaining": StringVar(), # time remaining string
"minutes": 0, # remaining minutes
},
"drop": {
"rewards": StringVar(), # drop rewards
"progress": DoubleVar(), # as above
"percentage": StringVar(), # as above
"remaining": StringVar(), # as above
"minutes": 0, # as above
},
"seconds": 1, # remaining seconds (common for both campaign and drop)
}
self._frame = frame = ttk.LabelFrame(
master, text="Campaign Progress", padding=(4, 0, 4, 4)
)
frame.grid(column=0, row=1, sticky="nsew", padx=2)
frame.columnconfigure(0, weight=2)
frame.columnconfigure(1, weight=1)
ttk.Label(frame, text="Campaign:").grid(column=0, row=0, columnspan=2)
ttk.Label(
frame, textvariable=self._vars["campaign"]["name"]
).grid(column=0, row=1, columnspan=2)
ttk.Label(frame, text="Progress:").grid(column=0, row=2, rowspan=2)
ttk.Label(frame, textvariable=self._vars["campaign"]["percentage"]).grid(column=1, row=2)
ttk.Label(frame, textvariable=self._vars["campaign"]["remaining"]).grid(column=1, row=3)
ttk.Progressbar(
frame,
mode="determinate",
length=self.BAR_LENGTH,
maximum=1,
variable=self._vars["campaign"]["progress"],
).grid(column=0, row=4, columnspan=2)
ttk.Separator(
frame, orient="horizontal"
).grid(row=5, columnspan=2, sticky="ew", pady=(4, 0))
ttk.Label(frame, text="Drop:").grid(column=0, row=6, columnspan=2)
ttk.Label(
frame, textvariable=self._vars["drop"]["rewards"]
).grid(column=0, row=7, columnspan=2)
ttk.Label(frame, text="Progress:").grid(column=0, row=8, rowspan=2)
ttk.Label(frame, textvariable=self._vars["drop"]["percentage"]).grid(column=1, row=8)
ttk.Label(frame, textvariable=self._vars["drop"]["remaining"]).grid(column=1, row=9)
ttk.Progressbar(
frame,
mode="determinate",
length=self.BAR_LENGTH,
maximum=1,
variable=self._vars["drop"]["progress"],
).grid(column=0, row=10, columnspan=2)
self._timer_task: Optional[asyncio.Task[None]] = None
self._update_time()
def _update_time(self) -> bool:
# read vars
minutes_changed: bool = False
seconds: int = self._vars["seconds"]
drop_vars: _DropVars = self._vars["drop"]
campaign_vars: _CampaignVars = self._vars["campaign"]
drop_minutes: int = drop_vars["minutes"]
campaign_minutes: int = campaign_vars["minutes"]
# handle seconds
if seconds <= 0:
if drop_minutes > 0:
drop_minutes -= 1
minutes_changed = True
if campaign_minutes > 0:
campaign_minutes -= 1
minutes_changed = True
if minutes_changed:
seconds = 60
if seconds > 0:
seconds -= 1
# display time
hours, minutes = divmod(drop_minutes, 60)
drop_vars["remaining"].set(f"{hours:>2}:{minutes:02}:{seconds:02} remaining")
hours, minutes = divmod(campaign_minutes, 60)
campaign_vars["remaining"].set(f"{hours:>2}:{minutes:02}:{seconds:02} remaining")
# store back
self._vars["seconds"] = seconds
if minutes_changed:
drop_vars["minutes"] = drop_minutes
campaign_vars["minutes"] = campaign_minutes
# if there's no time left, stop the loop
if campaign_minutes + drop_minutes + seconds > 0:
return True
return False
async def _timer_loop(self):
run = self._update_time()
while run:
await asyncio.sleep(1)
run = self._update_time()
self._timer_task = None
def start_timer(self):
if self._timer_task is None:
self._vars["seconds"] = 1
self._timer_task = asyncio.create_task(self._timer_loop())
def stop_timer(self):
if self._timer_task is not None:
self._timer_task.cancel()
self._timer_task = None
def restart_timer(self):
self.stop_timer()
self.start_timer()
def update(self, drop: TimedDrop):
# campaign update
campaign = drop.campaign
vars_campaign = self._vars["campaign"]
vars_campaign["name"].set(campaign.name)
vars_campaign["progress"].set(campaign.progress)
vars_campaign["percentage"].set(
f"{campaign.progress:6.1%} ({campaign.claimed_drops}/{campaign.total_drops})"
)
vars_campaign["minutes"] = campaign.remaining_minutes
# drop update
vars_drop = self._vars["drop"]
vars_drop["rewards"].set(drop.rewards_text())
vars_drop["progress"].set(drop.progress)
vars_drop["percentage"].set(f"{drop.progress:6.1%}")
vars_drop["minutes"] = drop.remaining_minutes
# reschedule our seconds update timer
self.restart_timer()
class ConsoleOutput:
def __init__(self, manager: GUIManager, master: tk.Misc):
frame = ttk.LabelFrame(master, text="Output", padding=(4, 0, 4, 4))
frame.grid(column=0, row=2, columnspan=2, sticky="nsew", padx=2)
frame.rowconfigure(0, weight=1) # let the frame expand
frame.columnconfigure(0, weight=1)
master.rowconfigure(2, weight=1) # tell master frame that the containing row can expand
xscroll = ttk.Scrollbar(frame, orient="horizontal")
yscroll = ttk.Scrollbar(frame, orient="vertical")
self._text = tk.Text(
frame,
exportselection=False,
height=10,
width=52,
wrap="none",
state="disabled",
xscrollcommand=xscroll.set,
yscrollcommand=yscroll.set,
)
xscroll.config(command=self._text.xview)
yscroll.config(command=self._text.yview)
self._text.grid(column=0, row=0, sticky="nsew")
xscroll.grid(column=0, row=1, sticky="ew")
yscroll.grid(column=1, row=0, sticky="ns")
def print(self, *values, sep: str = ' ', end: str = '\n'):
self._text.config(state="normal")
self._text.insert("end", f"{sep.join(values)}{end}")
self._text.see("end") # scroll to the newly added line
self._text.config(state="disabled")
class Buttons(TypedDict):
frame: ttk.Frame
cleanup: ttk.Button
switch: ttk.Button
load_points: ttk.Button
class ChannelList:
def __init__(self, manager: GUIManager, master: tk.Misc):
self._manager = manager
frame = ttk.LabelFrame(master, text="Channels", padding=(4, 0, 4, 4))
frame.grid(column=2, row=0, rowspan=3, sticky="nsew", padx=2)
frame.rowconfigure(1, weight=1)
frame.columnconfigure(0, weight=1)
# tell master frame that the containing column can expand
master.columnconfigure(2, weight=1)
buttons_frame = ttk.Frame(frame)
self._buttons: Buttons = {
"frame": buttons_frame,
"cleanup": ttk.Button(
buttons_frame,
text="Cleanup",
command=manager._twitch.state_change(State.CHANNEL_CLEANUP),
),
"switch": ttk.Button(
buttons_frame,
text="Switch",
state="disabled",
command=manager._twitch.state_change(State.CHANNEL_SWITCH),
),
"load_points": ttk.Button(
buttons_frame, text="Load Points", command=self._load_points
),
}
buttons_frame.grid(column=0, row=0, columnspan=2)
self._buttons["cleanup"].grid(column=0, row=0)
self._buttons["switch"].grid(column=1, row=0)
self._buttons["load_points"].grid(column=2, row=0)
scroll = ttk.Scrollbar(frame, orient="vertical")
self._table = table = ttk.Treeview(
frame,
columns=("channel", "status", "game", "viewers", "points"),
yscrollcommand=scroll.set,
)
scroll.config(command=table.yview)
table.grid(column=0, row=1, sticky="nsew")
scroll.grid(column=1, row=1, sticky="ns")
self._font = Font(frame, manager._style.lookup("Treeview", "font"))
self._const_width: Set[str] = set()
table.tag_configure("watching", background="gray70")
table.bind("<Button-1>", self._disable_column_resize)
table.bind("<<TreeviewSelect>>", self._selected)
self._column("#0", '', width=0)
self._column("channel", "Channel", width=100, anchor='w')
self._column("status", "Status", width_template="OFFLINE ❌")
self._column("game", "Game", width=50)
self._column("viewers", "Viewers", width_template="0000000")
self._column("points", "Points", width_template="0000000")
self._channel_map: Dict[str, Channel] = {}
def _column(
self,
cid: str,
name: str,
*,
anchor: str = "center",
width: Optional[int] = None,
width_template: Optional[str] = None,
):
if width_template is not None:
width = self._measure(width_template)
self._const_width.add(cid)
assert width is not None
self._table.column(cid, width=width, stretch=False)
self._table.heading(cid, text=name, anchor=anchor)
def _disable_column_resize(self, event):
if self._table.identify_region(event.x, event.y) == "separator":
return "break"
def _selected(self, event):
selection = self._table.selection()
if selection:
self._buttons["switch"].config(state="normal")
else:
self._buttons["switch"].config(state="disabled")
def _load_points(self):
# disable the button afterwards
self._buttons["load_points"].config(state="disabled")
asyncio.gather(*(ch.claim_bonus() for ch in self._manager._twitch.channels.values()))
def _measure(self, text: str) -> int:
# we need this because columns have 9-10 pixels of padding that cuts text off
return self._font.measure(text) + 10
def _adjust_width(self, column: str, value: str):
# causes the column to expand if the value's width is greater than the current width
if column in self._const_width:
return
value_width = self._measure(value)
curr_width = self._table.column(column, "width")
if value_width > curr_width:
self._table.column(column, minwidth=value_width, width=value_width)
self._table.event_generate("<<ThemeChanged>>") # force redraw
def _set(self, iid: str, column: str, value: str):
self._table.set(iid, column, value)
self._adjust_width(column, value)
def _insert(self, iid: str, *args: str):
self._table.insert(parent='', index="end", iid=iid, values=args)
for column, value in zip(self._table.cget("columns"), args):
self._adjust_width(column, value)
def clear_watching(self):
for iid in self._table.tag_has("watching"):
self._table.item(iid, tags='')
def set_watching(self, channel: Channel):
self.clear_watching()
self._table.item(channel.iid, tags="watching")
def get_selection(self) -> Optional[Channel]:
if not self._channel_map:
return None
selection = self._table.selection()
if not selection:
return None
return self._channel_map[selection[0]]
def clear_selection(self):
self._table.selection_set('')
def display(self, channel: Channel):
# status
if channel.online:
status_str = "ONLINE ✅"
elif channel.pending_online:
status_str = "OFFLINE ⏰"
else:
status_str = "OFFLINE ❌"
# game
game_str = str(channel.game or '')
# viewers
viewers_str = ''
if channel.viewers is not None:
viewers_str = str(channel.viewers)
# points
points_str = ''
if channel.points is not None:
points_str = str(channel.points)
iid = channel.iid
if self._table.exists(iid):
self._set(iid, "status", status_str)
self._set(iid, "game", game_str)
self._set(iid, "viewers", viewers_str)
if points_str:
self._set(iid, "points", points_str)
else:
self._channel_map[iid] = channel
self._insert(iid, channel.name, status_str, game_str, viewers_str, points_str)
def remove(self, channel: Channel):
iid = channel.iid
del self._channel_map[iid]
self._table.delete(iid)
class GUIManager:
def __init__(self, twitch: Twitch):
self._twitch: Twitch = twitch
self._poll_task: Optional[asyncio.Task[NoReturn]] = None
self._closed = asyncio.Event()
self._root = root = Tk()
root.resizable(False, True)
root.iconbitmap("pickaxe.ico") # window icon
root.title(f"Twitch Drops Miner v{__version__} (by DevilXD)") # window title
root.protocol("WM_DELETE_WINDOW", self._on_close)
root.bind_all("<KeyPress-Escape>", self.unfocus)
self._style = ttk.Style(root)
self._style.map(
"Treeview",
foreground=self._fixed_map("foreground"),
background=self._fixed_map("background"),
)
main_frame = ttk.Frame(root, padding=8)
main_frame.grid(sticky="nsew")
root.rowconfigure(0, weight=1)
root.columnconfigure(0, weight=1)
self.websockets = WebsocketStatus(self, main_frame)
self.login = LoginForm(self, main_frame)
self.progress = CampaignProgress(self, main_frame)
self.games = GameSelector(self, main_frame)
self.output = ConsoleOutput(self, main_frame)
self.channels = ChannelList(self, main_frame)
# clamp minimum window height (update first, so that geometry calculates the size)
root.update_idletasks()
root.minsize(width=0, height=root.winfo_reqheight())
# register logging handler
handler = TKOutputHandler(self)
handler.setFormatter(
logging.Formatter("{asctime}: {levelname}: {message}", style='{', datefmt="%H:%M:%S")
)
logging.getLogger("TwitchDrops").addHandler(handler)
# https://stackoverflow.com/questions/56329342/tkinter-treeview-background-tag-not-working
def _fixed_map(self, option):
# Fix for setting text colour for Tkinter 8.6.9
# From: https://core.tcl.tk/tk/info/509cafafae
#
# Returns the style map for 'option' with any styles starting with
# ('!disabled', '!selected', ...) filtered out.
# style.map() returns an empty list for missing options, so this
# should be future-safe.
return [
elm for elm in self._style.map("Treeview", query_opt=option)
if elm[:2] != ("!disabled", "!selected")
]
@property
def running(self) -> bool:
return self._poll_task is not None
@property
def close_requested(self) -> bool:
return self._closed.is_set()
def start(self):
if self._poll_task is None:
self._poll_task = asyncio.create_task(self._poll())
# self.progress.start_timer()
def stop(self):
self.progress.stop_timer()
if self._poll_task is not None:
self._poll_task.cancel()
self._poll_task = None
async def _poll(self):
"""
This runs the Tkinter event loop via asyncio instead of calling mainloop.
0.05s gives similar performance and CPU usage.
Not ideal, but the simplest way to avoid threads, thread safety,
loop.call_soon_threadsafe, futures and all of that.
"""
update = self._root.update
while True:
update()
await asyncio.sleep(0.05)
def unfocus(self, event):
self._root.focus_set()
self.channels.clear_selection()
def _on_close(self):
self._closed.set()
# notify client we're supposed to close
self._twitch.request_close()
def prevent_close(self):
self._closed.clear()
async def wait_until_closed(self):
# wait until the user closes the window
await self._closed.wait()
def close(self):
self.stop()
if self._root is not None:
self._root.destroy()
self._closed.set()
def print(self, *args, **kwargs):
# print to our custom output
self.output.print(*args, **kwargs)
if __name__ == "__main__":
# Everything below is for debug purposes only
from types import SimpleNamespace
class StrNamespace(SimpleNamespace):
def __str__(self):
if hasattr(self, "_str__"):
return self._str__(self)
return super().__str__()
def state_change(state: State):
def changer(state: State = state):
gui.print(f"State change: {state.value}")
return changer
gui: GUIManager
mock = SimpleNamespace(
_options=SimpleNamespace(game=None),
state_change=state_change,
)
gui = GUIManager(mock) # type: ignore
mock.request_close = gui._root.destroy
def create_game(id: int, name: str):
return StrNamespace(name=name, id=id, _str__=lambda s: s.name)
iid = 0
def create_channel(name: str, online: int, game: Optional[str], viewers: int, points: int):
if online == 1:
online = False
pending = True
else:
pending = False
if game is not None:
game_obj: Optional[StrNamespace] = create_game(0, game)
else:
game_obj = None
global iid
return SimpleNamespace(
name=name,
iid=(iid := iid + 1),
points=points,
online=bool(online),
pending_online=pending,
game=game_obj,
viewers=viewers,
)
# Game selctor
gui.games.set_games([
create_game(491115, "Paladins"),
create_game(460630, "Tom Clancy's Rainbow Six Siege"),
])
game = gui.games.get_next_selection()
game = gui.games.get_next_selection()
game = gui.games.get_next_selection()
# Channel list
gui.channels.display(create_channel("PaladinsGame", 0, None, 0, 0))
channel = create_channel("Traitus", 1, None, 0, 0)
gui.channels.display(channel)
gui.channels.display(create_channel("Testus", 2, "Paladins", 42, 1234567))
gui.channels.set_watching(channel)
gui._root.update()
gui.channels.get_selection()
gui._root.mainloop()

View File

@@ -17,6 +17,9 @@ class Game:
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return f"Game({self.id}, {self.name})"
def __eq__(self, other: object):
if isinstance(other, self.__class__):
return self.id == other.id
@@ -93,15 +96,14 @@ class TimedDrop(BaseDrop):
@property
def remaining_minutes(self) -> int:
return self.required_minutes - self.current_minutes
return self.required_minutes - self.current_minutes + 1
@property
def progress(self) -> float:
return self.current_minutes / self.required_minutes
def update(self, message: JsonType):
# {"type": "drop-progress", data: {"current_progress_min": 3, "required_progress_min": 10}}
# {"type": "drop-claim", data: {"drop_instance_id": ...}}
# See Twitch.process_drop for message examples
msg_type = message["type"]
if msg_type == "drop-progress":
self.current_minutes = message["data"]["current_progress_min"]
@@ -109,6 +111,10 @@ class TimedDrop(BaseDrop):
elif msg_type == "drop-claim":
self.claim_id = message["data"]["drop_instance_id"]
def bump_minutes(self):
if self.current_minutes < self.required_minutes:
self.current_minutes += 1
class DropsCampaign:
def __init__(self, twitch: Twitch, data: JsonType):
@@ -149,3 +155,9 @@ class DropsCampaign:
def get_drop(self, drop_id: str) -> Optional[TimedDrop]:
return self.timed_drops.get(drop_id)
def get_active_drop(self) -> Optional[TimedDrop]:
for drop in self.timed_drops.values():
if drop.can_earn:
return drop
return None

99
main.py
View File

@@ -1,51 +1,18 @@
from __future__ import annotations
__version__ = 4
import sys
import json
import ctypes
import logging
import asyncio
import argparse
import warnings
import traceback
import threading
from typing import NoReturn
from typing import Optional
from twitch import Twitch
from constants import JsonType, SETTINGS_PATH, TERMINATED_STR
try:
import win32api
from win32con import CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT
except (ImportError, ModuleNotFoundError):
raise ImportError("You have to run 'python -m pip install pywin32' first")
# disable some warnings
warnings.simplefilter("ignore", RuntimeWarning)
warnings.simplefilter("ignore", DeprecationWarning)
# nice console title
try:
ctypes.windll.kernel32.SetConsoleTitleW(f"Twitch Drops Miner v{__version__} (by DevilXD)")
except AttributeError:
# ensure we're on windows and there was no import problems
print("Only Windows supported!")
quit()
assert sys.platform == "win32"
def terminate() -> NoReturn:
forever = threading.Event()
print(f"\n{TERMINATED_STR}")
forever.wait()
raise RuntimeError("Uh oh") # this will never run, solely for MyPy
from version import __version__
class ParsedArgs(argparse.Namespace):
_verbose: int
_debug_ws: bool
_debug_gql: bool
game: Optional[str]
@property
def logging_level(self) -> int:
@@ -87,6 +54,7 @@ parser.add_argument("-V", "--version", action="version", version=f"v{__version__
parser.add_argument("-v", dest="_verbose", action="count", default=0)
parser.add_argument("--debug-ws", dest="_debug_ws", action="store_true")
parser.add_argument("--debug-gql", dest="_debug_gql", action="store_true")
parser.add_argument("-g", "--game", default=None)
options: ParsedArgs = parser.parse_args(namespace=ParsedArgs())
# handle logging stuff
if options.logging_level > logging.DEBUG:
@@ -94,62 +62,9 @@ if options.logging_level > logging.DEBUG:
# that aren't ours. This always runs, unless the main logging level is DEBUG or below.
logging.getLogger().addHandler(logging.NullHandler())
logger = logging.getLogger("TwitchDrops")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("{levelname}: {message}", style='{'))
logger.addHandler(handler)
logger.setLevel(options.logging_level)
logging.getLogger("TwitchDrops.gql").setLevel(options.debug_gql)
logging.getLogger("TwitchDrops.websocket").setLevel(options.debug_ws)
# handle settings
try:
with open(SETTINGS_PATH, 'r', encoding="utf8") as file:
settings: JsonType = json.load(file)
except json.JSONDecodeError as exc:
print(f"Error while reading the settings file:\n{str(exc)}")
terminate()
except FileNotFoundError:
settings = {}
# asyncio loop
loop = asyncio.get_event_loop()
# client init
client = Twitch(settings.get("username"), settings.get("password"))
# main task and it's close event
main_task = loop.create_task(client.run(settings.get("channels")))
close_event = threading.Event()
def clean_exit(code: int):
if code not in (CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT):
# filter only events we want
return False
# cancel the main task - this triggers the cleanup
main_task.cancel()
# wait until cleanup completes
close_event.wait()
# tell OS that we're free to exit now
return True
# ensures clean exit upon closing the console
win32api.SetConsoleCtrlHandler(clean_exit, True)
try:
loop.run_until_complete(main_task)
except (asyncio.CancelledError, KeyboardInterrupt):
# KeyboardInterrupt causes run_until_complete to exit, but without cancelling the task.
# The loop stops and thus the task gets frozen, until the loop runs again.
# Because we don't want anything from there to actually run during cleanup,
# we need to explicitly cancel the task ourselves here.
main_task.cancel()
# main_task was cancelled due to program shutting down - do the cleanup
loop.run_until_complete(client.close())
# notify we're free to exit
close_event.set()
except Exception:
# Remove the handler so it doesn't delay exit
win32api.SetConsoleCtrlHandler(clean_exit, False)
print("Fatal error encountered:\n")
traceback.print_exc()
terminate()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
# client run
client = Twitch(options)
client.start()

698
twitch.py
View File

@@ -1,12 +1,14 @@
from __future__ import annotations
import os
import msvcrt
import asyncio
import logging
import traceback
from yarl import URL
from time import time
from itertools import chain
from functools import partial
from typing import Any, Optional, Union, List, Dict, Collection, cast
from typing import Any, Callable, Iterable, Optional, Union, List, Dict, Set, cast, TYPE_CHECKING
try:
import aiohttp
@@ -14,10 +16,12 @@ except ImportError:
raise ImportError("You have to run 'python -m pip install aiohttp' first")
from channel import Channel
from websocket import WebsocketPool
from inventory import DropsCampaign, Game
from gui import GUIManager, LoginData
from websocket import WebsocketPool, task_wrapper
from inventory import DropsCampaign, Game, TimedDrop
from exceptions import LoginException, CaptchaRequired
from constants import (
State,
JsonType,
WebsocketTopic,
CLIENT_ID,
@@ -27,66 +31,119 @@ from constants import (
GQL_URL,
GQL_OPERATIONS,
DROPS_ENABLED_TAG,
TERMINATED_STR,
MAX_WEBSOCKETS,
WS_TOPICS_LIMIT,
GQLOperation,
)
if TYPE_CHECKING:
from main import ParsedArgs
logger = logging.getLogger("TwitchDrops")
gql_logger = logging.getLogger("TwitchDrops.gql")
class Twitch:
def __init__(
self,
username: Optional[str] = None,
password: Optional[str] = None,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
):
self.username: Optional[str] = username
self.password: Optional[str] = password
def __init__(self, options: ParsedArgs):
self._options = options
# GUI
self.gui = GUIManager(self)
# Cookies, session and auth
cookie_jar = aiohttp.CookieJar()
if os.path.isfile(COOKIES_PATH):
cookie_jar.load(COOKIES_PATH)
self._session = aiohttp.ClientSession(
cookie_jar=cookie_jar, headers={"User-Agent": USER_AGENT}, loop=loop
cookie_jar=cookie_jar, headers={"User-Agent": USER_AGENT}
)
self._access_token: Optional[str] = None
self._user_id: Optional[int] = None
self._is_logged_in = asyncio.Event()
# Storing, watching and changing channels
# State management
self._state: State = State.INVENTORY_FETCH
self._state_change = asyncio.Event()
self.inventory: List[DropsCampaign] = [] # inventory
# Storing and watching channels
self.channels: Dict[int, Channel] = {}
self._watching_channel: Optional[Channel] = None
self._watching_task: Optional[asyncio.Task[Any]] = None
self._channel_change = asyncio.Event()
# Inventory
self.inventory: List[DropsCampaign] = []
self._campaign_change = asyncio.Event()
self._last_watch = time() - 60
# Websocket
self.websocket = WebsocketPool(self)
# Runner task
self._main_task: Optional[asyncio.Task[None]] = None
def wait_until_login(self):
return self._is_logged_in.wait()
def reevaluate_campaigns(self):
self._campaign_change.set()
def change_state(self, state: State) -> None:
self._state = state
self._state_change.set()
def state_change(self, state: State) -> Callable[[], None]:
# this is identical to change_state, but defers the call
# perfect for GUI usage
return partial(self.change_state, state)
def request_close(self):
"""
Called when the application is requested to close,
usually by the console or application window being closed.
"""
self.stop()
def start(self):
self._loop = loop = asyncio.get_event_loop()
self._main_task = loop.create_task(self._run())
try:
loop.run_until_complete(self._main_task)
except asyncio.CancelledError:
# happens when the user requests close
pass
except KeyboardInterrupt:
# KeyboardInterrupt causes run_until_complete to exit, but without cancelling the task.
# The loop stops and thus the task gets frozen, until the loop runs again.
# Because we don't want anything from there to actually run during cleanup,
# we need to explicitly cancel the task ourselves here.
self.stop()
except CaptchaRequired:
self.gui.prevent_close()
self.gui.print(
"Your login attempt was denied by CAPTCHA.\nPlease try again in +12 hours."
)
except Exception:
self.gui.prevent_close()
self.gui.print("Fatal error encountered:\n")
self.gui.print(traceback.format_exc())
finally:
loop.run_until_complete(self.close())
loop.run_until_complete(loop.shutdown_asyncgens())
if not self.gui.close_requested:
self.gui.print(
"\nApplication Terminated.\nClose the window to exit the application."
)
loop.run_until_complete(self.gui.wait_until_closed())
loop.close()
def stop(self):
if self._main_task is not None:
self._main_task.cancel()
self._main_task = None
async def close(self):
print("Exiting...")
self._session.cookie_jar.save(COOKIES_PATH) # type: ignore
start_time = time()
self.gui.print("Exiting...")
self.stop_watching()
self._session.cookie_jar.save(COOKIES_PATH) # type: ignore
await self._session.close()
await self.websocket.stop()
await asyncio.sleep(1) # allows aiohttp to safely close the session
# wait at least one full second + whatever it takes to complete the closing
# this allows aiohttp to safely close the session
await asyncio.sleep(start_time + 1 - time())
def is_currently_watching(self, channel: Channel) -> bool:
def is_watching(self, channel: Channel) -> bool:
return self._watching_channel is not None and self._watching_channel == channel
async def run(self, channel_names: Optional[List[str]] = None):
async def _run(self):
"""
Main method that runs the whole client.
@@ -95,129 +152,130 @@ class Twitch:
• Selecting a stream to watch, and watching it
• Changing the stream that's being watched if necessary
"""
self.gui.start()
await self.check_login()
# Add default topics
assert self._user_id is not None
self.websocket.add_topics([
WebsocketTopic("User", "Drops", self._user_id, self.process_drops),
WebsocketTopic("User", "CommunityPoints", self._user_id, self.process_points),
])
await self.websocket.start()
games: Set[Game] = set()
selected_game: Optional[Game] = None
self.change_state(State.INVENTORY_FETCH)
while True:
# Claim the drops we can
self.inventory = await self.get_inventory()
games = set()
for campaign in self.inventory:
if campaign.status == "UPCOMING":
# we have no use in processing upcoming campaigns here
continue
for drop in campaign.timed_drops.values():
if drop.can_earn:
games.add(campaign.game)
if drop.can_claim:
await drop.claim()
# 'games' now has all games we want to farm drops for
# if it's empty, there's no point in continuing
if not games:
print(f"No active campaigns to farm drops for.\n\n{TERMINATED_STR}")
await asyncio.Future()
# Start our websocket connection, only after we confirm that there are drops to mine
await self.websocket.start()
if not channel_names:
# get a list of all channels with drops enabled
print("Fetching suitable live channels to watch...")
live_streams: Dict[Game, List[Channel]] = await self.get_live_streams(
games, [DROPS_ENABLED_TAG]
)
for game, channels in live_streams.items():
for channel in channels:
if channel.id not in self.channels:
self.channels[channel.id] = channel
print(f"Added channel: {channel.name} for game: {game.name}")
else:
# Fetch information about all channels we're supposed to handle
for channel_name in channel_names:
channel: Channel = await Channel(self, channel_name) # type: ignore
self.channels[channel.id] = channel
# Sub to these channel updates
topics: List[WebsocketTopic] = [
WebsocketTopic("Channel", "VideoPlayback", channel_id, self.process_stream_state)
for channel_id in self.channels
]
self.websocket.add_topics(topics)
self._campaign_change.clear()
# Repeat: Change into a channel we can watch, then reset the flag
self._channel_change.set()
refresh_channels = False # we're entering having fresh channel data already
while True:
# wait for either the change channel signal, or campaign change signal
await asyncio.wait(
(
self._channel_change.wait(),
self._campaign_change.wait(),
),
return_when=asyncio.FIRST_COMPLETED,
)
if self._campaign_change.is_set():
# we need to reevaluate all campaigns
# stop watching
self.stop_watching()
# close the websocket
await self.websocket.stop()
break # cycle the outer loop
# otherwise, it was the channel change one
for channel in self.channels.values():
if (
channel.stream is not None # steam online
and channel.stream.game is not None # there's game information
and channel.stream.drops_enabled # drops are enabled
and channel.stream.game in games # it's a game we can earn drops in
):
self.watch(channel)
refresh_channels = True
self._channel_change.clear()
break
else:
# there's no available channel to watch
if refresh_channels:
# refresh the status of all channels,
# to make sure that our websocket didn't miss anything til this point
print("No suitable channel to watch, refreshing...")
for channel in self.channels.values():
await channel.get_stream()
await asyncio.sleep(0.5)
refresh_channels = False
if self._state is State.INVENTORY_FETCH:
# Claim the drops we can
await self.fetch_inventory()
games.clear()
for campaign in self.inventory:
if campaign.status == "UPCOMING":
# we have no use in processing upcoming campaigns here
continue
print("No suitable channel to watch, retrying in 120 seconds")
await asyncio.sleep(120)
for drop in campaign.timed_drops.values():
if drop.can_earn:
games.add(campaign.game)
if drop.can_claim:
await drop.claim()
self.change_state(State.GAME_SELECT)
elif self._state is State.GAME_SELECT:
# 'games' has all games we want to farm drops for
# if it's empty, there's no point in continuing
if not games:
self.gui.print("No active campaigns to farm drops for.")
return
self.gui.games.set_games(games)
selected_game = self.gui.games.get_selection()
self.change_state(State.CHANNEL_FETCH)
elif self._state is State.CHANNEL_FETCH:
if selected_game is None:
self.change_state(State.GAME_SELECT)
else:
# get a list of all live channels with drops enabled
live_streams: List[Channel] = await self.get_live_streams(
selected_game, [DROPS_ENABLED_TAG]
)
# filter out ones we already have
live_streams = [ch for ch in live_streams if ch.id not in self.channels]
for channel in live_streams:
self.channels[channel.id] = channel
channel.display()
# load points
# asyncio.gather(*(channel.claim_bonus() for channel in live_streams))
# Sub to these channel updates
topics: List[WebsocketTopic] = [
WebsocketTopic(
"Channel", "VideoPlayback", channel_id, self.process_stream_state
)
for channel_id in self.channels
]
self.websocket.add_topics(topics)
self.change_state(State.CHANNEL_SWITCH)
elif self._state is State.CHANNEL_CLEANUP:
# remove all channels that are offline,
# or aren't streaming the game we want anymore
to_remove = [
channel for channel in self.channels.values()
if not (channel.online or channel.pending_online)
or channel.game is None or channel.game != selected_game
]
self.websocket.remove_topics(
WebsocketTopic.as_str("Channel", "VideoPlayback", channel.id)
for channel in to_remove
)
for channel in to_remove:
del self.channels[channel.id]
channel.remove()
self.change_state(State.CHANNEL_FETCH)
elif self._state is State.CHANNEL_SWITCH:
if selected_game is None:
self.change_state(State.GAME_SELECT)
else:
# Change into the selected channel
channels: Iterable[Channel]
selected_channel = self.gui.channels.get_selection()
if selected_channel is not None:
self.gui.channels.clear_selection()
channels = chain([selected_channel], self.channels.values())
else:
channels = self.channels.values()
# If there's no selected channel, change into a channel we can watch
for channel in channels:
if (
channel.online # steam online
and channel.drops_enabled # drops are enabled
and channel.game == selected_game # it's a game we've selected
):
self.watch(channel)
# break the state change chain by clearing the flag
self._state_change.clear()
break
else:
self.stop_watching()
selected_game = self.gui.games.get_next_selection()
if selected_game is None:
self.gui.print("No suitable channel to watch.")
# TODO: Figure out what to do here.
return
self.change_state(State.CHANNEL_CLEANUP)
await self._state_change.wait()
def watch(self, channel: Channel):
if self.is_watching(channel):
# we're already watching the same channel, so there's no point switching
return
if self._watching_task is not None:
self._watching_task.cancel()
async def watcher(channel: Channel):
op = GQL_OPERATIONS["ChannelPointsContext"].with_variables(
{"channelLogin": channel.name}
)
i = 0
while True:
await channel._send_watch()
if i == 0:
# ensure every 30 minutes that we don't have unclaimed points bonus
response = await self.gql_request(op)
channel_data: JsonType = response["data"]["community"]["channel"]
claim_available: JsonType = (
channel_data["self"]["communityPoints"]["availableClaim"]
)
if claim_available:
await self.claim_points(channel_data["id"], claim_available["id"])
logger.info("Claimed bonus points")
i = (i + 1) % 30
await asyncio.sleep(58.5)
if channel.stream is not None and channel.stream.game is not None:
game_name = channel.stream.game.name
else:
game_name = "<Unknown>"
print(f"Watching: {channel.name}, game: {game_name}")
self.gui.channels.set_watching(channel)
self._watching_channel = channel
self._watching_task = asyncio.create_task(watcher(channel))
self._watching_task = asyncio.create_task(channel.watch_loop())
self.gui.progress.start_timer()
def stop_watching(self):
self.gui.progress.stop_timer()
self.gui.channels.clear_watching()
if self._watching_task is not None:
logger.info("Watching stopped.")
self._watching_task.cancel()
self._watching_task = None
self._watching_channel = None
@@ -231,10 +289,10 @@ class Twitch:
if msg_type == "stream-down":
logger.info(f"{channel.name} goes OFFLINE")
channel.set_offline()
if self.is_currently_watching(channel):
print(f"{channel.name} goes OFFLINE, switching...")
if self.is_watching(channel):
self.gui.print(f"{channel.name} goes OFFLINE, switching...")
# change the channel if we're currently watching it
self._channel_change.set()
self.change_state(State.CHANNEL_SWITCH)
elif msg_type == "stream-up":
logger.info(f"{channel.name} goes ONLINE")
channel.set_online()
@@ -243,17 +301,140 @@ class Twitch:
# if it's not online for some reason, set it so
channel.set_online()
else:
assert channel.stream is not None
viewers = message["viewers"]
channel.stream.viewer_count = viewers
channel.viewers = viewers
logger.debug(f"{channel.name} viewers: {viewers}")
@task_wrapper
async def process_drops(self, user_id: int, message: JsonType):
# Message examples:
# {"type": "drop-progress", data: {"current_progress_min": 3, "required_progress_min": 10}}
# {"type": "drop-claim", data: {"drop_instance_id": ...}}
msg_type: str = message["type"]
if msg_type not in ("drop-progress", "drop-claim"):
return
drop_id: str = message["data"]["drop_id"]
drop: Optional[TimedDrop] = self.get_drop(drop_id)
if msg_type == "drop-claim" and drop is None:
logger.error(
f"Received a drop claim ID for a non-existing drop: {drop_id}\n"
f"Drop claim ID: {message['data']['drop_instance_id']}"
)
return
# Sometimes, the drop update we receive doesn't actually match what we're mining.
# This is a Twitch bug workaround: use GQL to get the current drop progress.
if msg_type == "drop-progress" and (drop is None or not drop.campaign.active):
logger.debug(
"Received a drop update for an inactive campaign, using drop context instead"
)
context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"])
drop_data = context["data"]["currentUser"]["dropCurrentSession"]
drop_id = drop_data["dropID"]
drop = self.get_drop(drop_id)
if drop is None:
logger.warning(f"Received an update for a non-existing drop: {drop_id}")
return
if not drop.campaign.active:
# Sometimes, even GQL fails to give us the correct drop.
# In that case, we can use the locally cached inventory to try and put together
# the drop that we're actually mining right now.
# TODO: Find a better way of figuring out the correct game
game = drop.campaign.game
drop = None
for campaign in self.inventory:
if campaign.game == game:
drop = campaign.get_active_drop()
break
if drop is None or not drop.campaign.active:
logger.error("Active drop search failed")
return
drop.bump_minutes()
self.gui.progress.update(drop)
return
else:
# TODO: Use a cleaner solution than modifying the raw payload
message["data"]["current_progress_min"] = drop_data["currentMinutesWatched"]
message["data"]["required_progress_min"] = drop_data["requiredMinutesWatched"]
assert drop is not None
drop.update(message)
if msg_type == "drop-claim":
campaign = drop.campaign
await drop.claim()
self.gui.print(
f"Claimed drop: {drop.rewards_text()} "
f"({campaign.claimed_drops}/{campaign.total_drops})"
)
if campaign.remaining_drops == 0:
self.change_state(State.INVENTORY_FETCH)
self.gui.progress.update(drop)
@task_wrapper
async def process_points(self, user_id: int, message: JsonType):
# Example payloads:
# {
# "type": "points-earned",
# "data": {
# "timestamp": "YYYY-MM-DDTHH:MM:SS.123456789Z",
# "channel_id": "123456789",
# "point_gain": {
# "user_id": "12345678",
# "channel_id": "123456789",
# "total_points": 10,
# "baseline_points": 10,
# "reason_code": "WATCH",
# "multipliers": []
# },
# "balance": {
# "user_id": "12345678",
# "channel_id": "123456789",
# "balance": 12345
# }
# }
# }
# {
# "type": "claim-available",
# "data": {
# "timestamp":"2021-12-23T21:41:35.784041064Z",
# "claim": {
# "id": "4ae6fefd-3658-40ae-ad3d-92254c576a91",
# "user_id": "94275183",
# "channel_id": "218893986",
# "point_gain": {
# "user_id": "94275183",
# "channel_id": "218893986",
# "total_points": 50,
# "baseline_points": 50,
# "reason_code": "CLAIM",
# "multipliers": []
# },
# "created_at": "2021-12-23T21:41:31Z"
# }
# }
# }
msg_type = message["type"]
if msg_type == "points-earned":
data = message["data"]
channel = self.channels.get(int(data["channel_id"]))
points = data["point_gain"]["total_points"]
balance = data["balance"]["balance"]
if channel is not None:
channel.points = balance
self.gui.channels.display(channel)
self.gui.print(f"Earned points for watching: {points:3}, total: {balance}")
elif msg_type == "claim-available":
claim_data = message["data"]["claim"]
points = claim_data["point_gain"]["total_points"]
await self.claim_points(claim_data["channel_id"], claim_data["id"])
self.gui.print(f"Claimed bonus points: {points}")
async def _validate_password(self, password: str) -> bool:
"""
Use Twitch's password validator to validate the password length, characters required, etc.
Helps avoid running into the CAPTCHA if you mistype your password by mistake.
Valid length: 8-71
"""
if not 8 <= len(password) <= 71:
return False
payload = {"password": password}
async with self._session.post(
f"{AUTH_URL}/api/v1/password_strength", json=payload
@@ -261,120 +442,97 @@ class Twitch:
strength_response = await response.json()
return strength_response["isValid"]
async def get_password(self, prompt: str = "Password: ") -> str:
"""
A loop that'll keep asking for password, until it's considered valid.
Use own implementation rather than `getpass.getpass`, to add some user feedback
on how many characters have been typed in.
"""
async def ask_login(self) -> LoginData:
while True:
for c in prompt:
msvcrt.putwch(c)
pass_chars: List[str] = []
try:
while True:
c = msvcrt.getwch()
if c in "\r\n":
break
elif c == '\003':
raise KeyboardInterrupt
elif c == '\b':
# backspace
if not pass_chars:
# we have nothing to remove
continue
pass_chars.pop()
# move one character back
msvcrt.putwch('\b')
# overwrite the • with a space
msvcrt.putwch(' ')
# move back again
msvcrt.putwch('\b')
else:
pass_chars.append(c)
msvcrt.putwch('')
finally:
msvcrt.putwch('\r')
msvcrt.putwch('\n')
password = ''.join(pass_chars)
if await self._validate_password(password):
return password
data = await self.gui.login.ask_login()
if await self._validate_password(data.password):
return data
async def _login(self) -> str:
logger.debug("Login flow started")
if self.username is None:
self.username = input("Username: ")
if self.password is None:
print("\nNote: Password can be pasted in by pressing right click inside the window.\n")
self.password = await self.get_password()
payload: JsonType = {
"username": self.username,
"password": self.password,
"client_id": CLIENT_ID,
"undelete_user": False,
"remember_me": True,
}
for attempt in range(10):
async with self._session.post(f"{AUTH_URL}/login", json=payload) as response:
login_response = await response.json()
while True:
username, password, token = await self.ask_login()
payload["username"] = username
payload["password"] = password
# remove stale 2FA tokens, if present
payload.pop("authy_token", None)
payload.pop("twitchguard_code", None)
for attempt in range(2):
async with self._session.post(f"{AUTH_URL}/login", json=payload) as response:
login_response: JsonType = await response.json()
# Feed this back in to avoid running into CAPTCHA if possible
if "captcha_proof" in login_response:
payload["captcha"] = {"proof": login_response["captcha_proof"]}
# Feed this back in to avoid running into CAPTCHA if possible
if "captcha_proof" in login_response:
payload["captcha"] = {"proof": login_response["captcha_proof"]}
# Error handling
if "error_code" in login_response:
error_code = login_response["error_code"]
logger.debug(f"Login error code: {error_code}")
if error_code == 1000:
# we've failed bois
logger.debug("Login failed due to CAPTCHA")
raise CaptchaRequired()
elif error_code == 3001:
# wrong password you dummy
logger.debug("Login failed due to incorrect login or pass")
print(f"Incorrect username or password.\nUsername: {self.username}")
self.password = await self.get_password()
elif error_code in (
3011, # Authy token needed
3012, # Invalid authy token
3022, # Email code needed
3023, # Invalid email code
):
# 2FA handling
email = error_code in (3022, 3023)
logger.debug("2FA token required")
token = input("2FA token: ")
if email:
# email code
payload["twitchguard_code"] = token
# Error handling
if "error_code" in login_response:
error_code: int = login_response["error_code"]
logger.debug(f"Login error code: {error_code}")
if error_code == 1000:
# we've failed bois
logger.debug("Login failed due to CAPTCHA")
raise CaptchaRequired()
elif error_code == 3001:
# wrong password you dummy
logger.debug("Login failed due to incorrect login or pass")
self.gui.print("Incorrect username or password.")
self.gui.login.clear(password=True)
break
elif error_code in (
3012, # Invalid authy token
3023, # Invalid email code
):
logger.debug("Login failed due to incorrect 2FA code")
if error_code == 3023:
self.gui.print("Incorrect email code.")
else:
self.gui.print("Incorrect 2FA code.")
self.gui.login.clear(token=True)
break
elif error_code in (
3011, # Authy token needed
3022, # Email code needed
):
# 2FA handling
email = error_code == 3022
if not token:
logger.debug("2FA token required")
# user didn't provide a token, so ask them for it
if email:
self.gui.print("Email code required. Check your email.")
else:
self.gui.print("2FA token required.")
break
if email:
payload["twitchguard_code"] = token
else:
payload["authy_token"] = token
continue
else:
# authy token
payload["authy_token"] = token
continue
else:
raise LoginException(login_response["error"])
# Success handling
if "access_token" in login_response:
# we're in bois
self._access_token = login_response["access_token"]
logger.debug(f"Access token: {self._access_token}")
break
if self._access_token is None:
# this means we've ran out of retries
raise LoginException("Ran out of login retries")
return self._access_token
raise LoginException(login_response["error"])
# Success handling
if "access_token" in login_response:
# we're in bois
self._access_token = cast(str, login_response["access_token"])
logger.debug("Access token granted")
self.gui.login.clear()
return self._access_token
async def check_login(self) -> None:
if self._access_token is not None and self._user_id is not None:
# we're all good
return
# looks like we're missing something
print("Logging in")
logger.debug("Checking login")
self.gui.login.update("Logging in...", None)
jar = cast(aiohttp.CookieJar, self._session.cookie_jar)
while True:
cookie = jar.filter_cookies("https://twitch.tv") # type: ignore
@@ -386,7 +544,7 @@ class Twitch:
elif self._access_token is None:
# have cookie - get our access token
self._access_token = cookie["auth-token"].value
logger.debug("Session restored from cookie")
logger.debug("Restoring session from cookie")
# validate our access token, by obtaining user_id
async with self._session.get(
"https://id.twitch.tv/oauth2/validate",
@@ -395,6 +553,7 @@ class Twitch:
status = response.status
if status == 401:
# the access token we have is invalid - clear the cookie and reauth
logger.debug("Restored session is invalid")
jar.clear_domain("twitch.tv")
continue
elif status == 200:
@@ -403,7 +562,8 @@ class Twitch:
self._user_id = int(validate_response["user_id"])
cookie["persistent"] = str(self._user_id)
self._is_logged_in.set()
print(f"Login successful, User ID: {self._user_id}")
logger.debug(f"Login successful, user ID: {self._user_id}")
self.gui.login.update("Logged in", self._user_id)
# update our cookie and save it
jar.update_cookies(cookie, URL("https://twitch.tv"))
jar.save(COOKIES_PATH)
@@ -420,34 +580,38 @@ class Twitch:
gql_logger.debug(f"GQL Response: {response_json}")
return response_json
async def get_inventory(self) -> List[DropsCampaign]:
async def fetch_inventory(self) -> None:
response = await self.gql_request(GQL_OPERATIONS["Inventory"])
inventory = response["data"]["currentUser"]["inventory"]
return [DropsCampaign(self, data) for data in inventory["dropCampaignsInProgress"]]
self.inventory = [
DropsCampaign(self, data) for data in inventory["dropCampaignsInProgress"]
]
async def get_live_streams(
self, games: Collection[Game], tag_ids: List[str]
) -> Dict[Game, List[Channel]]:
limit = min(int((MAX_WEBSOCKETS * WS_TOPICS_LIMIT) // len(games)), 100)
live_streams = {}
for game in games:
response = await self.gql_request(
GQL_OPERATIONS["GameDirectory"].with_variables({
"limit": limit,
"name": game.name,
"options": {
"includeRestricted": ["SUB_ONLY_LIVE"],
"tags": tag_ids,
},
})
)
live_streams[game] = [
Channel.from_directory(self, stream_channel_data["node"])
for stream_channel_data in response["data"]["game"]["streams"]["edges"]
]
return live_streams
def get_drop(self, drop_id: str) -> Optional[TimedDrop]:
for campaign in self.inventory:
drop = campaign.get_drop(drop_id)
if drop is not None:
return drop
return None
async def claim_points(self, channel_id: Union[str, int], claim_id: str):
async def get_live_streams(self, game: Game, tag_ids: List[str]) -> List[Channel]:
limit = 45
response = await self.gql_request(
GQL_OPERATIONS["GameDirectory"].with_variables({
"limit": limit,
"name": game.name,
"options": {
"includeRestricted": ["SUB_ONLY_LIVE"],
"tags": tag_ids,
},
})
)
return [
Channel.from_directory(self, stream_channel_data["node"])
for stream_channel_data in response["data"]["game"]["streams"]["edges"]
]
async def claim_points(self, channel_id: Union[str, int], claim_id: str) -> None:
variables = {"input": {"channelID": str(channel_id), "claimID": claim_id}}
await self.gql_request(
GQL_OPERATIONS["ClaimCommunityPoints"].with_variables(variables)

1
version.py Normal file
View File

@@ -0,0 +1 @@
__version__ = 4

View File

@@ -13,7 +13,6 @@ from typing import Any, Optional, List, Dict, Set, Iterable, TYPE_CHECKING
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK
from websockets.client import WebSocketClientProtocol, connect as websocket_connect
from inventory import TimedDrop
from exceptions import MinerException
from constants import (
JsonType,
@@ -29,6 +28,7 @@ if TYPE_CHECKING:
from twitch import Twitch
logger = logging.getLogger("TwitchDrops")
ws_logger = logging.getLogger("TwitchDrops.websocket")
NONCE_CHARS = string.ascii_letters + string.digits
@@ -39,11 +39,11 @@ def create_nonce(length: int = 30) -> str:
def task_wrapper(afunc):
@wraps(afunc)
async def wrapper(self: Websocket, *args, **kwargs):
async def wrapper(self, *args, **kwargs):
try:
await afunc(self, *args, **kwargs)
except Exception:
ws_logger.exception("Exception in websocket task")
logger.exception("Exception in task")
raise # raise up to the wrapping task
return wrapper
@@ -55,7 +55,7 @@ class Websocket:
# websocket index
self._idx: int = index
# current websocket connection
self._ws: WebSocketClientProtocol
self._ws: Optional[WebSocketClientProtocol] = None
# set when there's an active websocket connection
self._connected_flag = asyncio.Event()
# set when the websocket needs to reconnect
@@ -70,6 +70,8 @@ class Websocket:
# topics stuff
self.topics: Dict[str, WebsocketTopic] = {}
self._submitted: Set[WebsocketTopic] = set()
# notify GUI
self.set_status("Disconnected")
@property
def connected(self) -> bool:
@@ -78,12 +80,25 @@ class Websocket:
def wait_until_connected(self):
return self._connected_flag.wait()
def set_status(self, status: Optional[str] = None, refresh_topics: bool = False):
kwargs: Dict[str, Any] = {}
if status is not None:
kwargs["status"] = status
if refresh_topics:
kwargs["topics"] = len(self.topics)
self._twitch.gui.websockets.update(self._idx, **kwargs)
def request_reconnect(self):
ws_logger.warning(f"Websocket[{self._idx}] requested reconnect.")
# reset our ping interval, so we send a PING after reconnect right away
self._next_ping = time()
self._reconnect_requested.set()
async def close(self):
self.set_status("Disconnecting...")
if self._ws is not None:
await self._ws.close()
async def start(self):
if self.connected:
return
@@ -98,27 +113,35 @@ class Websocket:
self._handle_task = asyncio.create_task(self._handle())
async def stop(self):
if self._ws is not None:
await self._ws.close()
await self.close()
if self._handle_task is not None:
# this raises back any stray exceptions
await self._handle_task
self._handle_task = None
def stop_nowait(self):
if self._ws is not None:
asyncio.create_task(self._ws.close())
asyncio.create_task(self.close())
# note: this detaches the handle task, so we have to assume it closes properly
self._handle_task = None
def remove(self):
# this stops the websocket, and then removes it from the gui list
async def remover():
await self.stop()
self._twitch.gui.websockets.remove(self._idx)
asyncio.create_task(remover())
@task_wrapper
async def _handle(self):
# ensure we're logged in before connecting
await self._twitch.wait_until_login()
self.set_status("Connecting...")
ws_logger.info(f"Websocket[{self._idx}] connecting...")
# Connect/Reconnect loop
async for websocket in websocket_connect(WEBSOCKET_URL, ssl=True, ping_interval=None):
websocket.BACKOFF_MAX = 3 * 60 # type: ignore # 3 minutes
self._ws = websocket
self.set_status("Connected")
try:
try:
self._reconnect_requested.clear()
@@ -135,20 +158,24 @@ class Websocket:
if isinstance(exc, ConnectionClosedOK):
if exc.rcvd_then_sent:
# server closed the connection, not us - reconnect
ws_logger.warning(f"Websocket[{self._idx}] disconnected.")
ws_logger.warning(f"Websocket[{self._idx}] got disconnected.")
else:
# we closed it - exit
self._ws = None
ws_logger.info(f"Websocket[{self._idx}] stopped.")
self.set_status("Disconnected")
return
if exc.rcvd is not None:
code = exc.rcvd.code
elif exc.sent is not None:
code = exc.sent.code
else:
code = -1
ws_logger.warning(f"Websocket[{self._idx}] closed unexpectedly: {code}")
if exc.rcvd is not None:
code = exc.rcvd.code
elif exc.sent is not None:
code = exc.sent.code
else:
code = -1
ws_logger.warning(f"Websocket[{self._idx}] closed unexpectedly: {code}")
except Exception:
ws_logger.exception(f"Exception in Websocket[{self._idx}]")
self.set_status("Reconnecting...")
ws_logger.warning(f"Websocket[{self._idx}] reconnecting...")
async def _handle_ping(self):
@@ -203,6 +230,7 @@ class Websocket:
Gather incoming messages over the timeout specified.
Note that there's no return value - this modifies `messages` in-place.
"""
assert self._ws is not None
while True:
raw_message = await self._ws.recv()
message = json.loads(raw_message)
@@ -211,10 +239,10 @@ class Websocket:
def _handle_message(self, message):
# request the assigned topic to process the response
topic_process = self.topics.get(message["data"]["topic"])
if topic_process is not None:
topic = self.topics.get(message["data"]["topic"])
if topic is not None:
# use a task to not block the websocket
asyncio.create_task(topic_process(json.loads(message["data"]["message"])))
asyncio.create_task(topic(json.loads(message["data"]["message"])))
async def _handle_recv(self):
"""
@@ -246,6 +274,7 @@ class Websocket:
topic = topics_set.pop()
self.topics[str(topic)] = topic
self._topics_changed.set()
self.set_status(refresh_topics=True)
def remove_topics(self, topics_set: Set[str]):
existing = topics_set.intersection(self.topics.keys())
@@ -256,6 +285,7 @@ class Websocket:
for topic in existing:
del self.topics[topic]
self._topics_changed.set()
self.set_status(refresh_topics=True)
async def send(self, message: JsonType):
if self._ws is None:
@@ -283,17 +313,12 @@ class WebsocketPool:
await self._twitch.wait_until_login()
if self.running:
return
# Add default topics
assert self._twitch._user_id is not None
user_id = self._twitch._user_id
self.add_topics([
WebsocketTopic("User", "Drops", user_id, self.process_drops),
WebsocketTopic("User", "CommunityPoints", user_id, self.process_points),
])
self._running.set()
await asyncio.gather(*(ws.start() for ws in self.websockets))
async def stop(self):
if not self.running:
return
self._running.clear()
await asyncio.gather(*(ws.stop() for ws in self.websockets))
@@ -340,49 +365,8 @@ class WebsocketPool:
if count <= (len(self.websockets) - 1) * WS_TOPICS_LIMIT:
ws = self.websockets.pop()
recycled_topics.extend(ws.topics.values())
ws.stop_nowait()
ws.remove()
else:
break
if recycled_topics:
self.add_topics(recycled_topics)
@task_wrapper
async def process_drops(self, user_id: int, message: JsonType):
drop_id = message["data"]["drop_id"]
drop: Optional[TimedDrop] = None
for campaign in self._twitch.inventory:
drop = campaign.get_drop(drop_id)
if drop is not None:
break
else:
ws_logger.warning(f"Drop with ID of {drop_id} not found!")
return
drop.update(message)
msg_type = message["type"]
campaign = drop.campaign
if msg_type == "drop-progress":
print(
f"Drop: {drop.rewards_text()} ({campaign.claimed_drops}/{campaign.total_drops}): "
f"{drop.progress:6.1%} ({drop.remaining_minutes} minutes remaining)"
)
elif msg_type == "drop-claim":
await drop.claim()
print(
f"Claimed drop: {drop.rewards_text()} "
f"({campaign.claimed_drops}/{campaign.total_drops})"
)
if campaign.remaining_drops == 0:
self._twitch.reevaluate_campaigns()
@task_wrapper
async def process_points(self, user_id: int, message: JsonType):
msg_type = message["type"]
if msg_type == "points-earned":
points = message["data"]["point_gain"]["total_points"]
balance = message["data"]["balance"]["balance"]
print(f"Earned points for watching: {points:3}, total: {balance}")
elif msg_type == "claim-available":
claim_data = message["data"]["claim"]
points = claim_data["point_gain"]["total_points"]
await self._twitch.claim_points(claim_data["channel_id"], claim_data["id"])
print(f"Claimed bonus points: {points}")