diff --git a/constants.py b/constants.py index 672da58..2cffc29 100644 --- a/constants.py +++ b/constants.py @@ -8,6 +8,8 @@ from enum import Enum, auto from datetime import timedelta from typing import Any, Dict, Literal, NewType, TYPE_CHECKING +from yarl import URL + from version import __version__ if TYPE_CHECKING: @@ -59,7 +61,7 @@ MAX_WEBSOCKETS = 8 WS_TOPICS_LIMIT = 50 # Misc DEFAULT_LANG = "English" -BASE_URL = "https://twitch.tv" +BASE_URL = URL("https://twitch.tv") CLIENT_ID = "kd1unb4b3q4t58fwlpcbzcbnm76a8fp" # CLIENT_ID = "uo6dggojyb8d6soh92zknwmi5ej1q2" USER_AGENT = ( diff --git a/twitch.py b/twitch.py index 9241b03..c4a6256 100644 --- a/twitch.py +++ b/twitch.py @@ -80,6 +80,9 @@ class _AuthState: and datetime.now(timezone.utc) >= self.integrity_expires ) + def _hasattrs(self, *attrs: str) -> bool: + return all(hasattr(self, attr) for attr in attrs) + def _delattrs(self, *attrs: str) -> None: for attr in attrs: if hasattr(self, attr): @@ -130,8 +133,12 @@ class _AuthState: if "error_code" in login_response: error_code: int = login_response["error_code"] logger.debug(f"Login error code: {error_code}") - if error_code == 1000: + if error_code in (1000, 5027): # we've failed bois + session = await self._twitch.get_session() + jar = cast(aiohttp.CookieJar, session.cookie_jar) + assert BASE_URL.host is not None + jar.clear_domain(BASE_URL.host) logger.debug("Login failed due to CAPTCHA") raise CaptchaRequired() elif error_code == 3001: @@ -204,28 +211,25 @@ class _AuthState: async def _validate(self): if not hasattr(self, "session_id"): self.session_id = create_nonce(CHARS_HEX_LOWER, 16) - if not (hasattr(self, "client_version") and hasattr(self, "device_id")): + if not self._hasattrs("client_version", "device_id", "access_token", "user_id"): + session = await self._twitch.get_session() + jar = cast(aiohttp.CookieJar, session.cookie_jar) + if not self._hasattrs("client_version", "device_id"): async with self._twitch.request("GET", BASE_URL) as response: match = re.search(r'twilightBuildID="([-a-z0-9]+)"', await response.text("utf8")) if match is None: raise MinerException("Unable to extract client_version") self.client_version = match.group(1) # doing the request ends up setting the "unique_id" value in the cookie - session = await self._twitch.get_session() - jar = cast(aiohttp.CookieJar, session.cookie_jar) - cookie = jar.filter_cookies(URL(BASE_URL)) + cookie = jar.filter_cookies(BASE_URL) self.device_id = cookie["unique_id"].value - if not (hasattr(self, "access_token") and hasattr(self, "user_id")): + if not self._hasattrs("access_token", "user_id"): # looks like we're missing something login_form: LoginForm = self._twitch.gui.login logger.debug("Checking login") login_form.update(_("gui", "login", "logging_in"), None) - session = await self._twitch.get_session() - jar = cast(aiohttp.CookieJar, session.cookie_jar) - url = URL(BASE_URL) - assert url.host is not None for attempt in range(2): - cookie = jar.filter_cookies(url) + cookie = jar.filter_cookies(BASE_URL) if "auth-token" not in cookie: self.access_token = await self._login() cookie["auth-token"] = self.access_token @@ -242,7 +246,8 @@ class _AuthState: if status == 401: # the access token we have is invalid - clear the cookie and reauth logger.debug("Restored session is invalid") - jar.clear_domain(url.host) + assert BASE_URL.host is not None + jar.clear_domain(BASE_URL.host) continue elif status == 200: validate_response = await response.json() @@ -254,9 +259,9 @@ class _AuthState: logger.debug(f"Login successful, user ID: {self.user_id}") login_form.update(_("gui", "login", "logged_in"), self.user_id) # update our cookie and save it - jar.update_cookies(cookie, url) + jar.update_cookies(cookie, BASE_URL) jar.save(COOKIES_PATH) - if not hasattr(self, "integrity_token") or self.integrity_expired: + if not self._hasattrs("integrity_token") or self.integrity_expired: async with self._twitch.request( "POST", "https://gql.twitch.tv/integrity", @@ -272,17 +277,15 @@ class _AuthState: self.integrity_expires = ((expiration - now) * 0.9) + now # verify the integrity token's contents for the "is_bad_bot" flag stripped_token: str = self.integrity_token.split('.')[2] + "==" - messy_json: str = urlsafe_b64decode( - stripped_token.encode() - ).decode(errors="ignore").replace('\n', '') + messy_json: str = urlsafe_b64decode(stripped_token.encode()).decode(errors="ignore") match = re.search(r'(.+)(?<="}).+$', messy_json) if match is None: raise MinerException("Unable to parse the integrity token") decoded_header: JsonType = json.loads(match.group(1)) if decoded_header.get("is_bad_bot", "false") != "false": self._twitch.print( - "Twitch has detected this miner as a \"Bad Bot\", and may try to stop you " - "from claiming drops. You're proceeding at your own risk!" + "Twitch has detected this miner as a \"Bad Bot\". " + "You're proceeding at your own risk!" ) await asyncio.sleep(8) self._logged_in.set() @@ -989,7 +992,7 @@ class Twitch: @asynccontextmanager async def request( - self, method: str, url: str, *, invalidate_after: datetime | None = None, **kwargs + self, method: str, url: URL | str, *, invalidate_after: datetime | None = None, **kwargs ) -> abc.AsyncIterator[aiohttp.ClientResponse]: session = await self.get_session() method = method.upper()