Files
TwitchDropsMiner/twitch.py

1613 lines
72 KiB
Python

from __future__ import annotations
import json
import asyncio
import logging
from time import time
from copy import deepcopy
from itertools import chain
from functools import partial
from collections import abc, deque, OrderedDict
from datetime import datetime, timedelta, timezone
from contextlib import suppress, asynccontextmanager
from typing import Any, Literal, Final, NoReturn, overload, cast, TYPE_CHECKING
import aiohttp
from yarl import URL
from translate import _
from gui import GUIManager
from channel import Channel
from websocket import WebsocketPool
from inventory import DropsCampaign
from exceptions import (
ExitRequest,
GQLException,
ReloadRequest,
LoginException,
MinerException,
RequestInvalid,
CaptchaRequired,
RequestException,
)
from utils import (
CHARS_HEX_LOWER,
chunk,
timestamp,
create_nonce,
task_wrapper,
AwaitableValue,
ExponentialBackoff,
)
from constants import (
CALL,
MAX_INT,
DUMP_PATH,
COOKIES_PATH,
MAX_CHANNELS,
GQL_OPERATIONS,
WATCH_INTERVAL,
State,
ClientType,
PriorityMode,
WebsocketTopic,
)
if TYPE_CHECKING:
from utils import Game
from gui import LoginForm
from channel import Stream
from settings import Settings
from inventory import TimedDrop
from constants import ClientInfo, JsonType, GQLOperation
logger = logging.getLogger("TwitchDrops")
gql_logger = logging.getLogger("TwitchDrops.gql")
class SkipExtraJsonDecoder(json.JSONDecoder):
def decode(self, s: str, *args):
# skip whitespace check
obj, end = self.raw_decode(s)
return obj
SAFE_LOADS = lambda s: json.loads(s, cls=SkipExtraJsonDecoder)
class _AuthState:
def __init__(self, twitch: Twitch):
self._twitch: Twitch = twitch
self._lock = asyncio.Lock()
self._logged_in = asyncio.Event()
self.user_id: int
self.device_id: str
self.session_id: str
self.access_token: str
self.client_version: str
def _hasattrs(self, *attrs: str) -> bool:
return all(hasattr(self, attr) for attr in attrs)
def _delattrs(self, *attrs: str) -> None:
for attr in attrs:
if hasattr(self, attr):
delattr(self, attr)
def clear(self) -> None:
self._delattrs(
"user_id",
"device_id",
"session_id",
"access_token",
"client_version",
)
self._logged_in.clear()
async def _oauth_login(self) -> str:
login_form: LoginForm = self._twitch.gui.login
client_info: ClientInfo = self._twitch._client_type
headers = {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"Accept-Language": "en-US",
"Cache-Control": "no-cache",
"Client-Id": client_info.CLIENT_ID,
"Host": "id.twitch.tv",
"Origin": str(client_info.CLIENT_URL),
"Pragma": "no-cache",
"Referer": str(client_info.CLIENT_URL),
"User-Agent": client_info.USER_AGENT,
"X-Device-Id": self.device_id,
}
payload = {
"client_id": client_info.CLIENT_ID,
"scopes": (
"channel_read chat:read user_blocks_edit "
"user_blocks_read user_follows_edit user_read"
),
}
while True:
try:
async with self._twitch.request(
"POST", "https://id.twitch.tv/oauth2/device", headers=headers, data=payload
) as response:
# {
# "device_code": "40 chars [A-Za-z0-9]",
# "expires_in": 1800,
# "interval": 5,
# "user_code": "8 chars [A-Z]",
# "verification_uri": "https://www.twitch.tv/activate"
# }
now = datetime.now(timezone.utc)
response_json: JsonType = await response.json()
device_code: str = response_json["device_code"]
user_code: str = response_json["user_code"]
interval: int = response_json["interval"]
expires_at = now + timedelta(seconds=response_json["expires_in"])
# Print the code to the user, open them the activate page so they can type it in
await login_form.ask_enter_code(user_code)
payload = {
"client_id": self._twitch._client_type.CLIENT_ID,
"device_code": device_code,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
}
while True:
# sleep first, not like the user is gonna enter the code *that* fast
await asyncio.sleep(interval)
async with self._twitch.request(
"POST",
"https://id.twitch.tv/oauth2/token",
headers=headers,
data=payload,
invalidate_after=expires_at,
) as response:
# 200 means success, 400 means the user haven't entered the code yet
if response.status != 200:
continue
response_json = await response.json()
# {
# "access_token": "40 chars [A-Za-z0-9]",
# "refresh_token": "40 chars [A-Za-z0-9]",
# "scope": [...],
# "token_type": "bearer"
# }
self.access_token = cast(str, response_json["access_token"])
return self.access_token
except RequestInvalid:
# the device_code has expired, request a new code
continue
async def _login(self) -> str:
logger.info("Login flow started")
gui_print = self._twitch.gui.print
login_form: LoginForm = self._twitch.gui.login
client_info: ClientInfo = self._twitch._client_type
token_kind: str = ''
use_chrome: bool = False
payload: JsonType = {
# username and password are added later
# "username": str,
# "password": str,
# client ID to-be associated with the access token
"client_id": client_info.CLIENT_ID,
"undelete_user": False, # purpose unknown
"remember_me": True, # persist the session via the cookie
# "authy_token": str, # 2FA token
# "twitchguard_code": str, # email code
# "captcha": str, # self-fed captcha
# 'force_twitchguard': False, # force email code confirmation
}
while True:
login_data = await login_form.ask_login()
payload["username"] = login_data.username
payload["password"] = login_data.password
# reinstate the 2FA token, if present
payload.pop("authy_token", None)
payload.pop("twitchguard_code", None)
if login_data.token:
# if there's no token kind set yet, and the user has entered a token,
# we can immediately assume it's an authenticator token and not an email one
if not token_kind:
token_kind = "authy"
if token_kind == "authy":
payload["authy_token"] = login_data.token
elif token_kind == "email":
payload["twitchguard_code"] = login_data.token
# use fancy headers to mimic the twitch android app
headers = {
"Accept": "application/vnd.twitchtv.v3+json",
"Accept-Encoding": "gzip",
"Accept-Language": "en-US",
"Client-Id": client_info.CLIENT_ID,
"Content-Type": "application/json; charset=UTF-8",
"Host": "passport.twitch.tv",
"User-Agent": client_info.USER_AGENT,
"X-Device-Id": self.device_id,
# "X-Device-Id": ''.join(random.choices('0123456789abcdef', k=32)),
}
async with self._twitch.request(
"POST", "https://passport.twitch.tv/login", headers=headers, json=payload
) as response:
login_response: JsonType = await response.json(loads=SAFE_LOADS)
# Feed this back in to avoid running into CAPTCHA if possible
if "captcha_proof" in login_response:
payload["captcha"] = {"proof": login_response["captcha_proof"]}
# Error handling
if "error_code" in login_response:
error_code: int = login_response["error_code"]
logger.info(f"Login error code: {error_code}")
if error_code == 1000:
logger.info("1000: CAPTCHA is required")
use_chrome = True
break
elif error_code in (2004, 3001):
logger.info("3001: Login failed due to incorrect username or password")
gui_print(_("login", "incorrect_login_pass"))
if error_code == 2004:
# invalid username
login_form.clear(login=True)
login_form.clear(password=True)
continue
elif error_code in (
3012, # Invalid authy token
3023, # Invalid email code
):
logger.info("3012/23: Login failed due to incorrect 2FA code")
if error_code == 3023:
token_kind = "email"
gui_print(_("login", "incorrect_email_code"))
else:
token_kind = "authy"
gui_print(_("login", "incorrect_twofa_code"))
login_form.clear(token=True)
continue
elif error_code in (
3011, # Authy token needed
3022, # Email code needed
):
# 2FA handling
logger.info("3011/22: 2FA token required")
# user didn't provide a token, so ask them for it
if error_code == 3022:
token_kind = "email"
gui_print(_("login", "email_code_required"))
else:
token_kind = "authy"
gui_print(_("login", "twofa_code_required"))
continue
elif error_code >= 5000:
# Special errors, usually from Twitch telling the user to "go away"
# We print the code out to inform the user, and just use chrome flow instead
# {
# "error_code":5023,
# "error":"Please update your app to continue",
# "error_description":"client is not supported for this feature"
# }
# {
# "error_code":5027,
# "error":"Please update your app to continue",
# "error_description":"client blocked from this operation"
# }
gui_print(_("login", "error_code").format(error_code=error_code))
logger.info(str(login_response))
use_chrome = True
break
else:
ext_msg = str(login_response)
logger.info(ext_msg)
raise LoginException(ext_msg)
# Success handling
if "access_token" in login_response:
self.access_token = cast(str, login_response["access_token"])
logger.info("Access token granted")
login_form.clear()
break
if use_chrome:
# await self._chrome_login()
raise CaptchaRequired()
if hasattr(self, "access_token"):
return self.access_token
raise LoginException("Login flow finished without setting the access token")
def headers(self, *, user_agent: str = '', gql: bool = False) -> JsonType:
client_info: ClientInfo = self._twitch._client_type
headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip",
"Accept-Language": "en-US",
"Pragma": "no-cache",
"Cache-Control": "no-cache",
"Client-Id": client_info.CLIENT_ID,
}
if user_agent:
headers["User-Agent"] = user_agent
if hasattr(self, "session_id"):
headers["Client-Session-Id"] = self.session_id
# if hasattr(self, "client_version"):
# headers["Client-Version"] = self.client_version
if hasattr(self, "device_id"):
headers["X-Device-Id"] = self.device_id
if gql:
headers["Origin"] = str(client_info.CLIENT_URL)
headers["Referer"] = str(client_info.CLIENT_URL)
headers["Authorization"] = f"OAuth {self.access_token}"
return headers
async def validate(self):
async with self._lock:
await self._validate()
async def _validate(self):
if not hasattr(self, "session_id"):
self.session_id = create_nonce(CHARS_HEX_LOWER, 16)
if not self._hasattrs("device_id", "access_token", "user_id"):
session = await self._twitch.get_session()
jar = cast(aiohttp.CookieJar, session.cookie_jar)
client_info: ClientInfo = self._twitch._client_type
if not self._hasattrs("device_id"):
async with self._twitch.request(
"GET", client_info.CLIENT_URL, headers=self.headers()
) as response:
page_html = await response.text("utf8")
assert page_html is not None
# match = re.search(r'twilightBuildID="([-a-z0-9]+)"', page_html)
# if match is None:
# raise MinerException("Unable to extract client_version")
# self.client_version = match.group(1)
# doing the request ends up setting the "unique_id" value in the cookie
cookie = jar.filter_cookies(client_info.CLIENT_URL)
self.device_id = cookie["unique_id"].value
if not self._hasattrs("access_token", "user_id"):
# looks like we're missing something
login_form: LoginForm = self._twitch.gui.login
logger.info("Checking login")
login_form.update(_("gui", "login", "logging_in"), None)
for client_mismatch_attempt in range(2):
for invalid_token_attempt in range(2):
cookie = jar.filter_cookies(client_info.CLIENT_URL)
if "auth-token" not in cookie:
self.access_token = await self._oauth_login()
cookie["auth-token"] = self.access_token
elif not hasattr(self, "access_token"):
logger.info("Restoring session from cookie")
self.access_token = cookie["auth-token"].value
# validate the auth token, by obtaining user_id
async with self._twitch.request(
"GET",
"https://id.twitch.tv/oauth2/validate",
headers={"Authorization": f"OAuth {self.access_token}"}
) as response:
status = response.status
if status == 401:
# the access token we have is invalid - clear the cookie and reauth
logger.info("Restored session is invalid")
assert client_info.CLIENT_URL.host is not None
jar.clear_domain(client_info.CLIENT_URL.host)
continue
elif status == 200:
validate_response = await response.json()
break
else:
raise RuntimeError("Login verification failure (step #2)")
# ensure the cookie's client ID matches the currently selected client
if validate_response["client_id"] == client_info.CLIENT_ID:
break
# otherwise, we need to delete the entire cookie file and clear the jar
logger.info("Cookie client ID mismatch")
jar.clear()
COOKIES_PATH.unlink(missing_ok=True)
else:
raise RuntimeError("Login verification failure (step #1)")
self.user_id = int(validate_response["user_id"])
cookie["persistent"] = str(self.user_id)
logger.info(f"Login successful, user ID: {self.user_id}")
login_form.update(_("gui", "login", "logged_in"), self.user_id)
# update our cookie and save it
jar.update_cookies(cookie, client_info.CLIENT_URL)
jar.save(COOKIES_PATH)
self._logged_in.set()
def invalidate(self):
self._delattrs("access_token")
class Twitch:
def __init__(self, settings: Settings):
self.settings: Settings = settings
# State management
self._state: State = State.IDLE
self._state_change = asyncio.Event()
self.wanted_games: list[Game] = []
self.inventory: list[DropsCampaign] = []
self._drops: dict[str, TimedDrop] = {}
self._mnt_triggers: deque[datetime] = deque()
# Client type, session and auth
self._client_type: ClientInfo = ClientType.ANDROID_APP
self._session: aiohttp.ClientSession | None = None
self._auth_state: _AuthState = _AuthState(self)
# GUI
self.gui = GUIManager(self)
# Storing and watching channels
self.channels: OrderedDict[int, Channel] = OrderedDict()
self.watching_channel: AwaitableValue[Channel] = AwaitableValue()
self._watching_task: asyncio.Task[None] | None = None
self._watching_restart = asyncio.Event()
# Websocket
self.websocket = WebsocketPool(self)
# Maintenance task
self._mnt_task: asyncio.Task[None] | None = None
async def get_session(self) -> aiohttp.ClientSession:
if (session := self._session) is not None:
if session.closed:
raise RuntimeError("Session is closed")
return session
# load in cookies
cookie_jar = aiohttp.CookieJar()
try:
if COOKIES_PATH.exists():
cookie_jar.load(COOKIES_PATH)
except Exception:
# if loading in the cookies file ends up in an error, just ignore it
# clear the jar, just in case
cookie_jar.clear()
# create timeouts
# connection quality mulitiplier determines the magnitude of timeouts
connection_quality = self.settings.connection_quality
if connection_quality < 1:
connection_quality = self.settings.connection_quality = 1
elif connection_quality > 6:
connection_quality = self.settings.connection_quality = 6
timeout = aiohttp.ClientTimeout(
sock_connect=5*connection_quality,
total=10*connection_quality,
)
# create session, limited to 50 connections at maximum
connector = aiohttp.TCPConnector(limit=50)
self._session = aiohttp.ClientSession(
timeout=timeout,
connector=connector,
cookie_jar=cookie_jar,
headers={"User-Agent": self._client_type.USER_AGENT},
)
return self._session
async def shutdown(self) -> None:
start_time = time()
self.stop_watching()
if self._watching_task is not None:
self._watching_task.cancel()
self._watching_task = None
if self._mnt_task is not None:
self._mnt_task.cancel()
self._mnt_task = None
# stop websocket, close session and save cookies
await self.websocket.stop(clear_topics=True)
if self._session is not None:
cookie_jar = cast(aiohttp.CookieJar, self._session.cookie_jar)
# clear empty cookie entries off the cookies file before saving
# NOTE: Unfortunately, aiohttp provides no easy way of clearing empty cookies,
# so we need to access the private '_cookies' attribute for this.
for cookie_key, cookie in list(cookie_jar._cookies.items()):
if not cookie:
del cookie_jar._cookies[cookie_key]
cookie_jar.save(COOKIES_PATH)
await self._session.close()
self._session = None
self._drops.clear()
self.channels.clear()
self.inventory.clear()
self._auth_state.clear()
self.wanted_games.clear()
self._mnt_triggers.clear()
# wait at least half a second + whatever it takes to complete the closing
# this allows aiohttp to safely close the session
await asyncio.sleep(start_time + 0.5 - time())
def wait_until_login(self) -> abc.Coroutine[Any, Any, Literal[True]]:
return self._auth_state._logged_in.wait()
def change_state(self, state: State) -> None:
if self._state is not State.EXIT:
# prevent state changing once we switch to exit state
self._state = state
self._state_change.set()
def state_change(self, state: State) -> abc.Callable[[], None]:
# this is identical to change_state, but defers the call
# perfect for GUI usage
return partial(self.change_state, state)
def close(self):
"""
Called when the application is requested to close by the user,
usually by the console or application window being closed.
"""
self.change_state(State.EXIT)
def prevent_close(self):
"""
Called when the application window has to be prevented from closing, even after the user
closes it with X. Usually used solely to display tracebacks from the closing sequence.
"""
self.gui.prevent_close()
def print(self, message: str):
"""
Can be used to print messages within the GUI.
"""
self.gui.print(message)
def save(self, *, force: bool = False) -> None:
"""
Saves the application state.
"""
self.gui.save(force=force)
self.settings.save(force=force)
def get_priority(self, channel: Channel) -> int:
"""
Return a priority number for a given channel.
0 has the highest priority.
Higher numbers -> lower priority.
MAX_INT (a really big number) signifies the lowest possible priority.
"""
if (
(game := channel.game) is None # None when OFFLINE or no game set
or game not in self.wanted_games # we don't care about the played game
):
return MAX_INT
return self.wanted_games.index(game)
@staticmethod
def _viewers_key(channel: Channel) -> int:
if (viewers := channel.viewers) is not None:
return viewers
return -1
async def run(self):
if self.settings.dump:
with open(DUMP_PATH, 'w', encoding="utf8"):
# replace the existing file with an empty one
pass
while True:
try:
await self._run()
break
except ReloadRequest:
await self.shutdown()
except ExitRequest:
break
except aiohttp.ContentTypeError as exc:
raise RequestException(_("login", "unexpected_content")) from exc
async def _run(self):
"""
Main method that runs the whole client.
Here, we manage several things, specifically:
• Fetching the drops inventory to make sure that everything we can claim, is claimed
• Selecting a stream to watch, and watching it
• Changing the stream that's being watched if necessary
"""
self.gui.start()
auth_state = await self.get_auth()
await self.websocket.start()
# NOTE: watch task is explicitly restarted on each new run
if self._watching_task is not None:
self._watching_task.cancel()
self._watching_task = asyncio.create_task(self._watch_loop())
# Add default topics
self.websocket.add_topics([
WebsocketTopic("User", "Drops", auth_state.user_id, self.process_drops),
WebsocketTopic("User", "CommunityPoints", auth_state.user_id, self.process_points),
WebsocketTopic(
"User", "Notifications", auth_state.user_id, self.process_notifications
),
])
full_cleanup: bool = False
channels: Final[OrderedDict[int, Channel]] = self.channels
self.change_state(State.INVENTORY_FETCH)
while True:
if self._state is State.IDLE:
if self.settings.dump:
self.gui.close()
continue
self.gui.tray.change_icon("idle")
self.gui.status.update(_("gui", "status", "idle"))
self.stop_watching()
# clear the flag and wait until it's set again
self._state_change.clear()
elif self._state is State.INVENTORY_FETCH:
self.gui.tray.change_icon("maint")
# ensure the websocket is running
await self.websocket.start()
await self.fetch_inventory()
self.gui.set_games(set(campaign.game for campaign in self.inventory))
# Save state on every inventory fetch
self.save()
self.change_state(State.GAMES_UPDATE)
elif self._state is State.GAMES_UPDATE:
# claim drops from expired and active campaigns
for campaign in self.inventory:
if not campaign.upcoming:
for drop in campaign.drops:
if drop.can_claim:
await drop.claim()
# figure out which games we want
self.wanted_games.clear()
exclude = self.settings.exclude
priority = self.settings.priority
priority_mode = self.settings.priority_mode
priority_only = priority_mode is PriorityMode.PRIORITY_ONLY
next_hour = datetime.now(timezone.utc) + timedelta(hours=1)
# sorted_campaigns: list[DropsCampaign] = list(self.inventory)
sorted_campaigns: list[DropsCampaign] = self.inventory
if not priority_only:
if priority_mode is PriorityMode.ENDING_SOONEST:
sorted_campaigns.sort(key=lambda c: c.ends_at)
elif priority_mode is PriorityMode.LOW_AVBL_FIRST:
sorted_campaigns.sort(key=lambda c: c.availability)
sorted_campaigns.sort(
key=lambda c: (
priority.index(c.game.name) if c.game.name in priority else MAX_INT
)
)
for campaign in sorted_campaigns:
game: Game = campaign.game
if (
game not in self.wanted_games # isn't already there
# and isn't excluded by list or priority mode
and game.name not in exclude
and (not priority_only or game.name in priority)
# and can be progressed within the next hour
and campaign.can_earn_within(next_hour)
):
# non-excluded games with no priority are placed last, below priority ones
self.wanted_games.append(game)
full_cleanup = True
self.restart_watching()
self.change_state(State.CHANNELS_CLEANUP)
elif self._state is State.CHANNELS_CLEANUP:
self.gui.status.update(_("gui", "status", "cleanup"))
if not self.wanted_games or full_cleanup:
# no games selected or we're doing full cleanup: remove everything
to_remove_channels: list[Channel] = list(channels.values())
else:
# remove all channels that:
to_remove_channels = [
channel
for channel in channels.values()
if (
not channel.acl_based # aren't ACL-based
and (
channel.offline # and are offline
# or online but aren't streaming the game we want anymore
or (channel.game is None or channel.game not in self.wanted_games)
)
)
]
full_cleanup = False
if to_remove_channels:
to_remove_topics: list[str] = []
for channel in to_remove_channels:
to_remove_topics.append(
WebsocketTopic.as_str("Channel", "StreamState", channel.id)
)
to_remove_topics.append(
WebsocketTopic.as_str("Channel", "StreamUpdate", channel.id)
)
self.websocket.remove_topics(to_remove_topics)
for channel in to_remove_channels:
del channels[channel.id]
channel.remove()
del to_remove_channels, to_remove_topics
if self.wanted_games:
self.change_state(State.CHANNELS_FETCH)
else:
# with no games available, we switch to IDLE after cleanup
self.print(_("status", "no_campaign"))
self.change_state(State.IDLE)
elif self._state is State.CHANNELS_FETCH:
self.gui.status.update(_("gui", "status", "gathering"))
# start with all current channels, clear the memory and GUI
new_channels: set[Channel] = set(channels.values())
channels.clear()
self.gui.channels.clear()
# gather and add ACL channels from campaigns
# NOTE: we consider only campaigns that can be progressed
# NOTE: we use another set so that we can set them online separately
no_acl: set[Game] = set()
acl_channels: set[Channel] = set()
next_hour = datetime.now(timezone.utc) + timedelta(hours=1)
for campaign in self.inventory:
if (
campaign.game in self.wanted_games
and campaign.can_earn_within(next_hour)
):
if campaign.allowed_channels:
acl_channels.update(campaign.allowed_channels)
else:
no_acl.add(campaign.game)
# remove all ACL channels that already exist from the other set
acl_channels.difference_update(new_channels)
# use the other set to set them online if possible
# if acl_channels:
# await asyncio.gather(*(
# channel.update_stream(trigger_events=False)
# for channel in acl_channels
# ))
# finally, add them as new channels
new_channels.update(acl_channels)
for game in no_acl:
# for every campaign without an ACL, for it's game,
# add a list of live channels with drops enabled
new_channels.update(await self.get_live_streams(game, drops_enabled=True))
# sort them descending by viewers, by priority and by game priority
# NOTE: Viewers sort also ensures ONLINE channels are sorted to the top
# NOTE: We can drop using the set now, because there's no more channels being added
ordered_channels: list[Channel] = sorted(
new_channels, key=self._viewers_key, reverse=True
)
ordered_channels.sort(key=lambda ch: ch.acl_based, reverse=True)
ordered_channels.sort(key=self.get_priority)
# ensure that we won't end up with more channels than we can handle
# NOTE: we trim from the end because that's where the non-priority,
# offline (or online but low viewers) channels end up
to_remove_channels = ordered_channels[MAX_CHANNELS:]
ordered_channels = ordered_channels[:MAX_CHANNELS]
if to_remove_channels:
# tracked channels and gui were cleared earlier, so no need to do it here
# just make sure to unsubscribe from their topics
to_remove_topics = []
for channel in to_remove_channels:
to_remove_topics.append(
WebsocketTopic.as_str("Channel", "StreamState", channel.id)
)
to_remove_topics.append(
WebsocketTopic.as_str("Channel", "StreamUpdate", channel.id)
)
self.websocket.remove_topics(to_remove_topics)
del to_remove_channels, to_remove_topics
# set our new channel list
for channel in ordered_channels:
channels[channel.id] = channel
channel.display(add=True)
# subscribe to these channel's state updates
to_add_topics: list[WebsocketTopic] = []
for channel_id in channels:
to_add_topics.append(
WebsocketTopic(
"Channel", "StreamState", channel_id, self.process_stream_state
)
)
to_add_topics.append(
WebsocketTopic(
"Channel", "StreamUpdate", channel_id, self.process_stream_update
)
)
self.websocket.add_topics(to_add_topics)
# relink watching channel after cleanup,
# or stop watching it if it no longer qualifies
# NOTE: this replaces 'self.watching_channel's internal value with the new object
watching_channel = self.watching_channel.get_with_default(None)
if watching_channel is not None:
new_watching: Channel | None = channels.get(watching_channel.id)
if new_watching is not None and self.can_watch(new_watching):
self.watch(new_watching, update_status=False)
else:
# we've removed a channel we were watching
self.stop_watching()
del new_watching
# pre-display the active drop with a substracted minute
for channel in channels.values():
# check if there's any channels we can watch first
if self.can_watch(channel):
if (active_drop := self.get_active_drop(channel)) is not None:
active_drop.display(countdown=False, subone=True)
del active_drop
break
self.change_state(State.CHANNEL_SWITCH)
del (
no_acl,
acl_channels,
new_channels,
to_add_topics,
ordered_channels,
watching_channel,
)
elif self._state is State.CHANNEL_SWITCH:
if self.settings.dump:
self.gui.close()
continue
self.gui.status.update(_("gui", "status", "switching"))
# Change into the selected channel, stay in the watching channel,
# or select a new channel that meets the required conditions
new_watching = None
selected_channel = self.gui.channels.get_selection()
if selected_channel is not None and self.can_watch(selected_channel):
# selected channel is checked first, and set as long as we can watch it
new_watching = selected_channel
else:
# other channels additionally need to have a good reason
# for a switch (including the watching one)
# NOTE: we need to sort the channels every time because one channel
# can end up streaming any game - channels aren't game-tied
for channel in sorted(channels.values(), key=self.get_priority):
if self.can_watch(channel) and self.should_switch(channel):
new_watching = channel
break
watching_channel = self.watching_channel.get_with_default(None)
if new_watching is not None:
# if we have a better switch target - do so
self.watch(new_watching)
# break the state change chain by clearing the flag
self._state_change.clear()
elif watching_channel is not None:
# otherwise, continue watching what we had before
self.gui.status.update(
_("status", "watching").format(channel=watching_channel.name)
)
# break the state change chain by clearing the flag
self._state_change.clear()
else:
# not watching anything and there isn't anything to watch either
self.print(_("status", "no_channel"))
self.change_state(State.IDLE)
del new_watching, selected_channel, watching_channel
elif self._state is State.EXIT:
self.gui.tray.change_icon("pickaxe")
self.gui.status.update(_("gui", "status", "exiting"))
# we've been requested to exit the application
break
await self._state_change.wait()
async def _watch_sleep(self, delay: float) -> None:
# we use wait_for here to allow an asyncio.sleep-like that can be ended prematurely
self._watching_restart.clear()
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(self._watching_restart.wait(), timeout=delay)
@task_wrapper(critical=True)
async def _watch_loop(self) -> NoReturn:
interval: float = WATCH_INTERVAL.total_seconds()
while True:
channel: Channel = await self.watching_channel.get()
if not channel.online:
# if the channel isn't online anymore, we stop watching it
self.stop_watching()
continue
succeeded: bool = await channel.send_watch()
if not succeeded:
logger.log(CALL, f"Watch requested failed for channel: {channel.name}")
elif not self.gui.progress.is_counting():
# If the previous update was more than 60s ago, and the progress tracker
# isn't counting down anymore, that means Twitch has temporarily
# stopped reporting drops progress. To ensure the timer keeps at least somewhat
# accurate time, we can use GQL to query for the current drop,
# or even "pretend" mining as a last resort option.
handled: bool = False
# Solution 1: use GQL to query for the currently mined drop status
try:
context = await self.gql_request(
GQL_OPERATIONS["CurrentDrop"].with_variables(
{"channelID": str(channel.id)}
)
)
drop_data: JsonType | None = (
context["data"]["currentUser"]["dropCurrentSession"]
)
except GQLException:
drop_data = None
if drop_data is not None:
drop = self._drops.get(drop_data["dropID"])
if drop is not None and drop.can_earn(channel):
drop.update_minutes(drop_data["currentMinutesWatched"])
drop_text = (
f"{drop.name} ({drop.campaign.game}, "
f"{drop.current_minutes}/{drop.required_minutes})"
)
logger.log(CALL, f"Drop progress from GQL: {drop_text}")
handled = True
# Solution 2: If GQL fails, figure out which drop we're most likely mining
# right now, and then bump up the minutes on that drop
if not handled:
if (drop := self.get_active_drop(channel)) is not None:
drop.bump_minutes()
drop_text = (
f"{drop.name} ({drop.campaign.game}, "
f"{drop.current_minutes}/{drop.required_minutes})"
)
logger.log(CALL, f"Drop progress from active search: {drop_text}")
handled = True
else:
logger.log(CALL, "No active drop could be determined")
await self._watch_sleep(interval)
@task_wrapper(critical=True)
async def _maintenance_task(self) -> None:
claim_period = timedelta(minutes=30)
max_period = timedelta(hours=1)
now = datetime.now(timezone.utc)
next_period = now + max_period
while True:
# exit if there's no need to repeat the loop
now = datetime.now(timezone.utc)
if now >= next_period:
break
next_trigger = min(now + claim_period, next_period)
trigger_cleanup = False
while self._mnt_triggers and self._mnt_triggers[0] <= next_trigger:
next_trigger = self._mnt_triggers.popleft()
trigger_cleanup = True
if next_trigger == next_period:
trigger_type: str = "Reload"
elif trigger_cleanup:
trigger_type = "Cleanup"
else:
trigger_type = "Points"
logger.log(
CALL,
(
"Maintenance task waiting until: "
f"{next_trigger.astimezone().strftime('%X')} ({trigger_type})"
)
)
await asyncio.sleep((next_trigger - now).total_seconds())
# exit after waiting, before the actions
now = datetime.now(timezone.utc)
if now >= next_period:
break
if trigger_cleanup:
logger.log(CALL, "Maintenance task requests channels cleanup")
self.change_state(State.CHANNELS_CLEANUP)
# ensure that we don't have unclaimed points bonus
watching_channel = self.watching_channel.get_with_default(None)
if watching_channel is not None:
try:
await watching_channel.claim_bonus()
except Exception:
pass # we intentionally silently skip anything else
# this triggers a restart of this task every (up to) 60 minutes
logger.log(CALL, "Maintenance task requests a reload")
self.change_state(State.INVENTORY_FETCH)
def can_watch(self, channel: Channel) -> bool:
"""
Determines if the given channel qualifies as a watching candidate.
"""
if not self.wanted_games:
return False
# exit early if
if (
not channel.online # stream is offline
or not channel.drops_enabled # drops aren't enabled
# there's no game or it's not one of the games we've selected
or (game := channel.game) is None or game not in self.wanted_games
):
return False
# check if we can progress any campaign for the played game
for campaign in self.inventory:
if campaign.game == game and campaign.can_earn(channel):
return True
return False
def should_switch(self, channel: Channel) -> bool:
"""
Determines if the given channel qualifies as a switch candidate.
"""
watching_channel = self.watching_channel.get_with_default(None)
if watching_channel is None:
return True
channel_order = self.get_priority(channel)
watching_order = self.get_priority(watching_channel)
return (
# this channel's game is higher order than the watching one's
channel_order < watching_order
or channel_order == watching_order # or the order is the same
# and this channel is ACL-based and the watching channel isn't
and channel.acl_based > watching_channel.acl_based
)
def watch(self, channel: Channel, *, update_status: bool = True):
self.gui.tray.change_icon("active")
self.gui.channels.set_watching(channel)
self.watching_channel.set(channel)
if update_status:
status_text = _("status", "watching").format(channel=channel.name)
self.print(status_text)
self.gui.status.update(status_text)
def stop_watching(self):
self.gui.clear_drop()
self.watching_channel.clear()
self.gui.channels.clear_watching()
def restart_watching(self):
self.gui.progress.stop_timer()
self._watching_restart.set()
@task_wrapper
async def process_stream_state(self, channel_id: int, message: JsonType):
msg_type = message["type"]
channel = self.channels.get(channel_id)
if channel is None:
logger.error(f"Stream state change for a non-existing channel: {channel_id}")
return
if msg_type == "viewcount":
if not channel.online:
# if it's not online for some reason, set it so
channel.check_online()
else:
viewers = message["viewers"]
channel.viewers = viewers
channel.display()
# logger.debug(f"{channel.name} viewers: {viewers}")
elif msg_type == "stream-down":
channel.set_offline()
elif msg_type == "stream-up":
channel.check_online()
elif msg_type == "commercial":
# skip these
pass
else:
logger.warning(f"Unknown stream state: {msg_type}")
@task_wrapper
async def process_stream_update(self, channel_id: int, message: JsonType):
# message = {
# "channel_id": "12345678",
# "type": "broadcast_settings_update",
# "channel": "channel._login",
# "old_status": "Old title",
# "status": "New title",
# "old_game": "Old game name",
# "game": "New game name",
# "old_game_id": 123456,
# "game_id": 123456
# }
channel = self.channels.get(channel_id)
if channel is None:
logger.error(f"Broadcast settings update for a non-existing channel: {channel_id}")
return
if message["old_game"] != message["game"]:
game_change = f", game changed: {message['old_game']} -> {message['game']}"
else:
game_change = ''
logger.log(CALL, f"Channel update from websocket: {channel.name}{game_change}")
# There's no information about channel tags here, but this event is triggered
# when the tags change. We can use this to just update the stream data after the change.
# Use 'set_online' to introduce a delay, allowing for multiple title and tags
# changes before we update. This eventually calls 'on_channel_update' below.
channel.check_online()
def on_channel_update(
self, channel: Channel, stream_before: Stream | None, stream_after: Stream | None
):
"""
Called by a Channel when it's status is updated (ONLINE, OFFLINE, title/tags change).
NOTE: 'stream_before' gets dealocated once this function finishes.
"""
if stream_before is None:
if stream_after is not None:
# Channel going ONLINE
if (
self.can_watch(channel) # we can watch the channel
and self.should_switch(channel) # and we should!
):
self.print(_("status", "goes_online").format(channel=channel.name))
self.watch(channel)
else:
logger.info(f"{channel.name} goes ONLINE")
else:
# Channel was OFFLINE and stays that way
logger.log(CALL, f"{channel.name} stays OFFLINE")
else:
watching_channel = self.watching_channel.get_with_default(None)
if (
watching_channel is not None
and watching_channel == channel # the watching channel was the one updated
and not self.can_watch(channel) # we can't watch it anymore
):
# NOTE: In these cases, channel was the watching channel
if stream_after is None:
# Channel going OFFLINE
self.print(_("status", "goes_offline").format(channel=channel.name))
else:
# Channel stays ONLINE, but we can't watch it anymore
logger.info(
f"{channel.name} status has been updated, switching... "
f"(🎁: {stream_before.drops_enabled and '' or ''} -> "
f"{stream_after.drops_enabled and '' or ''})"
)
self.change_state(State.CHANNEL_SWITCH)
# NOTE: In these cases, it wasn't the watching channel
elif stream_after is None:
logger.info(f"{channel.name} goes OFFLINE")
else:
# Channel is and stays ONLINE, but has been updated
logger.info(
f"{channel.name} status has been updated "
f"(🎁: {stream_before.drops_enabled and '' or ''} -> "
f"{stream_after.drops_enabled and '' or ''})"
)
channel.display()
@task_wrapper
async def process_drops(self, user_id: int, message: JsonType):
# Message examples:
# {"type": "drop-progress", data: {"current_progress_min": 3, "required_progress_min": 10}}
# {"type": "drop-claim", data: {"drop_instance_id": ...}}
msg_type: str = message["type"]
if msg_type not in ("drop-progress", "drop-claim"):
return
drop_id: str = message["data"]["drop_id"]
drop: TimedDrop | None = self._drops.get(drop_id)
watching_channel: Channel | None = self.watching_channel.get_with_default(None)
if msg_type == "drop-claim":
if drop is None:
logger.error(
f"Received a drop claim ID for a non-existing drop: {drop_id}\n"
f"Drop claim ID: {message['data']['drop_instance_id']}"
)
return
drop.update_claim(message["data"]["drop_instance_id"])
campaign = drop.campaign
mined = await drop.claim()
drop.display()
if mined:
claim_text = (
f"{campaign.game.name}\n"
f"{drop.rewards_text()} ({campaign.claimed_drops}/{campaign.total_drops})"
)
# two different claim texts, becase a new line after the game name
# looks ugly in the output window - replace it with a space
self.print(_("status", "claimed_drop").format(drop=claim_text.replace('\n', ' ')))
self.gui.tray.notify(claim_text, _("gui", "tray", "notification_title"))
else:
logger.error(f"Drop claim failed! Drop ID: {drop_id}")
# About 4-20s after claiming the drop, next drop can be started
# by re-sending the watch payload. We can test for it by fetching the current drop
# via GQL, and then comparing drop IDs.
await asyncio.sleep(4)
if watching_channel is not None:
for attempt in range(8):
context = await self.gql_request(
GQL_OPERATIONS["CurrentDrop"].with_variables(
{"channelID": str(watching_channel.id)}
)
)
drop_data: JsonType | None = (
context["data"]["currentUser"]["dropCurrentSession"]
)
if drop_data is None or drop_data["dropID"] != drop.id:
break
await asyncio.sleep(2)
if campaign.can_earn(watching_channel):
self.restart_watching()
else:
self.change_state(State.INVENTORY_FETCH)
return
assert msg_type == "drop-progress"
if drop is not None:
drop_text = (
f"{drop.name} ({drop.campaign.game}, "
f"{message['data']['current_progress_min']}/"
f"{message['data']['required_progress_min']})"
)
else:
drop_text = "<Unknown>"
logger.log(CALL, f"Drop update from websocket: {drop_text}")
if drop is not None and drop.can_earn(self.watching_channel.get_with_default(None)):
# the received payload is for the drop we expected
drop.update_minutes(message["data"]["current_progress_min"])
@task_wrapper
async def process_notifications(self, user_id: int, message: JsonType):
if message["type"] == "create-notification":
data: JsonType = message["data"]["notification"]
if data["type"] == "user_drop_reward_reminder_notification":
self.change_state(State.INVENTORY_FETCH)
await self.gql_request(
GQL_OPERATIONS["NotificationsDelete"].with_variables(
{"input": {"id": data["id"]}}
)
)
@task_wrapper
async def process_points(self, user_id: int, message: JsonType):
# Example payloads:
# {
# "type": "points-earned",
# "data": {
# "timestamp": "YYYY-MM-DDTHH:MM:SS.UUUUUUUUUZ",
# "channel_id": "123456789",
# "point_gain": {
# "user_id": "12345678",
# "channel_id": "123456789",
# "total_points": 10,
# "baseline_points": 10,
# "reason_code": "WATCH",
# "multipliers": []
# },
# "balance": {
# "user_id": "12345678",
# "channel_id": "123456789",
# "balance": 12345
# }
# }
# }
# {
# "type": "claim-available",
# "data": {
# "timestamp":"YYYY-MM-DDTHH:MM:SS.UUUUUUUUUZ",
# "claim": {
# "id": "4ae6fefd-1234-40ae-ad3d-92254c576a91",
# "user_id": "12345678",
# "channel_id": "123456789",
# "point_gain": {
# "user_id": "12345678",
# "channel_id": "123456789",
# "total_points": 50,
# "baseline_points": 50,
# "reason_code": "CLAIM",
# "multipliers": []
# },
# "created_at": "YYYY-MM-DDTHH:MM:SSZ"
# }
# }
# }
msg_type = message["type"]
if msg_type == "points-earned":
data: JsonType = message["data"]
channel: Channel | None = self.channels.get(int(data["channel_id"]))
points: int = data["point_gain"]["total_points"]
balance: int = data["balance"]["balance"]
if channel is not None:
channel.points = balance
channel.display()
self.print(_("status", "earned_points").format(points=f"{points:3}", balance=balance))
elif msg_type == "claim-available":
claim_data = message["data"]["claim"]
points = claim_data["point_gain"]["total_points"]
await self.claim_points(claim_data["channel_id"], claim_data["id"])
self.print(_("status", "claimed_points").format(points=points))
async def get_auth(self) -> _AuthState:
await self._auth_state.validate()
return self._auth_state
@asynccontextmanager
async def request(
self, method: str, url: URL | str, *, invalidate_after: datetime | None = None, **kwargs
) -> abc.AsyncIterator[aiohttp.ClientResponse]:
session = await self.get_session()
method = method.upper()
if self.settings.proxy and "proxy" not in kwargs:
kwargs["proxy"] = self.settings.proxy
logger.debug(f"Request: ({method=}, {url=}, {kwargs=})")
session_timeout = timedelta(seconds=session.timeout.total or 0)
backoff = ExponentialBackoff(maximum=3*60)
for delay in backoff:
if self.gui.close_requested:
raise ExitRequest()
elif (
invalidate_after is not None
# account for the expiration landing during the request
and datetime.now(timezone.utc) >= (invalidate_after - session_timeout)
):
raise RequestInvalid()
try:
response: aiohttp.ClientResponse | None = None
response = await self.gui.coro_unless_closed(
session.request(method, url, **kwargs)
)
assert response is not None
logger.debug(f"Response: {response.status}: {response}")
if response.status < 500:
# pre-read the response to avoid getting errors outside of the context manager
raw_response = await response.read() # noqa
yield response
return
self.print(_("error", "site_down").format(seconds=round(delay)))
except aiohttp.ClientConnectorCertificateError:
# for a case where SSL verification fails
raise
except (
aiohttp.ClientConnectionError, asyncio.TimeoutError, aiohttp.ClientPayloadError
):
# connection problems, retry
if backoff.steps > 1:
# just so that quick retries that sometimes happen, aren't shown
self.print(_("error", "no_connection").format(seconds=round(delay)))
finally:
if response is not None:
response.release()
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(self.gui.wait_until_closed(), timeout=delay)
@overload
async def gql_request(self, ops: GQLOperation) -> JsonType:
...
@overload
async def gql_request(self, ops: list[GQLOperation]) -> list[JsonType]:
...
async def gql_request(
self, ops: GQLOperation | list[GQLOperation]
) -> JsonType | list[JsonType]:
gql_logger.debug(f"GQL Request: {ops}")
backoff = ExponentialBackoff(maximum=60)
for delay in backoff:
try:
auth_state = await self.get_auth()
async with self.request(
"POST",
"https://gql.twitch.tv/gql",
json=ops,
headers=auth_state.headers(user_agent=self._client_type.USER_AGENT, gql=True),
invalidate_after=None, # unused
) as response:
response_json: JsonType | list[JsonType] = await response.json()
except RequestInvalid:
continue
gql_logger.debug(f"GQL Response: {response_json}")
orig_response = response_json
if isinstance(response_json, list):
response_list = response_json
else:
response_list = [response_json]
force_retry: bool = False
for response_json in response_list:
# GQL errors handling
if "errors" in response_json:
for error_dict in response_json["errors"]:
if (
"message" in error_dict
and error_dict["message"] in (
# "server error",
# "service error",
"service unavailable",
"service timeout",
"context deadline exceeded",
)
):
force_retry = True
break
else:
raise GQLException(response_json['errors'])
# Other error handling
elif "error" in response_json:
raise GQLException(
f"{response_json['error']}: {response_json['message']}"
)
if force_retry:
break
else:
return orig_response
await asyncio.sleep(delay)
raise GQLException("Retry loop was broken")
def _merge_data(self, primary_data: JsonType, secondary_data: JsonType) -> JsonType:
merged = {}
for key in set(chain(primary_data.keys(), secondary_data.keys())):
in_primary = key in primary_data
if in_primary and key in secondary_data:
vp = primary_data[key]
vs = secondary_data[key]
if not isinstance(vp, type(vs)) or not isinstance(vs, type(vp)):
raise MinerException("Inconsistent merge data")
if isinstance(vp, dict): # both are dicts
merged[key] = self._merge_data(vp, vs)
else:
# use primary value
merged[key] = vp
elif in_primary:
merged[key] = primary_data[key]
else: # in campaigns only
merged[key] = secondary_data[key]
return merged
async def fetch_campaigns(
self, campaigns_chunk: list[tuple[str, JsonType]]
) -> dict[str, JsonType]:
campaign_ids: dict[str, JsonType] = dict(campaigns_chunk)
auth_state = await self.get_auth()
response_list: list[JsonType] = await self.gql_request(
[
GQL_OPERATIONS["CampaignDetails"].with_variables(
{"channelLogin": str(auth_state.user_id), "dropID": cid}
)
for cid in campaign_ids
]
)
fetched_data: dict[str, JsonType] = {
(campaign_data := response_json["data"]["user"]["dropCampaign"])["id"]: campaign_data
for response_json in response_list
}
return self._merge_data(campaign_ids, fetched_data)
async def fetch_inventory(self) -> None:
status_update = self.gui.status.update
status_update(_("gui", "status", "fetching_inventory"))
# fetch in-progress campaigns (inventory)
response = await self.gql_request(GQL_OPERATIONS["Inventory"])
inventory: JsonType = response["data"]["currentUser"]["inventory"]
ongoing_campaigns: list[JsonType] = inventory["dropCampaignsInProgress"] or []
# this contains claimed benefit edge IDs, not drop IDs
claimed_benefits: dict[str, datetime] = {
b["id"]: timestamp(b["lastAwardedAt"]) for b in inventory["gameEventDrops"]
}
inventory_data: dict[str, JsonType] = {c["id"]: c for c in ongoing_campaigns}
# fetch general available campaigns data (campaigns)
response = await self.gql_request(GQL_OPERATIONS["Campaigns"])
available_list: list[JsonType] = response["data"]["currentUser"]["dropCampaigns"] or []
applicable_statuses = ("ACTIVE", "UPCOMING")
available_campaigns: dict[str, JsonType] = {
c["id"]: c
for c in available_list
if c["status"] in applicable_statuses # that are currently not expired
}
# fetch detailed data for each campaign, in chunks
# specifically use an intermediate list per a Python bug
# https://github.com/python/cpython/issues/88342
status_update(_("gui", "status", "fetching_campaigns"))
for chunk_coro in asyncio.as_completed(
[
self.fetch_campaigns(campaigns_chunk)
for campaigns_chunk in chunk(available_campaigns.items(), 20)
]
):
chunk_campaigns_data = await chunk_coro
# merge the inventory and campaigns datas together
inventory_data = self._merge_data(inventory_data, chunk_campaigns_data)
if self.settings.dump:
# dump the campaigns data to the dump file
with open(DUMP_PATH, 'a', encoding="utf8") as file:
# we need to pre-process the inventory dump a little
dump_data: JsonType = deepcopy(inventory_data)
for campaign_data in dump_data.values():
# replace ACL lists with a simple text description
if (
campaign_data["allow"]
and campaign_data["allow"].get("isEnabled", True)
and campaign_data["allow"]["channels"]
):
# simply count the channels included in the ACL
campaign_data["allow"]["channels"] = (
f"{len(campaign_data['allow']['channels'])} channels"
)
# replace drop instance IDs, so they don't include user IDs
for drop_data in campaign_data["timeBasedDrops"]:
if "self" in drop_data and drop_data["self"]["dropInstanceID"]:
drop_data["self"]["dropInstanceID"] = "..."
json.dump(dump_data, file, indent=4, sort_keys=True)
file.write("\n\n") # add 2x new line spacer
json.dump(claimed_benefits, file, indent=4, sort_keys=True, default=str)
# use the merged data to create campaign objects
campaigns: list[DropsCampaign] = [
DropsCampaign(self, campaign_data, claimed_benefits)
for campaign_data in inventory_data.values()
]
campaigns.sort(key=lambda c: c.active, reverse=True)
campaigns.sort(key=lambda c: c.upcoming and c.starts_at or c.ends_at)
campaigns.sort(key=lambda c: c.linked, reverse=True)
self._drops.clear()
self.gui.inv.clear()
self.inventory.clear()
self._mnt_triggers.clear()
switch_triggers: set[datetime] = set()
next_hour = datetime.now(timezone.utc) + timedelta(hours=1)
# add the campaigns to the internal inventory
for campaign in campaigns:
self._drops.update({drop.id: drop for drop in campaign.drops})
if campaign.can_earn_within(next_hour):
switch_triggers.update(campaign.time_triggers)
self.inventory.append(campaign)
# concurrently add the campaigns into the GUI
# NOTE: this fetches pictures from the CDN, so might be slow without a cache
status_update(
_("gui", "status", "adding_campaigns").format(counter=f"(0/{len(campaigns)})")
)
for i, coro in enumerate(
asyncio.as_completed([
asyncio.create_task(self.gui.inv.add_campaign(campaign))
for campaign in campaigns
]),
start=1,
):
await coro
status_update(
_("gui", "status", "adding_campaigns").format(counter=f"({i}/{len(campaigns)})")
)
# this is needed here explicitly, because cache reads from disk don't raise this
if self.gui.close_requested:
raise ExitRequest()
self._mnt_triggers.extend(sorted(switch_triggers))
# trim out all triggers that we're already past
now = datetime.now(timezone.utc)
while self._mnt_triggers and self._mnt_triggers[0] <= now:
self._mnt_triggers.popleft()
# NOTE: maintenance task is restarted at the end of each inventory fetch
if self._mnt_task is not None and not self._mnt_task.done():
self._mnt_task.cancel()
self._mnt_task = asyncio.create_task(self._maintenance_task())
def get_active_drop(self, channel: Channel | None = None) -> TimedDrop | None:
if not self.wanted_games:
return None
watching_channel = self.watching_channel.get_with_default(channel)
if watching_channel is None:
# if we aren't watching anything, we can't earn any drops
return None
watching_game: Game | None = watching_channel.game
if watching_game is None:
# if the channel isn't playing anything in particular, we can't determine the drop
return None
drops: list[TimedDrop] = []
for campaign in self.inventory:
if (
campaign.game == watching_game # campaign's game matches watching game
and campaign.can_earn(watching_channel) # can be earned on this channel
):
# add only the drops we can actually earn
drops.extend(drop for drop in campaign.drops if drop.can_earn(watching_channel))
if drops:
drops.sort(key=lambda d: d.remaining_minutes)
return drops[0]
return None
async def get_live_streams(
self, game: Game, *, limit: int = 20, drops_enabled: bool = True
) -> list[Channel]:
filters: list[str] = []
if drops_enabled:
filters.append("DROPS_ENABLED")
try:
response = await self.gql_request(
GQL_OPERATIONS["GameDirectory"].with_variables({
"limit": limit,
"slug": game.slug,
"options": {
"includeRestricted": ["SUB_ONLY_LIVE"],
"systemFilters": filters,
},
})
)
except GQLException as exc:
raise MinerException(f"Game: {game.slug}") from exc
if "game" in response["data"]:
return [
Channel.from_directory(
self, stream_channel_data["node"], drops_enabled=drops_enabled
)
for stream_channel_data in response["data"]["game"]["streams"]["edges"]
]
return []
async def claim_points(self, channel_id: str | int, claim_id: str) -> None:
await self.gql_request(
GQL_OPERATIONS["ClaimCommunityPoints"].with_variables(
{"input": {"channelID": str(channel_id), "claimID": claim_id}}
)
)