Clear the cookie on a 5027 from login

This commit is contained in:
DevilXD
2022-10-23 08:33:28 +02:00
parent 6929633093
commit fee022b5a4
2 changed files with 26 additions and 21 deletions

View File

@@ -8,6 +8,8 @@ from enum import Enum, auto
from datetime import timedelta
from typing import Any, Dict, Literal, NewType, TYPE_CHECKING
from yarl import URL
from version import __version__
if TYPE_CHECKING:
@@ -59,7 +61,7 @@ MAX_WEBSOCKETS = 8
WS_TOPICS_LIMIT = 50
# Misc
DEFAULT_LANG = "English"
BASE_URL = "https://twitch.tv"
BASE_URL = URL("https://twitch.tv")
CLIENT_ID = "kd1unb4b3q4t58fwlpcbzcbnm76a8fp"
# CLIENT_ID = "uo6dggojyb8d6soh92zknwmi5ej1q2"
USER_AGENT = (

View File

@@ -80,6 +80,9 @@ class _AuthState:
and datetime.now(timezone.utc) >= self.integrity_expires
)
def _hasattrs(self, *attrs: str) -> bool:
return all(hasattr(self, attr) for attr in attrs)
def _delattrs(self, *attrs: str) -> None:
for attr in attrs:
if hasattr(self, attr):
@@ -130,8 +133,12 @@ class _AuthState:
if "error_code" in login_response:
error_code: int = login_response["error_code"]
logger.debug(f"Login error code: {error_code}")
if error_code == 1000:
if error_code in (1000, 5027):
# we've failed bois
session = await self._twitch.get_session()
jar = cast(aiohttp.CookieJar, session.cookie_jar)
assert BASE_URL.host is not None
jar.clear_domain(BASE_URL.host)
logger.debug("Login failed due to CAPTCHA")
raise CaptchaRequired()
elif error_code == 3001:
@@ -204,28 +211,25 @@ class _AuthState:
async def _validate(self):
if not hasattr(self, "session_id"):
self.session_id = create_nonce(CHARS_HEX_LOWER, 16)
if not (hasattr(self, "client_version") and hasattr(self, "device_id")):
if not self._hasattrs("client_version", "device_id", "access_token", "user_id"):
session = await self._twitch.get_session()
jar = cast(aiohttp.CookieJar, session.cookie_jar)
if not self._hasattrs("client_version", "device_id"):
async with self._twitch.request("GET", BASE_URL) as response:
match = re.search(r'twilightBuildID="([-a-z0-9]+)"', await response.text("utf8"))
if match is None:
raise MinerException("Unable to extract client_version")
self.client_version = match.group(1)
# doing the request ends up setting the "unique_id" value in the cookie
session = await self._twitch.get_session()
jar = cast(aiohttp.CookieJar, session.cookie_jar)
cookie = jar.filter_cookies(URL(BASE_URL))
cookie = jar.filter_cookies(BASE_URL)
self.device_id = cookie["unique_id"].value
if not (hasattr(self, "access_token") and hasattr(self, "user_id")):
if not self._hasattrs("access_token", "user_id"):
# looks like we're missing something
login_form: LoginForm = self._twitch.gui.login
logger.debug("Checking login")
login_form.update(_("gui", "login", "logging_in"), None)
session = await self._twitch.get_session()
jar = cast(aiohttp.CookieJar, session.cookie_jar)
url = URL(BASE_URL)
assert url.host is not None
for attempt in range(2):
cookie = jar.filter_cookies(url)
cookie = jar.filter_cookies(BASE_URL)
if "auth-token" not in cookie:
self.access_token = await self._login()
cookie["auth-token"] = self.access_token
@@ -242,7 +246,8 @@ class _AuthState:
if status == 401:
# the access token we have is invalid - clear the cookie and reauth
logger.debug("Restored session is invalid")
jar.clear_domain(url.host)
assert BASE_URL.host is not None
jar.clear_domain(BASE_URL.host)
continue
elif status == 200:
validate_response = await response.json()
@@ -254,9 +259,9 @@ class _AuthState:
logger.debug(f"Login successful, user ID: {self.user_id}")
login_form.update(_("gui", "login", "logged_in"), self.user_id)
# update our cookie and save it
jar.update_cookies(cookie, url)
jar.update_cookies(cookie, BASE_URL)
jar.save(COOKIES_PATH)
if not hasattr(self, "integrity_token") or self.integrity_expired:
if not self._hasattrs("integrity_token") or self.integrity_expired:
async with self._twitch.request(
"POST",
"https://gql.twitch.tv/integrity",
@@ -272,17 +277,15 @@ class _AuthState:
self.integrity_expires = ((expiration - now) * 0.9) + now
# verify the integrity token's contents for the "is_bad_bot" flag
stripped_token: str = self.integrity_token.split('.')[2] + "=="
messy_json: str = urlsafe_b64decode(
stripped_token.encode()
).decode(errors="ignore").replace('\n', '')
messy_json: str = urlsafe_b64decode(stripped_token.encode()).decode(errors="ignore")
match = re.search(r'(.+)(?<="}).+$', messy_json)
if match is None:
raise MinerException("Unable to parse the integrity token")
decoded_header: JsonType = json.loads(match.group(1))
if decoded_header.get("is_bad_bot", "false") != "false":
self._twitch.print(
"Twitch has detected this miner as a \"Bad Bot\", and may try to stop you "
"from claiming drops. You're proceeding at your own risk!"
"Twitch has detected this miner as a \"Bad Bot\". "
"You're proceeding at your own risk!"
)
await asyncio.sleep(8)
self._logged_in.set()
@@ -989,7 +992,7 @@ class Twitch:
@asynccontextmanager
async def request(
self, method: str, url: str, *, invalidate_after: datetime | None = None, **kwargs
self, method: str, url: URL | str, *, invalidate_after: datetime | None = None, **kwargs
) -> abc.AsyncIterator[aiohttp.ClientResponse]:
session = await self.get_session()
method = method.upper()