commit fac2788e4b4d9e55c6eb9e99f4220c6b83790caf Author: DevilXD Date: Fri Dec 3 17:48:51 2021 +0100 First release diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c757a33 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +__pycache__ +.mypy_cache +/build +/dist +/cookies.pickle +/settings.json +/*.spec diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..9dbd803 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,23 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Run Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal" + }, + { + "name": "Run App", + "type": "python", + "request": "launch", + "program": "main.py", + "console": "integratedTerminal", + "justMyCode": false + }, + ] +} \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..768010e --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,20 @@ +{ + // See https://go.microsoft.com/fwlink/?LinkId=733558 + // for the documentation about the tasks.json format + "version": "2.0.0", + "tasks": [ + { + "label": "Build App", + "type": "shell", + "command": "build.bat", + "problemMatcher": [], + "group": { + "kind": "build", + "isDefault": true + }, + "presentation": { + "showReuseMessage": false + } + } + ] +} diff --git a/build.bat b/build.bat new file mode 100644 index 0000000..dac7f1e --- /dev/null +++ b/build.bat @@ -0,0 +1 @@ +pyinstaller -F -n "Twitch Drops Miner (by DevilXD)" -i pickaxe.ico main.py diff --git a/channel.py b/channel.py new file mode 100644 index 0000000..d1e5eed --- /dev/null +++ b/channel.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +import re +import json +import logging +from copy import copy +from base64 import b64encode +from datetime import datetime, timezone +from typing import Any, Optional, Dict, TYPE_CHECKING + +from inventory import Game +from exceptions import MinerException +from constants import BASE_URL, GQL_OPERATIONS + +if TYPE_CHECKING: + from twitch import Twitch + + +logger = logging.getLogger("TwitchDrops") + + +class Stream: + def __init__(self, channel: Channel, data: Dict[str, Any]): + self._twitch = channel._twitch + self.channel = channel + stream = data["stream"] + self.broadcast_id = int(stream["id"]) + self.viewer_count = stream["viewersCount"] + self.drops_enabled = any(tag["localizedName"] == "Drops Enabled" for tag in stream["tags"]) + settings = data["broadcastSettings"] + self.game: Game = Game(settings["game"]) + self.title = settings["title"] + self._timestamp = datetime.now(timezone.utc) + + +class Channel: + async def __new__(cls, *args, **kwargs): + """ + Enables __init__ to be async. + The instance is returned after initialization completes. + """ + self = super().__new__(cls) + await self.__init__(*args, **kwargs) + return self + + async def __init__(self, twitch: Twitch, channel_name: str): # type: ignore + self._twitch: Twitch = twitch + self.id: int = 0 # temp, to be filled by get_stream + self.name: str = channel_name + self.url: str = f"{BASE_URL}/{channel_name}" + self._spade_url: str = await self.get_spade_url() + self.stream: Optional[Stream] = None + await self.get_stream() + + @property + def online(self) -> bool: + """ + Returns True if the streamer is online and is currently streaming, False otherwise. + """ + return self.stream is not None + + async def get_spade_url(self) -> str: + """ + To get this monstrous thing, you have to walk a chain of requests. + Streamer page (HTML) --parse-> Streamer Settings (JavaScript) --parse-> Spade URL + """ + async with self._twitch._session.get(self.url) as response: + streamer_html = await response.text(encoding="utf8") + match = re.search( + r'src="(https://static\.twitchcdn\.net/config/settings\.[0-9a-f]{32}\.js)"', + streamer_html, + re.I, + ) + if not match: + raise MinerException("Error while spade_url extraction: step #1") + streamer_settings = match.group(1) + async with self._twitch._session.get(streamer_settings) as response: + settings_js = await response.text(encoding="utf8") + match = re.search( + r'"spade_url": ?"(https://video-edge-[.\w\-/]+\.ts)"', settings_js, re.I + ) + if not match: + raise MinerException("Error while spade_url extraction: step #2") + return match.group(1) + + async def get_stream(self) -> Optional[Stream]: + op = copy(GQL_OPERATIONS["GetStreamInfo"].with_variables({"channel": self.name})) + response = await self._twitch.gql_request(op) + if response: + stream_data = response["data"]["user"] + self.id = int(stream_data["id"]) # fill channel_id + if stream_data["stream"]: + self.stream = Stream(self, stream_data) + else: + self.stream = None + return self.stream + + async def check_online(self) -> bool: + stream = await self.get_stream() + if stream is None: + return False + return True + + def set_offline(self): + # to be called externally, if we receive an event about this happening + self.stream = None + + def _encode_payload(self): + assert self.stream is not None + assert self._twitch._user_id is not None + payload = [ + { + "event": "minute-watched", + "properties": { + "channel_id": self.id, + "broadcast_id": self.stream.broadcast_id, + "player": "site", + "user_id": self._twitch._user_id, + } + } + ] + json_event = json.dumps(payload, separators=(",", ":")) + return {"data": (b64encode(json_event.encode("utf8"))).decode("utf8")} + + async def _send_watch(self): + """ + This uses the encoded payload on spade url to simulate watching the stream. + Optimally, send every 60 seconds to advance drops. + """ + if not self.online: + return + logger.debug(f"Sending minute-watched to {self.name}") + async with self._twitch._session.post( + self._spade_url, data=self._encode_payload() + ) as response: + return response.status == 204 diff --git a/constants.py b/constants.py new file mode 100644 index 0000000..98f9d43 --- /dev/null +++ b/constants.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +from copy import copy +from datetime import timedelta +from typing import Any, Optional, Union, Dict, Callable + + +BASE_URL = "https://twitch.tv" +WEBSOCKET_URL = "wss://pubsub-edge.twitch.tv/v1" +GQL_URL = "https://gql.twitch.tv/gql" +CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko" +USER_AGENT = "Twitch Drops App" +SETTINGS_PATH = "settings.json" +COOKIES_PATH = "cookies.pickle" +PING_INTERVAL = timedelta(minutes=3) + + +class GQLOperation(Dict[str, Any]): + def __init__(self, name: str, sha256: str, *, variables: Optional[Dict[str, Any]] = None): + super().__init__( + operationName=name, + extensions={ + "persistedQuery": { + "version": 1, + "sha256Hash": sha256, + } + } + ) + if variables is not None: + super().__setitem__("variables", variables) + + def with_variables(self, variables: Dict[str, Any]): + modified = copy(self) + modified["variables"] = variables + return modified + + +GQL_OPERATIONS: Dict[str, GQLOperation] = { + "IsStreamLive": GQLOperation( + "WithIsStreamLiveQuery", + "04e46329a6786ff3a81c01c50bfa5d725902507a0deb83b0edbf7abe7a3716ea", + ), + "GetStreamInfo": GQLOperation( # used + "VideoPlayerStreamInfoOverlayChannel", + "a5f2e34d626a9f4f5c0204f910bab2194948a9502089be558bb6e779a9e1b3d2", + ), + "ClaimCommunityPoints": GQLOperation( # used + "ClaimCommunityPoints", + "46aaeebe02c99afdf4fc97c7c0cba964124bf6b0af229395f1f6d1feed05b3d0", + ), + "ClaimDrop": GQLOperation( # used + "DropsPage_ClaimDropRewards", + "2f884fa187b8fadb2a49db0adc033e636f7b6aaee6e76de1e2bba9a7baf0daf6", + ), + "ChannelPointsContext": GQLOperation( # used + "ChannelPointsContext", + "9988086babc615a918a1e9a722ff41d98847acac822645209ac7379eecb27152", + ), + "ModViewChannelQuery": GQLOperation( + "ModViewChannelQuery", + "df5d55b6401389afb12d3017c9b2cf1237164220c8ef4ed754eae8188068a807", + ), + "Inventory": GQLOperation( # used + "Inventory", + "e0765ebaa8e8eeb4043cc6dfeab3eac7f682ef5f724b81367e6e55c7aef2be4c", + ), + "ViewerDropsDashboard": GQLOperation( + "ViewerDropsDashboard", + "c4d61d7b71d03b324914d3cf8ca0bc23fe25dacf54120cc954321b9704a3f4e2", + ), + "DropCampaignDetails": GQLOperation( + "DropCampaignDetails", + "14b5e8a50777165cfc3971e1d93b4758613fe1c817d5542c398dce70b7a45c05", + ), + "AvailableDrops": GQLOperation( + "DropsHighlightService_AvailableDrops", + "b19ee96a0e79e3f8281c4108bc4c7b3f232266db6f96fd04a339ab393673a075", + ), + # Use for replace https://api.twitch.tv/helix/users?login={self.username} + "ReportMenuItem": GQLOperation( + "ReportMenuItem", + "8f3628981255345ca5e5453dfd844efffb01d6413a9931498836e6268692a30c", + ), + "PersonalSections": GQLOperation( + "PersonalSections", + "9fbdfb00156f754c26bde81eb47436dee146655c92682328457037da1a48ed39", + variables={ + "input": { + "sectionInputs": ["FOLLOWED_SECTION"], + "recommendationContext": {"platform": "web"}, + }, + "channelLogin": None, + "withChannelUser": False, + "creatorAnniversariesExperimentEnabled": False, + }, + ), +} + + +def get_topic( + topic_name: str, target_id: Union[str, int], process: Callable[[Dict[str, Any]], Any] +) -> WebsocketTopic: + return WebsocketTopic(f"{WEBSOCKET_TOPICS[topic_name]}.{target_id}", process) + + +class WebsocketTopic: + def __init__(self, topic_id: str, process: Callable[[Dict[str, Any]], Any]): + self.id: str = topic_id + self.process: Callable[[Dict[str, Any]], Any] = process + + def __str__(self) -> str: + return self.id + + def __eq__(self, other): + if isinstance(other, str): + return self.id == other + elif isinstance(other, WebsocketTopic): + return self.id == other.id + return NotImplemented + + def __hash__(self) -> int: + return hash(self.id) + + +WEBSOCKET_TOPICS: Dict[str, str] = { + # Using user_id + "UserDrops": "user-drop-events", + "UserStreamState": "stream-change-v1", + "UserCommunityPoints": "community-points-user-v1", + "Presence": "presence", + "Notifications": "onsite-notifications", + # Using channel_id + "ChannelDrops": "channel-drop-events", + "ChannelStreamState": "stream-change-by-channel", + "ChannelCommunityPoints": "community-points-channel-v1", + "VideoPlayback": "video-playback-by-id", + "StreamUpdate": "broadcast-settings-update", +} diff --git a/exceptions.py b/exceptions.py new file mode 100644 index 0000000..25d6079 --- /dev/null +++ b/exceptions.py @@ -0,0 +1,32 @@ +class MinerException(Exception): + def __init__(self, *args: object): + if args: + super().__init__(*args) + else: + super().__init__("Unknown miner error") + + +class RequestException(MinerException): + def __init__(self, *args: object): + if args: + super().__init__(*args) + else: + super().__init__("Unknown error during request") + + +class LoginException(RequestException): + def __init__(self, *args: object): + if args: + super().__init__(*args) + else: + super().__init__("Unknown error during login") + + +class CaptchaRequired(LoginException): + def __init__(self): + super().__init__("Captcha is required") + + +class IncorrectCredentials(LoginException): + def __init__(self): + super().__init__("Incorrect username or password") diff --git a/inventory.py b/inventory.py new file mode 100644 index 0000000..1497956 --- /dev/null +++ b/inventory.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any, Optional, List, Dict, TYPE_CHECKING + +from constants import GQL_OPERATIONS + +if TYPE_CHECKING: + from twitch import Twitch + + +async def claim_drop(twitch: Twitch, claim_id: str) -> bool: + op = GQL_OPERATIONS["ClaimDrop"].with_variables( + {"input": {"dropInstanceID": claim_id}} + ) + response = await twitch.gql_request(op) + data = response["data"] + if "errors" in data and data["errors"]: + return False + elif "claimDropRewards" in data: + if not data["claimDropRewards"]: + return False + elif ( + data["claimDropRewards"]["status"] + in ["ELIGIBLE_FOR_ALL", "DROP_INSTANCE_ALREADY_CLAIMED"] + ): + return True + return False + + +class Game: + def __init__(self, data: Dict[str, Any]): + self.id: int = int(data["id"]) + self.name: str = data["name"] + + def __eq__(self, other: object): + if isinstance(other, self.__class__): + return self.id == other.id + return NotImplemented + + def __hash__(self) -> int: + return self.id + + +class BaseDrop: + def __init__(self, campaign: DropsCampaign, data: Dict[str, Any]): + self._twitch: Twitch = campaign._twitch + self.id = data["id"] + self.name: str = data["name"] + self.campaign: DropsCampaign = campaign + self.rewards: List[str] = [b["benefit"]["name"] for b in data["benefitEdges"]] + self.starts_at: datetime = datetime.strptime(data["startAt"], "%Y-%m-%dT%H:%M:%SZ") + self.ends_at: datetime = datetime.strptime(data["endAt"], "%Y-%m-%dT%H:%M:%SZ") + # If claim_id is not None, we can use it to claim the drop + self.claim_id: Optional[str] = data["self"]["dropInstanceID"] + self.is_claimed: bool = data["self"]["isClaimed"] + self._preconditions: bool = data["self"]["hasPreconditionsMet"] + + @property + def can_earn(self) -> bool: + return ( + self._preconditions # preconditions are met + and self.campaign.active # campaign is active + and not self.is_claimed # drop isn't already claimed + ) + + @property + def can_claim(self) -> bool: + return self.claim_id is not None and not self.is_claimed + + async def claim(self) -> bool: + """ + Returns True if the claim succeeded, False otherwise. + """ + if not self.can_claim: + return False + assert self.claim_id is not None + self.is_claimed = await claim_drop(self._twitch, self.claim_id) + return self.is_claimed + + +class TimedDrop(BaseDrop): + def __init__(self, campaign: DropsCampaign, data: Dict[str, Any]): + super().__init__(campaign, data) + self.current_minutes: int = data["self"]["currentMinutesWatched"] + self.required_minutes: int = data["requiredMinutesWatched"] + if self.is_claimed: + # correct minutes for claimed drops + self.current_minutes = self.required_minutes + + @property + def remaining_minutes(self): + return self.required_minutes - self.current_minutes + + @property + def progress(self): + return self.current_minutes / self.required_minutes + + def update(self, message: Dict[str, Any]): + # {"type": "drop-progress", data: {"current_progress_min": 3, "required_progress_min": 10}} + # {"type": "drop-claim", data: {"drop_instance_id": ...}} + msg_type = message["type"] + if msg_type == "drop-progress": + self.current_minutes = message["data"]["current_progress_min"] + self.required_minutes = message["data"]["required_progress_min"] + elif msg_type == "drop-claim": + self.claim_id = message["data"]["drop_instance_id"] + + +class DropsCampaign: + def __init__(self, twitch: Twitch, data: Dict[str, Any]): + self._twitch: Twitch = twitch + self.id: str = data["id"] + self.name: str = data["name"] + self.game: Game = Game(data["game"]) + self.starts_at: datetime = datetime.strptime(data["startAt"], "%Y-%m-%dT%H:%M:%SZ") + self.ends_at: datetime = datetime.strptime(data["endAt"], "%Y-%m-%dT%H:%M:%SZ") + self.status: str = data["status"] + self.timed_drops: Dict[str, TimedDrop] = { + d["id"]: TimedDrop(self, d) for d in data["timeBasedDrops"] + } + + @property + def active(self): + return self.status == "ACTIVE" + + def get_drop(self, drop_id: str) -> Optional[TimedDrop]: + return self.timed_drops.get(drop_id) diff --git a/main.py b/main.py new file mode 100644 index 0000000..46de812 --- /dev/null +++ b/main.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +__version__ = 1 + +import os +import sys +import json +import ctypes +import logging +import asyncio +import traceback +import threading +from typing import Any, Dict, NoReturn + +from twitch import Twitch +from constants import SETTINGS_PATH + +try: + import win32api + from win32con import CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT +except (ImportError, ModuleNotFoundError): + raise ImportError("You have to run 'python -m pip install pywin32' first") + + +# nice console title +try: + ctypes.windll.kernel32.SetConsoleTitleW(f"Twitch Drops Miner v{__version__} (by DevilXD)") +except AttributeError: + # ensure we're on windows and there was no import problems + print("Only Windows supported!") + quit() +assert sys.platform == "win32" + + +def terminate() -> NoReturn: + forever = threading.Event() + print("\nApplication Terminated.\nClose the console window to exit the application.") + forever.wait() + raise RuntimeError("Uh oh") # this will never run, solely for MyPy + + +# handle extra stackable '-v' parameter, that switches the logging level +logging_level = logging.ERROR +if len(sys.argv) > 1: + arg = sys.argv[1] + if arg == ("-v", "-v1"): + logging_level = logging.WARNING + elif arg in ("-vv", "-v2"): + logging_level = logging.INFO + elif arg in ("-vvv", "-v3"): + logging_level = logging.DEBUG +# handle logging stuff +logger = logging.getLogger("TwitchDrops") +handler = logging.StreamHandler(sys.stdout) +handler.setFormatter(logging.Formatter("{levelname}: {message}", style='{')) +logger.addHandler(handler) +logger.setLevel(logging_level) +# handle settings +if not os.path.isfile(SETTINGS_PATH): + default = { + "username": "YourTwitchUsername", + "channels": ["Channel1", "Channel2", "Channel3"], + } + with open(SETTINGS_PATH, 'w', encoding="utf8") as file: + json.dump(default, file, indent=4) + print( + f"File '{SETTINGS_PATH}' created.\n" + "Please modify default settings as necessary, then relaunch the application." + ) + terminate() +with open(SETTINGS_PATH, 'r', encoding="utf8") as file: + settings: Dict[str, Any] = json.load(file) +required_fields = ["channels"] +for field_name in required_fields: + if field_name not in settings: + print(f"Field '{field_name}' is a required field in '{SETTINGS_PATH}'") + terminate() +# asyncio loop +loop = asyncio.get_event_loop() +# client init +client = Twitch(settings.get("username"), settings.get("password")) +# main task and it's close event +main_task = loop.create_task(client.run(settings["channels"])) +close_event = threading.Event() + + +def clean_exit(code: int): + if code not in (CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT): + # filter only events we want + return False + # cancel the main task - this triggers the cleanup + main_task.cancel() + # wait until cleanup completes + close_event.wait() + # tell OS that we're free to exit now + return True + + +# ensures clean exit upon closing the console +win32api.SetConsoleCtrlHandler(clean_exit, True) +try: + loop.run_until_complete(main_task) +except (asyncio.CancelledError, KeyboardInterrupt): + # KeyboardInterrupt causes run_until_complete to exit, but without cancelling the task. + # The loop stops and thus the task gets frozen, until the loop runs again. + # Because we don't want anything from there to actually run during cleanup, + # we need to explicitly cancel the task ourselves here. + main_task.cancel() + # cancel all other tasks + for task in asyncio.all_tasks(loop): + if not task.done(): + task.cancel() + # main_task was cancelled due to program shutting down - do the cleanup + loop.run_until_complete(client.close()) + # notify we're free to exit + close_event.set() +except Exception: + print("Fatal error encountered:\n") + traceback.print_exc() + terminate() +finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() diff --git a/pickaxe.ico b/pickaxe.ico new file mode 100644 index 0000000..a6ecd18 Binary files /dev/null and b/pickaxe.ico differ diff --git a/twitch.py b/twitch.py new file mode 100644 index 0000000..620c4fa --- /dev/null +++ b/twitch.py @@ -0,0 +1,340 @@ +from __future__ import annotations + +import os +import asyncio +import logging +from yarl import URL +from getpass import getpass +from functools import partial +from typing import Any, Optional, Union, List, Dict, cast + +try: + import aiohttp +except ImportError: + raise ImportError("You have to run 'python -m pip install aiohttp' first") + +from channel import Channel +from websocket import Websocket +from inventory import DropsCampaign +from exceptions import LoginException, CaptchaRequired, IncorrectCredentials +from constants import ( + CLIENT_ID, + USER_AGENT, + COOKIES_PATH, + GQL_URL, + GQL_OPERATIONS, + GQLOperation, + WebsocketTopic, + get_topic, +) + + +logger = logging.getLogger("TwitchDrops") + + +class Twitch: + def __init__( + self, + username: Optional[str] = None, + password: Optional[str] = None, + *, + loop: Optional[asyncio.AbstractEventLoop] = None, + ): + self.username: Optional[str] = username + self.password: Optional[str] = password + # Cookies, session and auth + cookie_jar = aiohttp.CookieJar() + if os.path.isfile(COOKIES_PATH): + cookie_jar.load(COOKIES_PATH) + self._session = aiohttp.ClientSession( + cookie_jar=cookie_jar, headers={"User-Agent": USER_AGENT}, loop=loop + ) + self._access_token: Optional[str] = None + self._user_id: Optional[int] = None + self._is_logged_in = asyncio.Event() + # Websocket + self.websocket = Websocket(self) + # Storing, watching and changing channels + self.channels: Dict[int, Channel] = {} + self._watching_channel: Optional[Channel] = None + self._watching_task: Optional[asyncio.Task[Any]] = None + self._channel_change = asyncio.Event() + # Inventory + self.inventory: List[DropsCampaign] = [] + + def wait_until_login(self): + return self._is_logged_in.wait() + + async def close(self): + print("Exiting...") + self._session.cookie_jar.save(COOKIES_PATH) # type: ignore + self.stop_watching() + await self._session.close() + await self.websocket.close() + await asyncio.sleep(1) # allows aiohttp to safely close the session + + @property + def currently_watching(self) -> Optional[Channel]: + return self._watching_channel + + async def run(self, channels: List[str] = []): + """ + Main method that runs the whole client. + + Here, we manage several things, specifically: + • Fetching the drops inventory to make sure that everything we can claim, is claimed + • Selecting a stream to watch, and watching it + • Changing the stream that's being watched if necessary + """ + # Start our websocket connection - shouldn't require task tracking + asyncio.create_task(self.websocket.connect()) + # Claim the drops we can + self.inventory = await self.get_inventory() + games = set() + for campaign in self.inventory: + if campaign.status == "UPCOMING": + # we have no use in processing upcoming campaigns here + continue + for drop in campaign.timed_drops.values(): + if drop.can_earn: + games.add(campaign.game) + if drop.can_claim: + await drop.claim() + # Fetch information about all channels we're supposed to handle + for channel_name in channels: + channel: Channel = await Channel(self, channel_name) # type: ignore + self.channels[channel.id] = channel + # Sub to these channel updates + topics: List[WebsocketTopic] = [] + for channel_id in self.channels: + topics.append( + get_topic( + "VideoPlayback", channel_id, partial(self.process_stream_state, channel_id) + ) + ) + await self.websocket.add_topics(topics) + + # Repeat: Change into a channel we can watch, then reset the flag + self._channel_change.set() + refresh_channels = False # we're entering having fresh channel data already + while True: + # wait for the change channel signal + await self._channel_change.wait() + for channel in self.channels.values(): + if ( + channel.stream is not None # steam online + and channel.stream.drops_enabled # drops are enabled + and channel.stream.game in games # streams a game we can earn drops in + ): + self.watch(channel) + refresh_channels = True + self._channel_change.clear() + break + else: + # there's no available channel to watch + if refresh_channels: + # refresh the status of all channels, + # to make sure that our websocket didn't miss anything til this point + print("No suitable channel to watch, refreshing...") + for channel in self.channels.values(): + await channel.get_stream() + await asyncio.sleep(0.5) + refresh_channels = False + continue + print("No suitable channel to watch, retrying in 120 seconds") + await asyncio.sleep(120) + + def watch(self, channel: Channel): + if self._watching_task is not None: + self._watching_task.cancel() + + async def watcher(channel: Channel): + op = GQL_OPERATIONS["ChannelPointsContext"].with_variables( + {"channelLogin": channel.name} + ) + i = 0 + while True: + await channel._send_watch() + if i == 0: + # ensure every 30 minutes that we don't have unclaimed points bonus + response = await self.gql_request(op) + channel_data: Dict[str, Any] = response["data"]["community"]["channel"] + claim_available: Dict[str, Any] = ( + channel_data["self"]["communityPoints"]["availableClaim"] + ) + if claim_available: + await self.claim_points(channel_data["id"], claim_available["id"]) + logger.info("Claimed bonus points") + i = (i + 1) % 30 + await asyncio.sleep(59) + + print(f"Watching: {channel.name}") + self._watching_channel = channel + self._watching_task = asyncio.create_task(watcher(channel)) + + def stop_watching(self): + if self._watching_task is not None: + logger.warning("Watching stopped.") + self._watching_task.cancel() + self._watching_task = None + self._watching_channel = None + + async def process_stream_state(self, channel_id: int, message: Dict[str, Any]): + msg_type = message["type"] + channel = self.channels.get(channel_id) + if channel is None: + logger.error(f"Stream state change for a non-existing channel: {channel_id}") + return + if msg_type == "stream-down": + logger.info(f"{channel.name} goes OFFLINE") + channel.set_offline() + if self._watching_channel is not None and self._watching_channel.id == channel_id: + # change the channel if we're currently watching it + self._channel_change.set() + elif msg_type == "stream-up": + logger.info(f"{channel.name} goes ONLINE") + + # stream_up is sent before the stream actually goes online, so just wait a bit + # and check if it's actually online by then + async def online_delay(channel: Channel): + await asyncio.sleep(10) + await channel.check_online() + + asyncio.create_task(online_delay(channel)) + elif msg_type == "viewcount": + if channel.stream is None: + # check if we've got a view count for a stream that just started + await channel.check_online() + if channel.stream is not None: + viewers = message["viewers"] + channel.stream.viewer_count = viewers + logger.info(f"{channel.name} viewers: {viewers}") + else: + logger.error(f"Channel viewcount update for an offline stream: {channel.name}") + + async def _login(self) -> str: + logger.debug("Login flow started") + if self.username is None: + self.username = input("Username: ") + if self.password is None: + self.password = getpass() + if not self.password: + # catch early empty pass + raise IncorrectCredentials() + + payload: Dict[str, Any] = { + "username": self.username, + "password": self.password, + "client_id": CLIENT_ID, + "undelete_user": False, + "remember_me": True, + } + + for attempt in range(10): + async with self._session.post( + "https://passport.twitch.tv/login", json=payload + ) as response: + login_response = await response.json() + + # Feed this back in to avoid running into CAPTCHA if possible + if "captcha_proof" in login_response: + payload["captcha"] = {"proof": login_response["captcha_proof"]} + + # Error handling + if "error_code" in login_response: + error_code = login_response["error_code"] + logger.debug(f"Login error code: {error_code}") + if error_code == 1000: + # we've failed bois + logger.debug("Login failed due to CAPTCHA") + raise CaptchaRequired() + elif error_code == 3001: + # wrong password you dummy + logger.debug("Login failed due to incorrect login or pass") + print(f"Incorrect username or password.\nUsername: {self.username}") + self.password = getpass() + if not self.password: + raise IncorrectCredentials() + elif error_code in ( + 3011, # Authy token needed + 3012, # Invalid authy token + 3022, # Email code needed + 3023, # Invalid email code + ): + # 2FA handling + email = error_code in (3022, 3023) + logger.debug("2FA token required") + token = input("2FA token: ") + if email: + # email code + payload["twitchguard_code"] = token + else: + # authy token + payload["authy_token"] = token + continue + else: + raise LoginException(login_response["error"]) + + # Success handling + if "access_token" in login_response: + # we're in bois + self._access_token = login_response["access_token"] + logger.debug(f"Access token: {self._access_token}") + break + + if self._access_token is None: + # this means we've ran out of retries + raise LoginException("Ran out of login retries") + return self._access_token + + async def check_login(self) -> None: + if self._access_token is not None and self._user_id is not None: + # we're all good + return + # looks like we're missing something + print("Logging in") + jar = self._session.cookie_jar + cookie = jar.filter_cookies("https://twitch.tv") # type: ignore + if not cookie: + # no cookie - login + await self._login() + # store our auth token inside the cookie + cookie["auth-token"] = cast(str, self._access_token) + elif self._access_token is None: + # have cookie - get our access token + self._access_token = cookie["auth-token"].value + logger.debug("Session restored from cookie") + # validate our access token, by obtaining user_id + async with self._session.get( + "https://id.twitch.tv/oauth2/validate", + headers={"Authorization": f"OAuth {self._access_token}"} + ) as response: + validate_response = await response.json() + self._user_id = cookie["persistent"] = validate_response["user_id"] + self._is_logged_in.set() + print(f"Login successful, User ID: {self._user_id}") + # update our cookie + jar.update_cookies(cookie, URL("https://twitch.tv")) + + async def gql_request(self, op: GQLOperation) -> Dict[str, Any]: + await self.check_login() + headers = { + "Authorization": f"OAuth {self._access_token}", + "Client-Id": CLIENT_ID, + } + logger.debug(f"GQL Request: {op}") + async with self._session.post(GQL_URL, json=op, headers=headers) as response: + response_json = await response.json() + logger.debug(f"GQL Response: {response_json}") + return response_json + + async def get_inventory(self) -> List[DropsCampaign]: + response = await self.gql_request(GQL_OPERATIONS["Inventory"]) + inventory = response["data"]["currentUser"]["inventory"] + return [DropsCampaign(self, data) for data in inventory["dropCampaignsInProgress"]] + + async def claim_points(self, channel_id: Union[str, int], claim_id: str): + variables = {"input": {"channelID": str(channel_id), "claimID": claim_id}} + await self.gql_request( + GQL_OPERATIONS["ClaimCommunityPoints"].with_variables(variables) + ) diff --git a/websocket.py b/websocket.py new file mode 100644 index 0000000..7c4b232 --- /dev/null +++ b/websocket.py @@ -0,0 +1,216 @@ +from __future__ import annotations + +import json +import random +import string +import asyncio +import logging +from typing import Any, Optional, Dict, Tuple, Set, Iterable, TYPE_CHECKING + +from websockets.exceptions import ConnectionClosed, ConnectionClosedOK +from websockets.client import WebSocketClientProtocol, connect as websocket_connect +from exceptions import MinerException + +from inventory import TimedDrop +from constants import WEBSOCKET_URL, PING_INTERVAL, WebsocketTopic, get_topic + +if TYPE_CHECKING: + from twitch import Twitch + + +logger = logging.getLogger("TwitchDrops") + + +class Websocket: + def __init__(self, twitch: Twitch): + self._twitch = twitch + self._ws: Optional[WebSocketClientProtocol] = None + self.connected = asyncio.Event() # set when there's an active websocket connection + self.reconnect = asyncio.Event() # set when the websocket needs to reconnect + self._send_queue: asyncio.Queue[Tuple[str, Dict[str, Any]]] = asyncio.Queue() + self._recv_dict: Dict[str, asyncio.Future[Any]] = {} + self._topics: Set[WebsocketTopic] = set() + self._ping_task: Optional[asyncio.Task[Any]] = None + + async def connect(self): + # ensure we're logged in before connecting + await self._twitch.wait_until_login() + assert self._twitch._user_id is not None + logger.info("Connecting to Websocket") + # Listen to our events of choice + user_id = self._twitch._user_id + # Defer topics via a task so we can proceed to the actual websocket connection + asyncio.create_task( + self.add_topics([ + get_topic("UserDrops", user_id, self.process_drops), + get_topic("UserCommunityPoints", user_id, self.process_points), + ]) + ) + # Connect/Reconnect loop + async for websocket in websocket_connect(WEBSOCKET_URL, ssl=True, ping_interval=None): + websocket.BACKOFF_MAX = 3 * 60 # type: ignore # 3 minutes + self._ws = websocket + self.reconnect.clear() + self.connected.set() + logger.info("Websocket Connected") + self._ping_task = asyncio.create_task(self._ping_loop()) + try: + while not self.reconnect.is_set(): + # Process receive + try: + # Wait up to 0.5s for a message we're supposed to receive + raw_message = await asyncio.wait_for(websocket.recv(), timeout=0.5) + except asyncio.TimeoutError: + # nothing - skip handling + pass + else: + # we've got something to process + message = json.loads(raw_message) + logger.debug(f"Websocket received: {message}") + msg_type = message["type"] + # handle the simple PING case + if msg_type == "PONG": + ping_future = self._recv_dict.pop("PING", None) + if ping_future is not None and not ping_future.done(): + ping_future.set_result(message) + elif msg_type == "RESPONSE": + self._recv_dict.pop(message["nonce"]).set_result(message) + elif msg_type == "RECONNECT": + # We've received a reconnect request + logger.warning("Received a Websocket Reconnect Request") + self.reconnect.set() + elif msg_type == "MESSAGE": + # request the assigned topic to process the response + target_topic = message["data"]["topic"] + for topic in self._topics: + if target_topic == topic: + await topic.process(json.loads(message["data"]["message"])) + break + else: + logger.error(f"Received unknown websocket payload: {message}") + + # Early exit if needed + if self.reconnect.is_set(): + break + + # Process send + while not self._send_queue.empty(): + nonce, message = self._send_queue.get_nowait() + if nonce != "PING": + message["nonce"] = nonce + await websocket.send(json.dumps(message, separators=(',', ':'))) + logger.debug(f"Websocket sent: {message}") + # A reconnect was requested + continue + except ConnectionClosed as exc: + self.connected.clear() + if self._ping_task is not None: + self._ping_task.cancel() + self._ping_task = None + if isinstance(exc, ConnectionClosedOK): + if exc.rcvd_then_sent: + # server closed the connection, not us - reconnect + logger.warning("Websocket Disconnected - Reconnecting") + continue + # we closed it - exit + break + # otherwise, reconnect + logger.warning("Websocket Closed - Reconnecting") + continue + except Exception: + logger.exception("Exception in Websocket - Reconnecting") + continue + + async def close(self): + self.connected.clear() + if self._ping_task is not None: + self._ping_task.cancel() + if self._ws is not None: + await self._ws.close() + + async def _ping_loop(self): + await self.connected.wait() + ping_every = PING_INTERVAL.total_seconds() + while self.connected.is_set(): + try: + await asyncio.wait_for(self.send({"type": "PING"}), timeout=10) + except asyncio.TimeoutError: + # per documentation, if there's no response for a PING, reconnect to the websocket + logger.warning("Websocket got no response to PING - reconnect") + self.reconnect.set() + break + await asyncio.sleep(ping_every) + + def create_nonce(self, length: int = 30) -> str: + available_chars = string.ascii_letters + string.digits + return ''.join(random.choices(available_chars, k=length)) + + def send(self, message: Dict[str, Any]) -> asyncio.Future[Dict[str, Any]]: + logger.debug(f"Websocket sending: {message}") + msg_type = message["type"] + if msg_type == "PING": + nonce = "PING" + else: + nonce = self.create_nonce() + self._send_queue.put_nowait((nonce, message)) + future: asyncio.Future[Dict[str, Any]] = asyncio.get_running_loop().create_future() + self._recv_dict[nonce] = future + return future + + async def add_topics(self, topics: Iterable[WebsocketTopic]): + # ensure no topics end up duplicated + topics = set(topics) + topics.difference_update(self._topics) + if not topics: + # none left to add + return + self._topics.update(topics) + if len(self._topics) >= 50: + # TODO: Handle multiple connections (up to 10) since one allows only up to 50 topics + raise MinerException("Too many topics") + await self.connected.wait() + assert self._twitch._access_token is not None + auth_token = self._twitch._access_token + topics_list = list(map(str, topics)) + logger.info(f"Listening for: {', '.join(topics_list)}") + await self.send( + { + "type": "LISTEN", + "data": { + "topics": topics_list, + "auth_token": auth_token, + } + } + ) + + async def process_drops(self, message: Dict[str, Any]): + drop_id = message["data"]["drop_id"] + drop: Optional[TimedDrop] = None + for campaign in self._twitch.inventory: + drop = campaign.get_drop(drop_id) + if drop is not None: + break + else: + logger.error(f"Drop with ID of {drop_id} not found!") + return + drop.update(message) + msg_type = message["type"] + if msg_type == "drop-progress": + print( + f"Drop progress: {drop.progress:4.0%} ({drop.remaining_minutes} minutes remaining)" + ) + elif msg_type == "drop-claim": + await drop.claim() + print(f"Claimed drop: {', '.join(drop.rewards)}") + + async def process_points(self, message: Dict[str, Any]): + msg_type = message["type"] + if msg_type == "points-earned": + points = message["data"]["point_gain"]["total_points"] + balance = message["data"]["balance"]["balance"] + print(f"Earned points for watching: {points:3}, total: {balance}") + elif msg_type == "claim-available": + claim_data = message["data"]["claim"] + points = claim_data["point_gain"]["total_points"] + await self._twitch.claim_points(claim_data["channel_id"], claim_data["id"]) + print(f"Claimed bonus points: {points}")