diff --git a/twitch.py b/twitch.py index 5de3a9d..b775a3e 100644 --- a/twitch.py +++ b/twitch.py @@ -1,10 +1,13 @@ from __future__ import annotations +import re +import json import asyncio import logging from math import ceil from time import time from functools import partial +from base64 import urlsafe_b64decode from collections import abc, OrderedDict from datetime import datetime, timedelta, timezone from contextlib import suppress, asynccontextmanager @@ -19,7 +22,15 @@ from channel import Channel from websocket import WebsocketPool from inventory import DropsCampaign from exceptions import MinerException, LoginException, CaptchaRequired, ExitRequest -from utils import task_wrapper, timestamp, AwaitableValue, OrderedSet, ExponentialBackoff +from utils import ( + CHARS_HEX_LOWER, + timestamp, + create_nonce, + task_wrapper, + OrderedSet, + AwaitableValue, + ExponentialBackoff, +) from constants import ( BASE_URL, CLIENT_ID, @@ -46,6 +57,201 @@ logger = logging.getLogger("TwitchDrops") gql_logger = logging.getLogger("TwitchDrops.gql") +class _IntegrityToken: + def __init__(self, response: JsonType): + self.token: str = response["token"] + self.expires: datetime = datetime.fromtimestamp( + response["expiration"] / 1000, timezone.utc + ) + # verify the integrity token's contents for the "is_bad_bot" flag + stripped_token: str = self.token.split('.')[2] + "==" + messy_json: str = urlsafe_b64decode( + stripped_token.encode() + ).decode(errors="ignore").replace('\n', '') + match = re.search(r'(.+)(?<="}).+$', messy_json) + if match is None: + raise MinerException("Unable to parse the integrity token") + decoded_header: JsonType = json.loads(match.group(1)) + if decoded_header.get("is_bad_bot", "false") != "false": + raise MinerException( + "Twitch considers this miner as a \"Bad Bot\". " + "Try deleting the cookie file and try again." + ) + + @property + def expired(self) -> bool: + return datetime.now(timezone.utc) >= self.expires + + +class _AuthState: + def __init__(self, twitch: Twitch): + self._twitch: Twitch = twitch + self.user_id: int + self.device_id: str + self.session_id: str + self.access_token: str + self.client_version: str + self.integrity_token: _IntegrityToken + + async def _login(self) -> str: + logger.debug("Login flow started") + login_form: LoginForm = self._twitch.gui.login + gui_print = self._twitch.gui.print + + payload: JsonType = { + "client_id": CLIENT_ID, + "undelete_user": False, + "remember_me": True, + } + + while True: + username, password, token = await login_form.ask_login() + payload["username"] = username + payload["password"] = password + # remove stale 2FA tokens, if present + payload.pop("authy_token", None) + payload.pop("twitchguard_code", None) + for attempt in range(2): + async with self._twitch.request( + "POST", "https://passport.twitch.tv/login", json=payload + ) as response: + login_response: JsonType = await response.json() + + # Feed this back in to avoid running into CAPTCHA if possible + if "captcha_proof" in login_response: + payload["captcha"] = {"proof": login_response["captcha_proof"]} + + # Error handling + if "error_code" in login_response: + error_code: int = login_response["error_code"] + logger.debug(f"Login error code: {error_code}") + if error_code == 1000: + # we've failed bois + logger.debug("Login failed due to CAPTCHA") + raise CaptchaRequired() + elif error_code == 3001: + # wrong password you dummy + logger.debug("Login failed due to incorrect username or password") + gui_print("Incorrect username or password.") + login_form.clear(password=True) + break + elif error_code in ( + 3012, # Invalid authy token + 3023, # Invalid email code + ): + logger.debug("Login failed due to incorrect 2FA code") + if error_code == 3023: + gui_print("Incorrect email code.") + else: + gui_print("Incorrect 2FA code.") + login_form.clear(token=True) + break + elif error_code in ( + 3011, # Authy token needed + 3022, # Email code needed + ): + # 2FA handling + logger.debug("2FA token required") + email = error_code == 3022 + if not token: + # user didn't provide a token, so ask them for it + if email: + gui_print("Email code required. Check your email.") + else: + gui_print("2FA token required.") + break + if email: + payload["twitchguard_code"] = token + else: + payload["authy_token"] = token + continue + else: + raise LoginException(login_response["error"]) + # Success handling + if "access_token" in login_response: + # we're in bois + self.access_token = cast(str, login_response["access_token"]) + logger.debug("Access token granted") + login_form.clear() + return self.access_token + + def gql_headers(self, *, integrity: bool) -> JsonType: + headers = { + "Authorization": f"OAuth {self.access_token}", + "Client-Id": CLIENT_ID, + "Client-Session-Id": self.session_id, + "Client-Version": self.client_version, + "X-Device-Id": self.device_id, + } + if integrity: + headers["Client-Integrity"] = self.integrity_token.token + return headers + + async def verify(self): + if not hasattr(self, "session_id"): + self.session_id = create_nonce(CHARS_HEX_LOWER, 16) + if not (hasattr(self, "client_version") and hasattr(self, "device_id")): + async with self._twitch.request("GET", BASE_URL) as response: + match = re.search(r'twilightBuildID="([-a-z0-9]+)"', await response.text("utf8")) + if match is None: + raise MinerException("Unable to extract client_version") + self.client_version = match.group(1) + # doing the request ends up setting the "unique_id" value in the cookie + session = await self._twitch.get_session() + jar = cast(aiohttp.CookieJar, session.cookie_jar) + cookie = jar.filter_cookies(URL(BASE_URL)) + self.device_id = cookie["unique_id"].value + if not (hasattr(self, "access_token") and hasattr(self, "user_id")): + # looks like we're missing something + login_form: LoginForm = self._twitch.gui.login + logger.debug("Checking login") + login_form.update(_("gui", "login", "logging_in"), None) + session = await self._twitch.get_session() + jar = cast(aiohttp.CookieJar, session.cookie_jar) + url = URL(BASE_URL) + assert url.host is not None + for attempt in range(2): + cookie = jar.filter_cookies(url) + if "auth-token" not in cookie: + self.access_token = await self._login() + cookie["auth-token"] = self.access_token + elif not hasattr(self, "access_token"): + logger.debug("Restoring session from cookie") + self.access_token = cookie["auth-token"].value + # validate the auth token, by obtaining user_id + async with self._twitch.request( + "GET", + "https://id.twitch.tv/oauth2/validate", + headers={"Authorization": f"OAuth {self.access_token}"} + ) as response: + status = response.status + if status == 401: + # the access token we have is invalid - clear the cookie and reauth + logger.debug("Restored session is invalid") + jar.clear_domain(url.host) + continue + elif status == 200: + validate_response = await response.json() + break + else: + raise RuntimeError("Login verification failure") + self.user_id = int(validate_response["user_id"]) + cookie["persistent"] = str(self.user_id) + self._twitch._is_logged_in.set() + logger.debug(f"Login successful, user ID: {self.user_id}") + login_form.update(_("gui", "login", "logged_in"), self.user_id) + # update our cookie and save it + jar.update_cookies(cookie, url) + jar.save(COOKIES_PATH) + if not hasattr(self, "integrity_token") or self.integrity_token.expired: + async with self._twitch.request( + "POST", + "https://gql.twitch.tv/integrity", + headers=self.gql_headers(integrity=False) + ) as response: + self.integrity_token = _IntegrityToken(await response.json()) + + class Twitch: def __init__(self, settings: Settings): self.settings: Settings = settings @@ -59,8 +265,7 @@ class Twitch: self.gui = GUIManager(self) # Cookies, session and auth self._session: aiohttp.ClientSession | None = None - self._access_token: str | None = None - self._user_id: int | None = None + self._auth_state: _AuthState = _AuthState(self) self._is_logged_in = asyncio.Event() # Storing and watching channels self.channels: OrderedDict[int, Channel] = OrderedDict() @@ -183,17 +388,16 @@ class Twitch: • Changing the stream that's being watched if necessary """ self.gui.start() - await self.check_login() + auth_state = await self.get_auth() await self.websocket.start() # NOTE: watch task is explicitly restarted on each new run if self._watching_task is not None: self._watching_task.cancel() self._watching_task = asyncio.create_task(self._watch_loop()) # Add default topics - assert self._user_id is not None self.websocket.add_topics([ - WebsocketTopic("User", "Drops", self._user_id, self.process_drops), - WebsocketTopic("User", "CommunityPoints", self._user_id, self.process_points), + WebsocketTopic("User", "Drops", auth_state.user_id, self.process_drops), + WebsocketTopic("User", "CommunityPoints", auth_state.user_id, self.process_points), ]) full_cleanup: bool = False channels: Final[OrderedDict[int, Channel]] = self.channels @@ -726,139 +930,9 @@ class Twitch: await self.claim_points(claim_data["channel_id"], claim_data["id"]) self.gui.print(_("status", "claimed_points").format(points=points)) - async def _login(self) -> str: - logger.debug("Login flow started") - login_form: LoginForm = self.gui.login - - payload: JsonType = { - "client_id": CLIENT_ID, - "undelete_user": False, - "remember_me": True, - } - - while True: - username, password, token = await login_form.ask_login() - payload["username"] = username - payload["password"] = password - # remove stale 2FA tokens, if present - payload.pop("authy_token", None) - payload.pop("twitchguard_code", None) - for attempt in range(2): - async with self.request( - "POST", "https://passport.twitch.tv/login", json=payload - ) as response: - login_response: JsonType = await response.json() - - # Feed this back in to avoid running into CAPTCHA if possible - if "captcha_proof" in login_response: - payload["captcha"] = {"proof": login_response["captcha_proof"]} - - # Error handling - if "error_code" in login_response: - error_code: int = login_response["error_code"] - logger.debug(f"Login error code: {error_code}") - if error_code == 1000: - # we've failed bois - logger.debug("Login failed due to CAPTCHA") - raise CaptchaRequired() - elif error_code == 3001: - # wrong password you dummy - logger.debug("Login failed due to incorrect username or password") - self.gui.print("Incorrect username or password.") - login_form.clear(password=True) - break - elif error_code in ( - 3012, # Invalid authy token - 3023, # Invalid email code - ): - logger.debug("Login failed due to incorrect 2FA code") - if error_code == 3023: - self.gui.print("Incorrect email code.") - else: - self.gui.print("Incorrect 2FA code.") - login_form.clear(token=True) - break - elif error_code in ( - 3011, # Authy token needed - 3022, # Email code needed - ): - # 2FA handling - logger.debug("2FA token required") - email = error_code == 3022 - if not token: - # user didn't provide a token, so ask them for it - if email: - self.gui.print("Email code required. Check your email.") - else: - self.gui.print("2FA token required.") - break - if email: - payload["twitchguard_code"] = token - else: - payload["authy_token"] = token - continue - else: - raise LoginException(login_response["error"]) - # Success handling - if "access_token" in login_response: - # we're in bois - self._access_token = cast(str, login_response["access_token"]) - logger.debug("Access token granted") - login_form.clear() - return self._access_token - - async def check_login(self) -> tuple[str, int]: - if self._access_token is not None and self._user_id is not None: - # we're all good - return (self._access_token, self._user_id) - # looks like we're missing something - login_form: LoginForm = self.gui.login - logger.debug("Checking login") - login_form.update(_("gui", "login", "logging_in"), None) - # NOTE: We need this here because of the jar being accessed - session = await self.get_session() - url = URL(BASE_URL) - assert url.host is not None - jar = cast(aiohttp.CookieJar, session.cookie_jar) - for attempt in range(2): - cookie = jar.filter_cookies(url) - if not cookie: - # no cookie - login - access_token: str = await self._login() - # store the auth token inside the cookie - cookie["auth-token"] = access_token - elif self._access_token is None: - # have cookie - get our access token - access_token = cookie["auth-token"].value - logger.debug("Restoring session from cookie") - # store the auth token within the application - self._access_token = access_token - # validate the auth token, by obtaining user_id - async with self.request( - "GET", - "https://id.twitch.tv/oauth2/validate", - headers={"Authorization": f"OAuth {self._access_token}"} - ) as response: - status = response.status - if status == 401: - # the access token we have is invalid - clear the cookie and reauth - logger.debug("Restored session is invalid") - jar.clear_domain(url.host) - continue - elif status == 200: - validate_response = await response.json() - break - else: - raise RuntimeError("Login verification failure") - self._user_id = int(validate_response["user_id"]) - cookie["persistent"] = str(self._user_id) - self._is_logged_in.set() - logger.debug(f"Login successful, user ID: {self._user_id}") - login_form.update(_("gui", "login", "logged_in"), self._user_id) - # update our cookie and save it - jar.update_cookies(cookie, url) - jar.save(COOKIES_PATH) - return (self._access_token, self._user_id) + async def get_auth(self) -> _AuthState: + await self._auth_state.verify() + return self._auth_state @asynccontextmanager async def request( @@ -904,15 +978,12 @@ class Twitch: async def gql_request(self, op: GQLOperation) -> JsonType: gql_logger.debug(f"GQL Request: {op}") - access_token, user_id = await self.check_login() + auth_state = await self.get_auth() async with self.request( "POST", "https://gql.twitch.tv/gql", json=op, - headers={ - "Authorization": f"OAuth {access_token}", - "Client-Id": CLIENT_ID, - } + headers=auth_state.gql_headers(integrity=True) ) as response: response_json: JsonType = await response.json() gql_logger.debug(f"GQL Response: {response_json}") @@ -926,9 +997,10 @@ class Twitch: available_data: JsonType, claimed_benefits: dict[str, datetime], ) -> DropsCampaign: + auth_state = await self.get_auth() response = await self.gql_request( GQL_OPERATIONS["CampaignDetails"].with_variables( - {"channelLogin": str(self._user_id), "dropID": campaign_id} + {"channelLogin": str(auth_state.user_id), "dropID": campaign_id} ) ) campaign_data: JsonType = response["data"]["user"]["dropCampaign"] diff --git a/utils.py b/utils.py index fd6561c..a37dbc4 100644 --- a/utils.py +++ b/utils.py @@ -57,11 +57,13 @@ def timestamp(string: str) -> datetime: return datetime.strptime(string, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) -NONCE_CHARS = string.ascii_letters + string.digits +CHARS_ASCII = string.ascii_letters + string.digits +CHARS_HEX_LOWER = string.digits + "abcdef" +CHARS_HEX_UPPER = string.digits + "ABCDEF" -def create_nonce(length: int = 30) -> str: - return ''.join(random.choices(NONCE_CHARS, k=length)) +def create_nonce(chars: str, length: int) -> str: + return ''.join(random.choices(chars, k=length)) def deduplicate(iterable: abc.Iterable[_T]) -> list[_T]: diff --git a/websocket.py b/websocket.py index c098fe5..d3a2e7d 100644 --- a/websocket.py +++ b/websocket.py @@ -13,7 +13,13 @@ from translate import _ from exceptions import MinerException, WebsocketClosed from constants import PING_INTERVAL, PING_TIMEOUT, MAX_WEBSOCKETS, WS_TOPICS_LIMIT from utils import ( - task_wrapper, create_nonce, json_minify, format_traceback, AwaitableValue, ExponentialBackoff + CHARS_ASCII, + task_wrapper, + create_nonce, + json_minify, + format_traceback, + AwaitableValue, + ExponentialBackoff, ) if TYPE_CHECKING: @@ -200,7 +206,7 @@ class Websocket: # nothing to do return self._topics_changed.clear() - access_token, user_id = await self._twitch.check_login() + auth_state = await self._twitch.get_auth() current: set[WebsocketTopic] = set(self.topics.values()) # handle removed topics removed = self._submitted.difference(current) @@ -212,7 +218,7 @@ class Websocket: "type": "UNLISTEN", "data": { "topics": topics_list, - "auth_token": access_token, + "auth_token": auth_state.access_token, } } ) @@ -227,7 +233,7 @@ class Websocket: "type": "LISTEN", "data": { "topics": topics_list, - "auth_token": access_token, + "auth_token": auth_state.access_token, } } ) @@ -316,7 +322,7 @@ class Websocket: ws = self._ws.get_with_default(None) assert ws is not None if message["type"] != "PING": - message["nonce"] = create_nonce() + message["nonce"] = create_nonce(CHARS_ASCII, 30) await ws.send_json(message, dumps=json_minify) ws_logger.debug(f"Websocket[{self._idx}] sent: {message}")