# File: TwitchDropsMiner/twitch.py
# (file-viewer metadata captured with this listing: 752 lines, 32 KiB, Python)

from __future__ import annotations
import os
import asyncio
import logging
import traceback
from yarl import URL
from time import time
from itertools import chain
from functools import partial
from typing import (
Callable, Iterable, Optional, Union, List, Dict, Generic, TypeVar, cast, TYPE_CHECKING
)
try:
import aiohttp
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install aiohttp' first") from exc
from channel import Channel
from gui import GUIManager, LoginData
from websocket import WebsocketPool, task_wrapper
from inventory import DropsCampaign, Game, TimedDrop
from exceptions import LoginException, CaptchaRequired
from constants import (
State,
JsonType,
WebsocketTopic,
CLIENT_ID,
USER_AGENT,
COOKIES_PATH,
AUTH_URL,
GQL_URL,
WATCH_INTERVAL,
GQL_OPERATIONS,
DROPS_ENABLED_TAG,
GQLOperation,
)
if TYPE_CHECKING:
from main import ParsedArgs
# General-purpose logger used throughout this module
logger = logging.getLogger("TwitchDrops")
# Child logger used only for dumping GQL requests and responses (see Twitch.gql_request)
gql_logger = logging.getLogger("TwitchDrops.gql")
_V = TypeVar("_V")
_D = TypeVar("_D")
class _AwaitableValue(Generic[_V]):
def __init__(self):
self._value: _V
self._event = asyncio.Event()
def has_value(self) -> bool:
return self._event.is_set()
def get_with_default(self, default: _D) -> Union[_D, _V]:
if self._event.is_set():
return self._value
return default
async def get(self) -> _V:
await self._event.wait()
return self._value
def set(self, value: _V) -> None:
self._value = value
self._event.set()
def clear(self) -> None:
self._event.clear()
class Twitch:
    """
    The client itself: owns the GUI manager, the HTTP session and login state,
    the drops inventory, the tracked channels, the websocket pool, and the
    state machine (`_run`) plus the watch loop (`_watch_loop`) that drive mining.
    """

    def __init__(self, options: ParsedArgs):
        self._options = options
        # GUI
        self.gui = GUIManager(self)
        # Cookies, session and auth
        cookie_jar = aiohttp.CookieJar()
        if os.path.isfile(COOKIES_PATH):
            cookie_jar.load(COOKIES_PATH)
        self._session = aiohttp.ClientSession(
            cookie_jar=cookie_jar,
            headers={"User-Agent": USER_AGENT},
            timeout=aiohttp.ClientTimeout(connect=5, total=10),
        )
        self._access_token: Optional[str] = None
        self._user_id: Optional[int] = None
        self._is_logged_in = asyncio.Event()
        # State management
        self._state: State = State.INVENTORY_FETCH
        self._state_change = asyncio.Event()
        self.inventory: List[DropsCampaign] = []  # inventory
        # Storing and watching channels
        self.channels: Dict[int, Channel] = {}
        self._watching_channel: _AwaitableValue[Channel] = _AwaitableValue()
        self._watching_task: Optional[asyncio.Task[None]] = None
        self._watching_restart = asyncio.Event()
        # Set by the watch loop while it waits for a websocket progress update
        self._drop_update: Optional[asyncio.Future[bool]] = None
        # Websocket
        self.websocket = WebsocketPool(self)
        # Runner task
        self._main_task: Optional[asyncio.Task[None]] = None

    def wait_until_login(self):
        """
        Return an awaitable that completes once `check_login` has succeeded.
        """
        return self._is_logged_in.wait()

    def change_state(self, state: State) -> None:
        """
        Switch the state machine in `_run` to `state` and wake it up.
        """
        self._state = state
        self._state_change.set()

    def state_change(self, state: State) -> Callable[[], None]:
        # this is identical to change_state, but defers the call
        # perfect for GUI usage
        return partial(self.change_state, state)

    def request_close(self):
        """
        Called when the application is requested to close,
        usually by the console or application window being closed.
        """
        self.stop()

    def start(self):
        """
        Run the client on the current event loop until it finishes or is stopped.

        This call blocks. Whatever the outcome, the client is closed afterwards,
        the GUI is waited on until it's closed, and the loop is closed last.
        """
        self._loop = loop = asyncio.get_event_loop()
        self._main_task = loop.create_task(self._run())
        try:
            loop.run_until_complete(self._main_task)
        except asyncio.CancelledError:
            # happens when the user requests close
            pass
        except KeyboardInterrupt:
            # KeyboardInterrupt causes run_until_complete to exit, but without cancelling
            # the task. The loop stops and thus the task gets frozen, until the loop runs
            # again. Because we don't want anything from there to actually run during
            # cleanup, we need to explicitly cancel the task ourselves here.
            self.stop()
        except CaptchaRequired:
            self.gui.prevent_close()
            self.gui.print(
                "Your login attempt was denied by CAPTCHA.\nPlease try again in +12 hours."
            )
        except Exception:
            self.gui.prevent_close()
            self.gui.print("Fatal error encountered:\n")
            self.gui.print(traceback.format_exc())
        finally:
            loop.run_until_complete(self.close())
            loop.run_until_complete(loop.shutdown_asyncgens())
            if not self.gui.close_requested:
                self.gui.print(
                    "\nApplication Terminated.\nClose the window to exit the application."
                )
            # NOTE(review): presumably returns right away if close was already requested
            loop.run_until_complete(self.gui.wait_until_closed())
            self.gui.stop()
            loop.close()

    def stop(self):
        """
        Cancel the main task, if there is one. Safe to call more than once.
        """
        if self._main_task is not None:
            self._main_task.cancel()
            self._main_task = None

    async def close(self):
        """
        Stop watching, cancel the watch loop, save cookies, then close
        the HTTP session and the websocket pool.
        """
        start_time = time()
        self.gui.print("Exiting...")
        self.stop_watching()
        if self._watching_task is not None:
            self._watching_task.cancel()
            self._watching_task = None
        self._session.cookie_jar.save(COOKIES_PATH)  # type: ignore
        await self._session.close()
        await self.websocket.stop()
        # wait at least one full second + whatever it takes to complete the closing
        # this allows aiohttp to safely close the session
        # (clamped, so a slow shutdown never asks for a negative delay)
        await asyncio.sleep(max(0.0, start_time + 1 - time()))

    def is_watching(self, channel: Channel) -> bool:
        """
        Tell whether `channel` is the channel currently being watched.
        """
        watching_channel = self._watching_channel.get_with_default(None)
        return watching_channel is not None and watching_channel == channel

    async def _run(self):
        """
        Main method that runs the whole client.
        Here, we manage several things, specifically:
        • Fetching the drops inventory to make sure that everything we can claim, is claimed
        • Selecting a stream to watch, and watching it
        • Changing the stream that's being watched if necessary
        """
        self.gui.start()
        self._watching_task = asyncio.create_task(self._watch_loop())
        await self.check_login()
        # Add default topics
        assert self._user_id is not None
        self.websocket.add_topics([
            WebsocketTopic("User", "Drops", self._user_id, self.process_drops),
            WebsocketTopic("User", "CommunityPoints", self._user_id, self.process_points),
        ])
        games: List[Game] = []
        selected_game: Optional[Game] = None
        self.change_state(State.INVENTORY_FETCH)
        # Each pass handles the current state, then waits on the state change flag.
        # Most states set the flag again themselves, which chains them together.
        while True:
            if self._state is State.INVENTORY_FETCH:
                await self.fetch_inventory()
                self.change_state(State.GAMES_UPDATE)
            elif self._state is State.GAMES_UPDATE:
                # Figure out which games to watch, and claim the drops we can
                games.clear()
                for campaign in self.inventory:
                    # we have no use in processing upcoming campaigns here
                    if not campaign.upcoming:
                        for drop in campaign.timed_drops.values():
                            if drop.can_claim:
                                await drop.claim()
                            if drop.can_earn and (game := campaign.game) not in games:
                                games.append(game)
                self.change_state(State.GAME_SELECT)
            elif self._state is State.GAME_SELECT:
                # 'games' has all games we want to mine drops for
                # if it's empty, there's no point in continuing
                if not games:
                    self.gui.print("No active campaigns to mine drops for.")
                    return
                # only start the websocket after we confirm there are drops to mine
                await self.websocket.start()
                self.gui.games.set_games(games)
                selected_game = self.gui.games.get_selection()
                # pre-display the active drop without a countdown
                active_drop = self.get_active_drop(selected_game)
                if active_drop is not None:
                    active_drop.display(countdown=False)
                self.change_state(State.CHANNELS_CLEANUP)
            elif self._state is State.CHANNELS_CLEANUP:
                # remove all channels that are offline,
                # or aren't streaming the game we want anymore
                to_remove: List[Channel] = [
                    channel for channel in self.channels.values()
                    if not (channel.online or channel.pending_online)
                    or channel.game is None or channel.game != selected_game
                ]
                self.websocket.remove_topics(
                    WebsocketTopic.as_str("Channel", "VideoPlayback", channel.id)
                    for channel in to_remove
                )
                for channel in to_remove:
                    if self.is_watching(channel):
                        # we're removing a channel we're watching
                        self.stop_watching()
                    del self.channels[channel.id]
                    channel.remove()
                self.gui.channels.shrink()
                self.change_state(State.CHANNELS_FETCH)
            elif self._state is State.CHANNELS_FETCH:
                if selected_game is None:
                    self.change_state(State.GAME_SELECT)
                else:
                    # get a list of all live channels with drops enabled
                    live_streams: List[Channel] = await self.get_live_streams(
                        selected_game, [DROPS_ENABLED_TAG]
                    )
                    # add them, filtering out ones we already have
                    for channel in live_streams:
                        if channel.id not in self.channels:
                            self.channels[channel.id] = channel
                            channel.display()
                    # load points
                    # asyncio.gather(*(channel.claim_bonus() for channel in live_streams))
                    # Sub to these channel updates
                    topics: List[WebsocketTopic] = [
                        WebsocketTopic(
                            "Channel", "VideoPlayback", channel_id, self.process_stream_state
                        )
                        for channel_id in self.channels
                    ]
                    self.websocket.add_topics(topics)
                    self.change_state(State.CHANNEL_SWITCH)
            elif self._state is State.CHANNEL_SWITCH:
                if selected_game is None:
                    self.change_state(State.GAME_SELECT)
                else:
                    # Change into the selected channel, stay in the watching channel,
                    # or select a new channel that meets the required conditions
                    channels: Iterable[Channel]
                    priority_channels: List[Channel] = []
                    selected_channel = self.gui.channels.get_selection()
                    if selected_channel is not None:
                        self.gui.channels.clear_selection()
                        priority_channels.append(selected_channel)
                    watching_channel = self._watching_channel.get_with_default(None)
                    if watching_channel is not None:
                        priority_channels.append(watching_channel)
                    channels = chain(priority_channels, self.channels.values())
                    # If there's no selected channel, change into a channel we can watch
                    for channel in channels:
                        if (
                            channel.online  # stream online
                            and channel.drops_enabled  # drops are enabled
                            and channel.game == selected_game  # it's a game we've selected
                        ):
                            self.watch(channel)
                            # break the state change chain by clearing the flag
                            self._state_change.clear()
                            break
                    else:
                        # nothing watchable for this game - move on to the next one
                        self.stop_watching()
                        selected_game = self.gui.games.select_next()
                        if selected_game is None:
                            self.gui.print("No suitable channel to watch.")
                            # TODO: Figure out what to do here.
                            return
                        self.change_state(State.CHANNELS_CLEANUP)
            await self._state_change.wait()

    async def _watch_loop(self) -> None:
        """
        Background task that keeps sending the watch payload to the watched channel.

        After each payload, it waits up to 10s for `process_drops` to report a progress
        update. If that doesn't happen, the progress is taken from GQL, or estimated
        from the locally cached inventory. It then sleeps until the next interval,
        unless `restart_watching` interrupts the sleep.
        """
        interval = WATCH_INTERVAL.total_seconds()
        i = 1
        while True:
            channel = await self._watching_channel.get()
            await channel.send_watch()
            last_watch = time()
            self._drop_update = asyncio.Future()
            use_active = False
            try:
                handled = await asyncio.wait_for(self._drop_update, timeout=10)
            except asyncio.TimeoutError:
                # there was no websocket update within 10s
                handled = False
                use_active = True
            self._drop_update = None
            if not handled:
                # websocket update timed out, or the update was for an unrelated drop
                if not use_active:
                    # we need to use GQL to get the current progress
                    context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"])
                    drop_data: Optional[JsonType] = (
                        context["data"]["currentUser"]["dropCurrentSession"]
                    )
                    if drop_data is None:
                        # no session data came back - avoid crashing the loop on it,
                        # and fall back to the cached inventory instead
                        use_active = True
                    else:
                        drop_id = drop_data["dropID"]
                        drop = self.get_drop(drop_id)
                        if drop is None:
                            use_active = True
                            logger.error(f"Missing drop: {drop_id}")
                        elif not drop.can_earn:
                            use_active = True
                        else:
                            drop.update_minutes(drop_data["currentMinutesWatched"])
                            drop.display()
                if use_active:
                    # Sometimes, even GQL fails to give us the correct drop.
                    # In that case, we can use the locally cached inventory to try
                    # and put together the drop that we're actually mining right now
                    selected_game = self.gui.games.get_selection()
                    drop = self.get_active_drop(selected_game)
                    if drop is not None:
                        drop.bump_minutes()
                        drop.display()
                    else:
                        logger.error("Active drop search failed")
            # NOTE(review): the "minutes" below assume WATCH_INTERVAL is one minute - confirm
            if i % 30 == 1:
                # ensure every 30 minutes that we don't have unclaimed points bonus
                await channel.claim_bonus()
            if i % 60 == 0:
                # cleanup channels every hour
                self.change_state(State.CHANNELS_CLEANUP)
            i = (i + 1) % 3600
            self._watching_restart.clear()
            try:
                await asyncio.wait_for(
                    self._watching_restart.wait(), timeout=last_watch + interval - time()
                )
            except asyncio.TimeoutError:
                pass

    def watch(self, channel: Channel):
        """
        Start watching `channel`. Does nothing if it's already being watched.
        """
        if self.is_watching(channel):
            # we're already watching the same channel, so there's no point switching
            return
        self.gui.channels.set_watching(channel)
        self._watching_channel.set(channel)

    def stop_watching(self):
        """
        Stop watching; the watch loop idles until a new channel is set.
        """
        self.gui.progress.stop_timer()
        self.gui.channels.clear_watching()
        self._watching_channel.clear()

    def restart_watching(self, channel: Optional[Channel] = None):
        # this forcibly re-sends the watching payload to the specified or currently watched channel
        if channel is None:
            channel = self._watching_channel.get_with_default(None)
        if channel is not None:
            self.gui.progress.stop_timer()
            self._watching_channel.set(channel)
            self.gui.channels.set_watching(channel)
            # interrupts the sleep at the end of the watch loop iteration
            self._watching_restart.set()

    async def process_stream_state(self, channel_id: int, message: JsonType):
        """
        Websocket handler for the "VideoPlayback" topic of a tracked channel.

        Handles the "stream-down", "stream-up" and "viewcount" message types.
        """
        msg_type = message["type"]
        channel = self.channels.get(channel_id)
        if channel is None:
            logger.error(f"Stream state change for a non-existing channel: {channel_id}")
            return
        if msg_type == "stream-down":
            logger.info(f"{channel.name} goes OFFLINE")
            channel.set_offline()
            if self.is_watching(channel):
                self.gui.print(f"{channel.name} goes OFFLINE, switching...")
                # change the channel if we're currently watching it
                self.change_state(State.CHANNEL_SWITCH)
        elif msg_type == "stream-up":
            logger.info(f"{channel.name} goes ONLINE")
            channel.set_online()
        elif msg_type == "viewcount":
            if not channel.online:
                # if it's not online for some reason, set it so
                channel.set_online()
            else:
                viewers = message["viewers"]
                channel.viewers = viewers
                logger.debug(f"{channel.name} viewers: {viewers}")

    @task_wrapper
    async def process_drops(self, user_id: int, message: JsonType):
        """
        Websocket handler for the user's "Drops" topic.

        A "drop-claim" message claims the drop and then tries to kick off the next one.
        A "drop-progress" message updates the drop and reports back to the watch loop.
        Other message types are ignored.
        """
        # Message examples (fields this method reads):
        # {"type": "drop-progress", data: {"drop_id": ..., "current_progress_min": 3}}
        # {"type": "drop-claim", data: {"drop_id": ..., "drop_instance_id": ...}}
        msg_type: str = message["type"]
        if msg_type not in ("drop-progress", "drop-claim"):
            return
        drop_id: str = message["data"]["drop_id"]
        drop: Optional[TimedDrop] = self.get_drop(drop_id)
        if msg_type == "drop-claim":
            if drop is None:
                logger.error(
                    f"Received a drop claim ID for a non-existing drop: {drop_id}\n"
                    f"Drop claim ID: {message['data']['drop_instance_id']}"
                )
                return
            drop.update_claim(message["data"]["drop_instance_id"])
            campaign = drop.campaign
            mined = await drop.claim()
            if mined:
                claim_text = (
                    f"{drop.rewards_text()} "
                    f"({campaign.claimed_drops}/{campaign.total_drops})"
                )
                self.gui.print(f"Claimed drop: {claim_text}")
                self.gui.tray.notify(claim_text, "Mined Drop")
            else:
                logger.error(f"Drop claim failed! Drop ID: {drop_id}")
            if not mined or campaign.remaining_drops == 0:
                self.change_state(State.GAMES_UPDATE)
                return
            # About 4-20s after claiming the drop, next drop can be started
            # by re-sending the watch payload. We can test for it by fetching the current drop
            # via GQL, and then comparing drop IDs.
            await asyncio.sleep(4)
            for attempt in range(8):
                context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"])
                claim_session: Optional[JsonType] = (
                    context["data"]["currentUser"]["dropCurrentSession"]
                )
                # no session data is treated as "not switched yet", so we just poll again
                if claim_session is not None and claim_session["dropID"] != drop.id:
                    self.restart_watching()
                    break
                await asyncio.sleep(2)
            return
        assert msg_type == "drop-progress"
        if self._drop_update is None or self._drop_update.done():
            # we aren't actually waiting for a progress update right now, so we can just
            # ignore the event this time. A done future means the watch loop's wait
            # has just timed out - setting a result on it would raise.
            return
        elif drop is not None and drop.can_earn:
            drop.update_minutes(message["data"]["current_progress_min"])
            drop.display()
            # Let the watch loop know we've handled it here
            self._drop_update.set_result(True)
        else:
            # Sometimes, the drop update we receive doesn't actually match what we're mining.
            # This is a Twitch bug workaround: signal the watch loop to use GQL
            # to get the current drop progress instead.
            self._drop_update.set_result(False)
        self._drop_update = None

    @task_wrapper
    async def process_points(self, user_id: int, message: JsonType):
        """
        Websocket handler for the user's "CommunityPoints" topic.

        "points-earned" updates the channel's points balance, if we track that channel.
        "claim-available" claims the offered bonus points.
        """
        # Example payloads:
        # {
        #     "type": "points-earned",
        #     "data": {
        #         "timestamp": "YYYY-MM-DDTHH:MM:SS.UUUUUUUUUZ",
        #         "channel_id": "123456789",
        #         "point_gain": {
        #             "user_id": "12345678",
        #             "channel_id": "123456789",
        #             "total_points": 10,
        #             "baseline_points": 10,
        #             "reason_code": "WATCH",
        #             "multipliers": []
        #         },
        #         "balance": {
        #             "user_id": "12345678",
        #             "channel_id": "123456789",
        #             "balance": 12345
        #         }
        #     }
        # }
        # {
        #     "type": "claim-available",
        #     "data": {
        #         "timestamp":"YYYY-MM-DDTHH:MM:SS.UUUUUUUUUZ",
        #         "claim": {
        #             "id": "4ae6fefd-1234-40ae-ad3d-92254c576a91",
        #             "user_id": "12345678",
        #             "channel_id": "123456789",
        #             "point_gain": {
        #                 "user_id": "12345678",
        #                 "channel_id": "123456789",
        #                 "total_points": 50,
        #                 "baseline_points": 50,
        #                 "reason_code": "CLAIM",
        #                 "multipliers": []
        #             },
        #             "created_at": "YYYY-MM-DDTHH:MM:SSZ"
        #         }
        #     }
        # }
        msg_type = message["type"]
        if msg_type == "points-earned":
            data = message["data"]
            channel = self.channels.get(int(data["channel_id"]))
            points = data["point_gain"]["total_points"]
            balance = data["balance"]["balance"]
            if channel is not None:
                channel.points = balance
                self.gui.channels.display(channel)
            self.gui.print(f"Earned points for watching: {points:3}, total: {balance}")
        elif msg_type == "claim-available":
            claim_data = message["data"]["claim"]
            points = claim_data["point_gain"]["total_points"]
            await self.claim_points(claim_data["channel_id"], claim_data["id"])
            self.gui.print(f"Claimed bonus points: {points}")

    async def _validate_password(self, password: str) -> bool:
        """
        Use Twitch's password validator to validate the password length, characters required, etc.
        Helps avoid running into the CAPTCHA if you mistype your password by mistake.
        Valid length: 8-71
        """
        if not 8 <= len(password) <= 71:
            return False
        payload = {"password": password}
        async with self._session.post(
            f"{AUTH_URL}/api/v1/password_strength", json=payload
        ) as response:
            strength_response = await response.json()
        return strength_response["isValid"]

    async def ask_login(self) -> LoginData:
        """
        Keep asking the GUI for login data, until the password passes validation.
        """
        while True:
            data = await self.gui.login.ask_login()
            if await self._validate_password(data.password):
                return data

    async def _login(self) -> str:
        """
        Run the interactive login flow until an access token is granted, and return it.

        Wrong credentials or 2FA codes make the flow ask the user again.
        Raises CaptchaRequired on error code 1000, and LoginException on any
        error code that isn't handled here.
        """
        logger.debug("Login flow started")
        payload: JsonType = {
            "client_id": CLIENT_ID,
            "undelete_user": False,
            "remember_me": True,
        }
        while True:
            username, password, token = await self.ask_login()
            payload["username"] = username
            payload["password"] = password
            # remove stale 2FA tokens, if present
            payload.pop("authy_token", None)
            payload.pop("twitchguard_code", None)
            # the second attempt exists to re-send the payload with the 2FA token added
            for attempt in range(2):
                async with self._session.post(f"{AUTH_URL}/login", json=payload) as response:
                    login_response: JsonType = await response.json()
                # Feed this back in to avoid running into CAPTCHA if possible
                if "captcha_proof" in login_response:
                    payload["captcha"] = {"proof": login_response["captcha_proof"]}
                # Error handling
                if "error_code" in login_response:
                    error_code: int = login_response["error_code"]
                    logger.debug(f"Login error code: {error_code}")
                    if error_code == 1000:
                        # we've failed bois
                        logger.debug("Login failed due to CAPTCHA")
                        raise CaptchaRequired()
                    elif error_code == 3001:
                        # wrong password you dummy
                        logger.debug("Login failed due to incorrect login or pass")
                        self.gui.print("Incorrect username or password.")
                        self.gui.login.clear(password=True)
                        break
                    elif error_code in (
                        3012,  # Invalid authy token
                        3023,  # Invalid email code
                    ):
                        logger.debug("Login failed due to incorrect 2FA code")
                        if error_code == 3023:
                            self.gui.print("Incorrect email code.")
                        else:
                            self.gui.print("Incorrect 2FA code.")
                        self.gui.login.clear(token=True)
                        break
                    elif error_code in (
                        3011,  # Authy token needed
                        3022,  # Email code needed
                    ):
                        # 2FA handling
                        logger.debug("2FA token required")
                        email = error_code == 3022
                        if not token:
                            # user didn't provide a token, so ask them for it
                            if email:
                                self.gui.print("Email code required. Check your email.")
                            else:
                                self.gui.print("2FA token required.")
                            break
                        if email:
                            payload["twitchguard_code"] = token
                        else:
                            payload["authy_token"] = token
                        continue
                    else:
                        raise LoginException(login_response["error"])
                # Success handling
                if "access_token" in login_response:
                    # we're in bois
                    self._access_token = cast(str, login_response["access_token"])
                    logger.debug("Access token granted")
                    self.gui.login.clear()
                    return self._access_token

    async def check_login(self) -> None:
        """
        Make sure we have a valid access token and user ID, logging in if needed.

        The token is taken from the saved cookies if possible, then validated.
        A token rejected with 401 clears the cookies and triggers one more attempt.
        Raises RuntimeError if no attempt ends up with a validated token.
        """
        if self._access_token is not None and self._user_id is not None:
            # we're all good
            return
        # looks like we're missing something
        logger.debug("Checking login")
        self.gui.login.update("Logging in...", None)
        jar = cast(aiohttp.CookieJar, self._session.cookie_jar)
        for attempt in range(2):
            cookie = jar.filter_cookies("https://twitch.tv")  # type: ignore
            if not cookie:
                # no cookie - login
                await self._login()
                # store our auth token inside the cookie
                cookie["auth-token"] = cast(str, self._access_token)
            elif self._access_token is None:
                if "auth-token" in cookie:
                    # have cookie - get our access token
                    self._access_token = cookie["auth-token"].value
                    logger.debug("Restoring session from cookie")
                else:
                    # there are cookies, but none of them holds our token - login
                    await self._login()
                    cookie["auth-token"] = cast(str, self._access_token)
            # validate our access token, by obtaining user_id
            async with self._session.get(
                "https://id.twitch.tv/oauth2/validate",
                headers={"Authorization": f"OAuth {self._access_token}"}
            ) as response:
                status = response.status
                if status == 401:
                    # the access token we have is invalid - clear the cookie and reauth
                    logger.debug("Restored session is invalid")
                    jar.clear_domain("twitch.tv")
                    continue
                elif status == 200:
                    validate_response = await response.json()
                    break
        else:
            # NOTE(review): read as the loop's else, so we never continue without a response
            raise RuntimeError("Login verification failure")
        self._user_id = int(validate_response["user_id"])
        cookie["persistent"] = str(self._user_id)
        self._is_logged_in.set()
        logger.debug(f"Login successful, user ID: {self._user_id}")
        self.gui.login.update("Logged in", self._user_id)
        # update our cookie and save it
        jar.update_cookies(cookie, URL("https://twitch.tv"))
        jar.save(COOKIES_PATH)

    async def gql_request(self, op: GQLOperation) -> JsonType:
        """
        Send a single GQL operation as the logged in user, and return the decoded response.
        """
        await self.check_login()
        headers = {
            "Authorization": f"OAuth {self._access_token}",
            "Client-Id": CLIENT_ID,
        }
        gql_logger.debug(f"GQL Request: {op}")
        async with self._session.post(GQL_URL, json=op, headers=headers) as response:
            response_json = await response.json()
        gql_logger.debug(f"GQL Response: {response_json}")
        return response_json

    async def fetch_inventory(self) -> None:
        """
        Replace `self.inventory` with the in-progress campaigns, ordered by their end time.
        """
        response = await self.gql_request(GQL_OPERATIONS["Inventory"])
        inventory = response["data"]["currentUser"]["inventory"]["dropCampaignsInProgress"] or []
        campaigns = sorted(
            (DropsCampaign(self, data) for data in inventory),
            key=lambda c: c.ends_at,
        )
        self.inventory = campaigns

    def get_drop(self, drop_id: str) -> Optional[TimedDrop]:
        """
        Find a drop by its ID in the cached inventory. Returns None if there's no such drop.
        """
        for campaign in self.inventory:
            drop = campaign.timed_drops.get(drop_id)
            if drop is not None:
                return drop
        return None

    def get_active_drop(self, game: Game) -> Optional[TimedDrop]:
        """
        Return the earnable drop of `game` with the least remaining minutes,
        looking only at active campaigns. Returns None if there's no such drop.
        """
        candidates = (
            drop
            for campaign in self.inventory
            if campaign.active and campaign.game == game
            for drop in campaign.timed_drops.values()
            if drop.can_earn
        )
        # just like with sorting, the first drop wins a tie
        return min(candidates, key=lambda d: d.remaining_minutes, default=None)

    async def get_live_streams(self, game: Game, tag_ids: List[str]) -> List[Channel]:
        """
        Query the directory of `game` for up to 45 streams having the given tags.
        """
        limit = 45
        response = await self.gql_request(
            GQL_OPERATIONS["GameDirectory"].with_variables({
                "limit": limit,
                "name": game.name,
                "options": {
                    "includeRestricted": ["SUB_ONLY_LIVE"],
                    "tags": tag_ids,
                },
            })
        )
        return [
            Channel.from_directory(self, stream_channel_data["node"])
            for stream_channel_data in response["data"]["game"]["streams"]["edges"]
        ]

    async def claim_points(self, channel_id: Union[str, int], claim_id: str) -> None:
        """
        Claim the bonus points identified by `claim_id` on the given channel.
        """
        variables = {"input": {"channelID": str(channel_id), "claimID": claim_id}}
        await self.gql_request(
            GQL_OPERATIONS["ClaimCommunityPoints"].with_variables(variables)
        )