mirror of
https://github.com/rangermix/TwitchDropsMiner.git
synced 2026-06-06 04:19:39 +00:00
Initial integrity support
This commit is contained in:
364
twitch.py
364
twitch.py
@@ -1,10 +1,13 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import json
|
||||
import asyncio
|
||||
import logging
|
||||
from math import ceil
|
||||
from time import time
|
||||
from functools import partial
|
||||
from base64 import urlsafe_b64decode
|
||||
from collections import abc, OrderedDict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from contextlib import suppress, asynccontextmanager
|
||||
@@ -19,7 +22,15 @@ from channel import Channel
|
||||
from websocket import WebsocketPool
|
||||
from inventory import DropsCampaign
|
||||
from exceptions import MinerException, LoginException, CaptchaRequired, ExitRequest
|
||||
from utils import task_wrapper, timestamp, AwaitableValue, OrderedSet, ExponentialBackoff
|
||||
from utils import (
|
||||
CHARS_HEX_LOWER,
|
||||
timestamp,
|
||||
create_nonce,
|
||||
task_wrapper,
|
||||
OrderedSet,
|
||||
AwaitableValue,
|
||||
ExponentialBackoff,
|
||||
)
|
||||
from constants import (
|
||||
BASE_URL,
|
||||
CLIENT_ID,
|
||||
@@ -46,6 +57,201 @@ logger = logging.getLogger("TwitchDrops")
|
||||
gql_logger = logging.getLogger("TwitchDrops.gql")
|
||||
|
||||
|
||||
class _IntegrityToken:
|
||||
def __init__(self, response: JsonType):
|
||||
self.token: str = response["token"]
|
||||
self.expires: datetime = datetime.fromtimestamp(
|
||||
response["expiration"] / 1000, timezone.utc
|
||||
)
|
||||
# verify the integrity token's contents for the "is_bad_bot" flag
|
||||
stripped_token: str = self.token.split('.')[2] + "=="
|
||||
messy_json: str = urlsafe_b64decode(
|
||||
stripped_token.encode()
|
||||
).decode(errors="ignore").replace('\n', '')
|
||||
match = re.search(r'(.+)(?<="}).+$', messy_json)
|
||||
if match is None:
|
||||
raise MinerException("Unable to parse the integrity token")
|
||||
decoded_header: JsonType = json.loads(match.group(1))
|
||||
if decoded_header.get("is_bad_bot", "false") != "false":
|
||||
raise MinerException(
|
||||
"Twitch considers this miner as a \"Bad Bot\". "
|
||||
"Try deleting the cookie file and try again."
|
||||
)
|
||||
|
||||
@property
|
||||
def expired(self) -> bool:
|
||||
return datetime.now(timezone.utc) >= self.expires
|
||||
|
||||
|
||||
class _AuthState:
|
||||
def __init__(self, twitch: Twitch):
|
||||
self._twitch: Twitch = twitch
|
||||
self.user_id: int
|
||||
self.device_id: str
|
||||
self.session_id: str
|
||||
self.access_token: str
|
||||
self.client_version: str
|
||||
self.integrity_token: _IntegrityToken
|
||||
|
||||
async def _login(self) -> str:
|
||||
logger.debug("Login flow started")
|
||||
login_form: LoginForm = self._twitch.gui.login
|
||||
gui_print = self._twitch.gui.print
|
||||
|
||||
payload: JsonType = {
|
||||
"client_id": CLIENT_ID,
|
||||
"undelete_user": False,
|
||||
"remember_me": True,
|
||||
}
|
||||
|
||||
while True:
|
||||
username, password, token = await login_form.ask_login()
|
||||
payload["username"] = username
|
||||
payload["password"] = password
|
||||
# remove stale 2FA tokens, if present
|
||||
payload.pop("authy_token", None)
|
||||
payload.pop("twitchguard_code", None)
|
||||
for attempt in range(2):
|
||||
async with self._twitch.request(
|
||||
"POST", "https://passport.twitch.tv/login", json=payload
|
||||
) as response:
|
||||
login_response: JsonType = await response.json()
|
||||
|
||||
# Feed this back in to avoid running into CAPTCHA if possible
|
||||
if "captcha_proof" in login_response:
|
||||
payload["captcha"] = {"proof": login_response["captcha_proof"]}
|
||||
|
||||
# Error handling
|
||||
if "error_code" in login_response:
|
||||
error_code: int = login_response["error_code"]
|
||||
logger.debug(f"Login error code: {error_code}")
|
||||
if error_code == 1000:
|
||||
# we've failed bois
|
||||
logger.debug("Login failed due to CAPTCHA")
|
||||
raise CaptchaRequired()
|
||||
elif error_code == 3001:
|
||||
# wrong password you dummy
|
||||
logger.debug("Login failed due to incorrect username or password")
|
||||
gui_print("Incorrect username or password.")
|
||||
login_form.clear(password=True)
|
||||
break
|
||||
elif error_code in (
|
||||
3012, # Invalid authy token
|
||||
3023, # Invalid email code
|
||||
):
|
||||
logger.debug("Login failed due to incorrect 2FA code")
|
||||
if error_code == 3023:
|
||||
gui_print("Incorrect email code.")
|
||||
else:
|
||||
gui_print("Incorrect 2FA code.")
|
||||
login_form.clear(token=True)
|
||||
break
|
||||
elif error_code in (
|
||||
3011, # Authy token needed
|
||||
3022, # Email code needed
|
||||
):
|
||||
# 2FA handling
|
||||
logger.debug("2FA token required")
|
||||
email = error_code == 3022
|
||||
if not token:
|
||||
# user didn't provide a token, so ask them for it
|
||||
if email:
|
||||
gui_print("Email code required. Check your email.")
|
||||
else:
|
||||
gui_print("2FA token required.")
|
||||
break
|
||||
if email:
|
||||
payload["twitchguard_code"] = token
|
||||
else:
|
||||
payload["authy_token"] = token
|
||||
continue
|
||||
else:
|
||||
raise LoginException(login_response["error"])
|
||||
# Success handling
|
||||
if "access_token" in login_response:
|
||||
# we're in bois
|
||||
self.access_token = cast(str, login_response["access_token"])
|
||||
logger.debug("Access token granted")
|
||||
login_form.clear()
|
||||
return self.access_token
|
||||
|
||||
def gql_headers(self, *, integrity: bool) -> JsonType:
|
||||
headers = {
|
||||
"Authorization": f"OAuth {self.access_token}",
|
||||
"Client-Id": CLIENT_ID,
|
||||
"Client-Session-Id": self.session_id,
|
||||
"Client-Version": self.client_version,
|
||||
"X-Device-Id": self.device_id,
|
||||
}
|
||||
if integrity:
|
||||
headers["Client-Integrity"] = self.integrity_token.token
|
||||
return headers
|
||||
|
||||
async def verify(self):
|
||||
if not hasattr(self, "session_id"):
|
||||
self.session_id = create_nonce(CHARS_HEX_LOWER, 16)
|
||||
if not (hasattr(self, "client_version") and hasattr(self, "device_id")):
|
||||
async with self._twitch.request("GET", BASE_URL) as response:
|
||||
match = re.search(r'twilightBuildID="([-a-z0-9]+)"', await response.text("utf8"))
|
||||
if match is None:
|
||||
raise MinerException("Unable to extract client_version")
|
||||
self.client_version = match.group(1)
|
||||
# doing the request ends up setting the "unique_id" value in the cookie
|
||||
session = await self._twitch.get_session()
|
||||
jar = cast(aiohttp.CookieJar, session.cookie_jar)
|
||||
cookie = jar.filter_cookies(URL(BASE_URL))
|
||||
self.device_id = cookie["unique_id"].value
|
||||
if not (hasattr(self, "access_token") and hasattr(self, "user_id")):
|
||||
# looks like we're missing something
|
||||
login_form: LoginForm = self._twitch.gui.login
|
||||
logger.debug("Checking login")
|
||||
login_form.update(_("gui", "login", "logging_in"), None)
|
||||
session = await self._twitch.get_session()
|
||||
jar = cast(aiohttp.CookieJar, session.cookie_jar)
|
||||
url = URL(BASE_URL)
|
||||
assert url.host is not None
|
||||
for attempt in range(2):
|
||||
cookie = jar.filter_cookies(url)
|
||||
if "auth-token" not in cookie:
|
||||
self.access_token = await self._login()
|
||||
cookie["auth-token"] = self.access_token
|
||||
elif not hasattr(self, "access_token"):
|
||||
logger.debug("Restoring session from cookie")
|
||||
self.access_token = cookie["auth-token"].value
|
||||
# validate the auth token, by obtaining user_id
|
||||
async with self._twitch.request(
|
||||
"GET",
|
||||
"https://id.twitch.tv/oauth2/validate",
|
||||
headers={"Authorization": f"OAuth {self.access_token}"}
|
||||
) as response:
|
||||
status = response.status
|
||||
if status == 401:
|
||||
# the access token we have is invalid - clear the cookie and reauth
|
||||
logger.debug("Restored session is invalid")
|
||||
jar.clear_domain(url.host)
|
||||
continue
|
||||
elif status == 200:
|
||||
validate_response = await response.json()
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("Login verification failure")
|
||||
self.user_id = int(validate_response["user_id"])
|
||||
cookie["persistent"] = str(self.user_id)
|
||||
self._twitch._is_logged_in.set()
|
||||
logger.debug(f"Login successful, user ID: {self.user_id}")
|
||||
login_form.update(_("gui", "login", "logged_in"), self.user_id)
|
||||
# update our cookie and save it
|
||||
jar.update_cookies(cookie, url)
|
||||
jar.save(COOKIES_PATH)
|
||||
if not hasattr(self, "integrity_token") or self.integrity_token.expired:
|
||||
async with self._twitch.request(
|
||||
"POST",
|
||||
"https://gql.twitch.tv/integrity",
|
||||
headers=self.gql_headers(integrity=False)
|
||||
) as response:
|
||||
self.integrity_token = _IntegrityToken(await response.json())
|
||||
|
||||
|
||||
class Twitch:
|
||||
def __init__(self, settings: Settings):
|
||||
self.settings: Settings = settings
|
||||
@@ -59,8 +265,7 @@ class Twitch:
|
||||
self.gui = GUIManager(self)
|
||||
# Cookies, session and auth
|
||||
self._session: aiohttp.ClientSession | None = None
|
||||
self._access_token: str | None = None
|
||||
self._user_id: int | None = None
|
||||
self._auth_state: _AuthState = _AuthState(self)
|
||||
self._is_logged_in = asyncio.Event()
|
||||
# Storing and watching channels
|
||||
self.channels: OrderedDict[int, Channel] = OrderedDict()
|
||||
@@ -183,17 +388,16 @@ class Twitch:
|
||||
• Changing the stream that's being watched if necessary
|
||||
"""
|
||||
self.gui.start()
|
||||
await self.check_login()
|
||||
auth_state = await self.get_auth()
|
||||
await self.websocket.start()
|
||||
# NOTE: watch task is explicitly restarted on each new run
|
||||
if self._watching_task is not None:
|
||||
self._watching_task.cancel()
|
||||
self._watching_task = asyncio.create_task(self._watch_loop())
|
||||
# Add default topics
|
||||
assert self._user_id is not None
|
||||
self.websocket.add_topics([
|
||||
WebsocketTopic("User", "Drops", self._user_id, self.process_drops),
|
||||
WebsocketTopic("User", "CommunityPoints", self._user_id, self.process_points),
|
||||
WebsocketTopic("User", "Drops", auth_state.user_id, self.process_drops),
|
||||
WebsocketTopic("User", "CommunityPoints", auth_state.user_id, self.process_points),
|
||||
])
|
||||
full_cleanup: bool = False
|
||||
channels: Final[OrderedDict[int, Channel]] = self.channels
|
||||
@@ -726,139 +930,9 @@ class Twitch:
|
||||
await self.claim_points(claim_data["channel_id"], claim_data["id"])
|
||||
self.gui.print(_("status", "claimed_points").format(points=points))
|
||||
|
||||
async def _login(self) -> str:
|
||||
logger.debug("Login flow started")
|
||||
login_form: LoginForm = self.gui.login
|
||||
|
||||
payload: JsonType = {
|
||||
"client_id": CLIENT_ID,
|
||||
"undelete_user": False,
|
||||
"remember_me": True,
|
||||
}
|
||||
|
||||
while True:
|
||||
username, password, token = await login_form.ask_login()
|
||||
payload["username"] = username
|
||||
payload["password"] = password
|
||||
# remove stale 2FA tokens, if present
|
||||
payload.pop("authy_token", None)
|
||||
payload.pop("twitchguard_code", None)
|
||||
for attempt in range(2):
|
||||
async with self.request(
|
||||
"POST", "https://passport.twitch.tv/login", json=payload
|
||||
) as response:
|
||||
login_response: JsonType = await response.json()
|
||||
|
||||
# Feed this back in to avoid running into CAPTCHA if possible
|
||||
if "captcha_proof" in login_response:
|
||||
payload["captcha"] = {"proof": login_response["captcha_proof"]}
|
||||
|
||||
# Error handling
|
||||
if "error_code" in login_response:
|
||||
error_code: int = login_response["error_code"]
|
||||
logger.debug(f"Login error code: {error_code}")
|
||||
if error_code == 1000:
|
||||
# we've failed bois
|
||||
logger.debug("Login failed due to CAPTCHA")
|
||||
raise CaptchaRequired()
|
||||
elif error_code == 3001:
|
||||
# wrong password you dummy
|
||||
logger.debug("Login failed due to incorrect username or password")
|
||||
self.gui.print("Incorrect username or password.")
|
||||
login_form.clear(password=True)
|
||||
break
|
||||
elif error_code in (
|
||||
3012, # Invalid authy token
|
||||
3023, # Invalid email code
|
||||
):
|
||||
logger.debug("Login failed due to incorrect 2FA code")
|
||||
if error_code == 3023:
|
||||
self.gui.print("Incorrect email code.")
|
||||
else:
|
||||
self.gui.print("Incorrect 2FA code.")
|
||||
login_form.clear(token=True)
|
||||
break
|
||||
elif error_code in (
|
||||
3011, # Authy token needed
|
||||
3022, # Email code needed
|
||||
):
|
||||
# 2FA handling
|
||||
logger.debug("2FA token required")
|
||||
email = error_code == 3022
|
||||
if not token:
|
||||
# user didn't provide a token, so ask them for it
|
||||
if email:
|
||||
self.gui.print("Email code required. Check your email.")
|
||||
else:
|
||||
self.gui.print("2FA token required.")
|
||||
break
|
||||
if email:
|
||||
payload["twitchguard_code"] = token
|
||||
else:
|
||||
payload["authy_token"] = token
|
||||
continue
|
||||
else:
|
||||
raise LoginException(login_response["error"])
|
||||
# Success handling
|
||||
if "access_token" in login_response:
|
||||
# we're in bois
|
||||
self._access_token = cast(str, login_response["access_token"])
|
||||
logger.debug("Access token granted")
|
||||
login_form.clear()
|
||||
return self._access_token
|
||||
|
||||
async def check_login(self) -> tuple[str, int]:
|
||||
if self._access_token is not None and self._user_id is not None:
|
||||
# we're all good
|
||||
return (self._access_token, self._user_id)
|
||||
# looks like we're missing something
|
||||
login_form: LoginForm = self.gui.login
|
||||
logger.debug("Checking login")
|
||||
login_form.update(_("gui", "login", "logging_in"), None)
|
||||
# NOTE: We need this here because of the jar being accessed
|
||||
session = await self.get_session()
|
||||
url = URL(BASE_URL)
|
||||
assert url.host is not None
|
||||
jar = cast(aiohttp.CookieJar, session.cookie_jar)
|
||||
for attempt in range(2):
|
||||
cookie = jar.filter_cookies(url)
|
||||
if not cookie:
|
||||
# no cookie - login
|
||||
access_token: str = await self._login()
|
||||
# store the auth token inside the cookie
|
||||
cookie["auth-token"] = access_token
|
||||
elif self._access_token is None:
|
||||
# have cookie - get our access token
|
||||
access_token = cookie["auth-token"].value
|
||||
logger.debug("Restoring session from cookie")
|
||||
# store the auth token within the application
|
||||
self._access_token = access_token
|
||||
# validate the auth token, by obtaining user_id
|
||||
async with self.request(
|
||||
"GET",
|
||||
"https://id.twitch.tv/oauth2/validate",
|
||||
headers={"Authorization": f"OAuth {self._access_token}"}
|
||||
) as response:
|
||||
status = response.status
|
||||
if status == 401:
|
||||
# the access token we have is invalid - clear the cookie and reauth
|
||||
logger.debug("Restored session is invalid")
|
||||
jar.clear_domain(url.host)
|
||||
continue
|
||||
elif status == 200:
|
||||
validate_response = await response.json()
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("Login verification failure")
|
||||
self._user_id = int(validate_response["user_id"])
|
||||
cookie["persistent"] = str(self._user_id)
|
||||
self._is_logged_in.set()
|
||||
logger.debug(f"Login successful, user ID: {self._user_id}")
|
||||
login_form.update(_("gui", "login", "logged_in"), self._user_id)
|
||||
# update our cookie and save it
|
||||
jar.update_cookies(cookie, url)
|
||||
jar.save(COOKIES_PATH)
|
||||
return (self._access_token, self._user_id)
|
||||
async def get_auth(self) -> _AuthState:
|
||||
await self._auth_state.verify()
|
||||
return self._auth_state
|
||||
|
||||
@asynccontextmanager
|
||||
async def request(
|
||||
@@ -904,15 +978,12 @@ class Twitch:
|
||||
|
||||
async def gql_request(self, op: GQLOperation) -> JsonType:
|
||||
gql_logger.debug(f"GQL Request: {op}")
|
||||
access_token, user_id = await self.check_login()
|
||||
auth_state = await self.get_auth()
|
||||
async with self.request(
|
||||
"POST",
|
||||
"https://gql.twitch.tv/gql",
|
||||
json=op,
|
||||
headers={
|
||||
"Authorization": f"OAuth {access_token}",
|
||||
"Client-Id": CLIENT_ID,
|
||||
}
|
||||
headers=auth_state.gql_headers(integrity=True)
|
||||
) as response:
|
||||
response_json: JsonType = await response.json()
|
||||
gql_logger.debug(f"GQL Response: {response_json}")
|
||||
@@ -926,9 +997,10 @@ class Twitch:
|
||||
available_data: JsonType,
|
||||
claimed_benefits: dict[str, datetime],
|
||||
) -> DropsCampaign:
|
||||
auth_state = await self.get_auth()
|
||||
response = await self.gql_request(
|
||||
GQL_OPERATIONS["CampaignDetails"].with_variables(
|
||||
{"channelLogin": str(self._user_id), "dropID": campaign_id}
|
||||
{"channelLogin": str(auth_state.user_id), "dropID": campaign_id}
|
||||
)
|
||||
)
|
||||
campaign_data: JsonType = response["data"]["user"]["dropCampaign"]
|
||||
|
||||
8
utils.py
8
utils.py
@@ -57,11 +57,13 @@ def timestamp(string: str) -> datetime:
|
||||
return datetime.strptime(string, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
|
||||
|
||||
|
||||
NONCE_CHARS = string.ascii_letters + string.digits
|
||||
CHARS_ASCII = string.ascii_letters + string.digits
|
||||
CHARS_HEX_LOWER = string.digits + "abcdef"
|
||||
CHARS_HEX_UPPER = string.digits + "ABCDEF"
|
||||
|
||||
|
||||
def create_nonce(length: int = 30) -> str:
|
||||
return ''.join(random.choices(NONCE_CHARS, k=length))
|
||||
def create_nonce(chars: str, length: int) -> str:
|
||||
return ''.join(random.choices(chars, k=length))
|
||||
|
||||
|
||||
def deduplicate(iterable: abc.Iterable[_T]) -> list[_T]:
|
||||
|
||||
16
websocket.py
16
websocket.py
@@ -13,7 +13,13 @@ from translate import _
|
||||
from exceptions import MinerException, WebsocketClosed
|
||||
from constants import PING_INTERVAL, PING_TIMEOUT, MAX_WEBSOCKETS, WS_TOPICS_LIMIT
|
||||
from utils import (
|
||||
task_wrapper, create_nonce, json_minify, format_traceback, AwaitableValue, ExponentialBackoff
|
||||
CHARS_ASCII,
|
||||
task_wrapper,
|
||||
create_nonce,
|
||||
json_minify,
|
||||
format_traceback,
|
||||
AwaitableValue,
|
||||
ExponentialBackoff,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -200,7 +206,7 @@ class Websocket:
|
||||
# nothing to do
|
||||
return
|
||||
self._topics_changed.clear()
|
||||
access_token, user_id = await self._twitch.check_login()
|
||||
auth_state = await self._twitch.get_auth()
|
||||
current: set[WebsocketTopic] = set(self.topics.values())
|
||||
# handle removed topics
|
||||
removed = self._submitted.difference(current)
|
||||
@@ -212,7 +218,7 @@ class Websocket:
|
||||
"type": "UNLISTEN",
|
||||
"data": {
|
||||
"topics": topics_list,
|
||||
"auth_token": access_token,
|
||||
"auth_token": auth_state.access_token,
|
||||
}
|
||||
}
|
||||
)
|
||||
@@ -227,7 +233,7 @@ class Websocket:
|
||||
"type": "LISTEN",
|
||||
"data": {
|
||||
"topics": topics_list,
|
||||
"auth_token": access_token,
|
||||
"auth_token": auth_state.access_token,
|
||||
}
|
||||
}
|
||||
)
|
||||
@@ -316,7 +322,7 @@ class Websocket:
|
||||
ws = self._ws.get_with_default(None)
|
||||
assert ws is not None
|
||||
if message["type"] != "PING":
|
||||
message["nonce"] = create_nonce()
|
||||
message["nonce"] = create_nonce(CHARS_ASCII, 30)
|
||||
await ws.send_json(message, dumps=json_minify)
|
||||
ws_logger.debug(f"Websocket[{self._idx}] sent: {message}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user