From fac2788e4b4d9e55c6eb9e99f4220c6b83790caf Mon Sep 17 00:00:00 2001 From: DevilXD Date: Fri, 3 Dec 2021 17:48:51 +0100 Subject: [PATCH] First release --- .gitignore | 7 + .vscode/launch.json | 23 +++ .vscode/tasks.json | 20 +++ build.bat | 1 + channel.py | 136 ++++++++++++++++++ constants.py | 138 ++++++++++++++++++ exceptions.py | 32 +++++ inventory.py | 128 +++++++++++++++++ main.py | 123 ++++++++++++++++ pickaxe.ico | Bin 0 -> 67646 bytes twitch.py | 340 ++++++++++++++++++++++++++++++++++++++++++++ websocket.py | 216 ++++++++++++++++++++++++++++ 12 files changed, 1164 insertions(+) create mode 100644 .gitignore create mode 100644 .vscode/launch.json create mode 100644 .vscode/tasks.json create mode 100644 build.bat create mode 100644 channel.py create mode 100644 constants.py create mode 100644 exceptions.py create mode 100644 inventory.py create mode 100644 main.py create mode 100644 pickaxe.ico create mode 100644 twitch.py create mode 100644 websocket.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c757a33 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +__pycache__ +.mypy_cache +/build +/dist +/cookies.pickle +/settings.json +/*.spec diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..9dbd803 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,23 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Run Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal" + }, + { + "name": "Run App", + "type": "python", + "request": "launch", + "program": "main.py", + "console": "integratedTerminal", + "justMyCode": false + }, + ] +} \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..768010e --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,20 @@ +{ + // See https://go.microsoft.com/fwlink/?LinkId=733558 + // for the documentation about the tasks.json format + "version": "2.0.0", + "tasks": [ + { + "label": "Build App", + "type": "shell", + "command": "build.bat", + "problemMatcher": [], + "group": { + "kind": "build", + "isDefault": true + }, + "presentation": { + "showReuseMessage": false + } + } + ] +} diff --git a/build.bat b/build.bat new file mode 100644 index 0000000..dac7f1e --- /dev/null +++ b/build.bat @@ -0,0 +1 @@ +pyinstaller -F -n "Twitch Drops Miner (by DevilXD)" -i pickaxe.ico main.py diff --git a/channel.py b/channel.py new file mode 100644 index 0000000..d1e5eed --- /dev/null +++ b/channel.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +import re +import json +import logging +from copy import copy +from base64 import b64encode +from datetime import datetime, timezone +from typing import Any, Optional, Dict, TYPE_CHECKING + +from inventory import Game +from exceptions import MinerException +from constants import BASE_URL, GQL_OPERATIONS + +if TYPE_CHECKING: + from twitch import Twitch + + +logger = logging.getLogger("TwitchDrops") + + +class Stream: + def __init__(self, channel: Channel, data: Dict[str, Any]): + self._twitch = channel._twitch + self.channel = channel + stream = data["stream"] + self.broadcast_id = int(stream["id"]) + self.viewer_count = stream["viewersCount"] + self.drops_enabled = any(tag["localizedName"] == "Drops Enabled" for tag in stream["tags"]) + settings = data["broadcastSettings"] + self.game: Game = Game(settings["game"]) + self.title = settings["title"] + self._timestamp = datetime.now(timezone.utc) + + +class Channel: + async def __new__(cls, *args, **kwargs): + """ + Enables __init__ to be async. + The instance is returned after initialization completes. + """ + self = super().__new__(cls) + await self.__init__(*args, **kwargs) + return self + + async def __init__(self, twitch: Twitch, channel_name: str): # type: ignore + self._twitch: Twitch = twitch + self.id: int = 0 # temp, to be filled by get_stream + self.name: str = channel_name + self.url: str = f"{BASE_URL}/{channel_name}" + self._spade_url: str = await self.get_spade_url() + self.stream: Optional[Stream] = None + await self.get_stream() + + @property + def online(self) -> bool: + """ + Returns True if the streamer is online and is currently streaming, False otherwise. + """ + return self.stream is not None + + async def get_spade_url(self) -> str: + """ + To get this monstrous thing, you have to walk a chain of requests. + Streamer page (HTML) --parse-> Streamer Settings (JavaScript) --parse-> Spade URL + """ + async with self._twitch._session.get(self.url) as response: + streamer_html = await response.text(encoding="utf8") + match = re.search( + r'src="(https://static\.twitchcdn\.net/config/settings\.[0-9a-f]{32}\.js)"', + streamer_html, + re.I, + ) + if not match: + raise MinerException("Error while spade_url extraction: step #1") + streamer_settings = match.group(1) + async with self._twitch._session.get(streamer_settings) as response: + settings_js = await response.text(encoding="utf8") + match = re.search( + r'"spade_url": ?"(https://video-edge-[.\w\-/]+\.ts)"', settings_js, re.I + ) + if not match: + raise MinerException("Error while spade_url extraction: step #2") + return match.group(1) + + async def get_stream(self) -> Optional[Stream]: + op = copy(GQL_OPERATIONS["GetStreamInfo"].with_variables({"channel": self.name})) + response = await self._twitch.gql_request(op) + if response: + stream_data = response["data"]["user"] + self.id = int(stream_data["id"]) # fill channel_id + if stream_data["stream"]: + self.stream = Stream(self, stream_data) + else: + self.stream = None + return self.stream + + async def check_online(self) -> bool: + stream = await self.get_stream() + if stream is None: + return False + return True + + def set_offline(self): + # to be called externally, if we receive an event about this happening + self.stream = None + + def _encode_payload(self): + assert self.stream is not None + assert self._twitch._user_id is not None + payload = [ + { + "event": "minute-watched", + "properties": { + "channel_id": self.id, + "broadcast_id": self.stream.broadcast_id, + "player": "site", + "user_id": self._twitch._user_id, + } + } + ] + json_event = json.dumps(payload, separators=(",", ":")) + return {"data": (b64encode(json_event.encode("utf8"))).decode("utf8")} + + async def _send_watch(self): + """ + This uses the encoded payload on spade url to simulate watching the stream. + Optimally, send every 60 seconds to advance drops. + """ + if not self.online: + return + logger.debug(f"Sending minute-watched to {self.name}") + async with self._twitch._session.post( + self._spade_url, data=self._encode_payload() + ) as response: + return response.status == 204 diff --git a/constants.py b/constants.py new file mode 100644 index 0000000..98f9d43 --- /dev/null +++ b/constants.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +from copy import copy +from datetime import timedelta +from typing import Any, Optional, Union, Dict, Callable + + +BASE_URL = "https://twitch.tv" +WEBSOCKET_URL = "wss://pubsub-edge.twitch.tv/v1" +GQL_URL = "https://gql.twitch.tv/gql" +CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko" +USER_AGENT = "Twitch Drops App" +SETTINGS_PATH = "settings.json" +COOKIES_PATH = "cookies.pickle" +PING_INTERVAL = timedelta(minutes=3) + + +class GQLOperation(Dict[str, Any]): + def __init__(self, name: str, sha256: str, *, variables: Optional[Dict[str, Any]] = None): + super().__init__( + operationName=name, + extensions={ + "persistedQuery": { + "version": 1, + "sha256Hash": sha256, + } + } + ) + if variables is not None: + super().__setitem__("variables", variables) + + def with_variables(self, variables: Dict[str, Any]): + modified = copy(self) + modified["variables"] = variables + return modified + + +GQL_OPERATIONS: Dict[str, GQLOperation] = { + "IsStreamLive": GQLOperation( + "WithIsStreamLiveQuery", + "04e46329a6786ff3a81c01c50bfa5d725902507a0deb83b0edbf7abe7a3716ea", + ), + "GetStreamInfo": GQLOperation( # used + "VideoPlayerStreamInfoOverlayChannel", + "a5f2e34d626a9f4f5c0204f910bab2194948a9502089be558bb6e779a9e1b3d2", + ), + "ClaimCommunityPoints": GQLOperation( # used + "ClaimCommunityPoints", + "46aaeebe02c99afdf4fc97c7c0cba964124bf6b0af229395f1f6d1feed05b3d0", + ), + "ClaimDrop": GQLOperation( # used + "DropsPage_ClaimDropRewards", + "2f884fa187b8fadb2a49db0adc033e636f7b6aaee6e76de1e2bba9a7baf0daf6", + ), + "ChannelPointsContext": GQLOperation( # used + "ChannelPointsContext", + "9988086babc615a918a1e9a722ff41d98847acac822645209ac7379eecb27152", + ), + "ModViewChannelQuery": GQLOperation( + "ModViewChannelQuery", + "df5d55b6401389afb12d3017c9b2cf1237164220c8ef4ed754eae8188068a807", + ), + "Inventory": GQLOperation( # used + "Inventory", + "e0765ebaa8e8eeb4043cc6dfeab3eac7f682ef5f724b81367e6e55c7aef2be4c", + ), + "ViewerDropsDashboard": GQLOperation( + "ViewerDropsDashboard", + "c4d61d7b71d03b324914d3cf8ca0bc23fe25dacf54120cc954321b9704a3f4e2", + ), + "DropCampaignDetails": GQLOperation( + "DropCampaignDetails", + "14b5e8a50777165cfc3971e1d93b4758613fe1c817d5542c398dce70b7a45c05", + ), + "AvailableDrops": GQLOperation( + "DropsHighlightService_AvailableDrops", + "b19ee96a0e79e3f8281c4108bc4c7b3f232266db6f96fd04a339ab393673a075", + ), + # Use for replace https://api.twitch.tv/helix/users?login={self.username} + "ReportMenuItem": GQLOperation( + "ReportMenuItem", + "8f3628981255345ca5e5453dfd844efffb01d6413a9931498836e6268692a30c", + ), + "PersonalSections": GQLOperation( + "PersonalSections", + "9fbdfb00156f754c26bde81eb47436dee146655c92682328457037da1a48ed39", + variables={ + "input": { + "sectionInputs": ["FOLLOWED_SECTION"], + "recommendationContext": {"platform": "web"}, + }, + "channelLogin": None, + "withChannelUser": False, + "creatorAnniversariesExperimentEnabled": False, + }, + ), +} + + +def get_topic( + topic_name: str, target_id: Union[str, int], process: Callable[[Dict[str, Any]], Any] +) -> WebsocketTopic: + return WebsocketTopic(f"{WEBSOCKET_TOPICS[topic_name]}.{target_id}", process) + + +class WebsocketTopic: + def __init__(self, topic_id: str, process: Callable[[Dict[str, Any]], Any]): + self.id: str = topic_id + self.process: Callable[[Dict[str, Any]], Any] = process + + def __str__(self) -> str: + return self.id + + def __eq__(self, other): + if isinstance(other, str): + return self.id == other + elif isinstance(other, WebsocketTopic): + return self.id == other.id + return NotImplemented + + def __hash__(self) -> int: + return hash(self.id) + + +WEBSOCKET_TOPICS: Dict[str, str] = { + # Using user_id + "UserDrops": "user-drop-events", + "UserStreamState": "stream-change-v1", + "UserCommunityPoints": "community-points-user-v1", + "Presence": "presence", + "Notifications": "onsite-notifications", + # Using channel_id + "ChannelDrops": "channel-drop-events", + "ChannelStreamState": "stream-change-by-channel", + "ChannelCommunityPoints": "community-points-channel-v1", + "VideoPlayback": "video-playback-by-id", + "StreamUpdate": "broadcast-settings-update", +} diff --git a/exceptions.py b/exceptions.py new file mode 100644 index 0000000..25d6079 --- /dev/null +++ b/exceptions.py @@ -0,0 +1,32 @@ +class MinerException(Exception): + def __init__(self, *args: object): + if args: + super().__init__(*args) + else: + super().__init__("Unknown miner error") + + +class RequestException(MinerException): + def __init__(self, *args: object): + if args: + super().__init__(*args) + else: + super().__init__("Unknown error during request") + + +class LoginException(RequestException): + def __init__(self, *args: object): + if args: + super().__init__(*args) + else: + super().__init__("Unknown error during login") + + +class CaptchaRequired(LoginException): + def __init__(self): + super().__init__("Captcha is required") + + +class IncorrectCredentials(LoginException): + def __init__(self): + super().__init__("Incorrect username or password") diff --git a/inventory.py b/inventory.py new file mode 100644 index 0000000..1497956 --- /dev/null +++ b/inventory.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any, Optional, List, Dict, TYPE_CHECKING + +from constants import GQL_OPERATIONS + +if TYPE_CHECKING: + from twitch import Twitch + + +async def claim_drop(twitch: Twitch, claim_id: str) -> bool: + op = GQL_OPERATIONS["ClaimDrop"].with_variables( + {"input": {"dropInstanceID": claim_id}} + ) + response = await twitch.gql_request(op) + data = response["data"] + if "errors" in data and data["errors"]: + return False + elif "claimDropRewards" in data: + if not data["claimDropRewards"]: + return False + elif ( + data["claimDropRewards"]["status"] + in ["ELIGIBLE_FOR_ALL", "DROP_INSTANCE_ALREADY_CLAIMED"] + ): + return True + return False + + +class Game: + def __init__(self, data: Dict[str, Any]): + self.id: int = int(data["id"]) + self.name: str = data["name"] + + def __eq__(self, other: object): + if isinstance(other, self.__class__): + return self.id == other.id + return NotImplemented + + def __hash__(self) -> int: + return self.id + + +class BaseDrop: + def __init__(self, campaign: DropsCampaign, data: Dict[str, Any]): + self._twitch: Twitch = campaign._twitch + self.id = data["id"] + self.name: str = data["name"] + self.campaign: DropsCampaign = campaign + self.rewards: List[str] = [b["benefit"]["name"] for b in data["benefitEdges"]] + self.starts_at: datetime = datetime.strptime(data["startAt"], "%Y-%m-%dT%H:%M:%SZ") + self.ends_at: datetime = datetime.strptime(data["endAt"], "%Y-%m-%dT%H:%M:%SZ") + # If claim_id is not None, we can use it to claim the drop + self.claim_id: Optional[str] = data["self"]["dropInstanceID"] + self.is_claimed: bool = data["self"]["isClaimed"] + self._preconditions: bool = data["self"]["hasPreconditionsMet"] + + @property + def can_earn(self) -> bool: + return ( + self._preconditions # preconditions are met + and self.campaign.active # campaign is active + and not self.is_claimed # drop isn't already claimed + ) + + @property + def can_claim(self) -> bool: + return self.claim_id is not None and not self.is_claimed + + async def claim(self) -> bool: + """ + Returns True if the claim succeeded, False otherwise. + """ + if not self.can_claim: + return False + assert self.claim_id is not None + self.is_claimed = await claim_drop(self._twitch, self.claim_id) + return self.is_claimed + + +class TimedDrop(BaseDrop): + def __init__(self, campaign: DropsCampaign, data: Dict[str, Any]): + super().__init__(campaign, data) + self.current_minutes: int = data["self"]["currentMinutesWatched"] + self.required_minutes: int = data["requiredMinutesWatched"] + if self.is_claimed: + # correct minutes for claimed drops + self.current_minutes = self.required_minutes + + @property + def remaining_minutes(self): + return self.required_minutes - self.current_minutes + + @property + def progress(self): + return self.current_minutes / self.required_minutes + + def update(self, message: Dict[str, Any]): + # {"type": "drop-progress", data: {"current_progress_min": 3, "required_progress_min": 10}} + # {"type": "drop-claim", data: {"drop_instance_id": ...}} + msg_type = message["type"] + if msg_type == "drop-progress": + self.current_minutes = message["data"]["current_progress_min"] + self.required_minutes = message["data"]["required_progress_min"] + elif msg_type == "drop-claim": + self.claim_id = message["data"]["drop_instance_id"] + + +class DropsCampaign: + def __init__(self, twitch: Twitch, data: Dict[str, Any]): + self._twitch: Twitch = twitch + self.id: str = data["id"] + self.name: str = data["name"] + self.game: Game = Game(data["game"]) + self.starts_at: datetime = datetime.strptime(data["startAt"], "%Y-%m-%dT%H:%M:%SZ") + self.ends_at: datetime = datetime.strptime(data["endAt"], "%Y-%m-%dT%H:%M:%SZ") + self.status: str = data["status"] + self.timed_drops: Dict[str, TimedDrop] = { + d["id"]: TimedDrop(self, d) for d in data["timeBasedDrops"] + } + + @property + def active(self): + return self.status == "ACTIVE" + + def get_drop(self, drop_id: str) -> Optional[TimedDrop]: + return self.timed_drops.get(drop_id) diff --git a/main.py b/main.py new file mode 100644 index 0000000..46de812 --- /dev/null +++ b/main.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +__version__ = 1 + +import os +import sys +import json +import ctypes +import logging +import asyncio +import traceback +import threading +from typing import Any, Dict, NoReturn + +from twitch import Twitch +from constants import SETTINGS_PATH + +try: + import win32api + from win32con import CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT +except (ImportError, ModuleNotFoundError): + raise ImportError("You have to run 'python -m pip install pywin32' first") + + +# nice console title +try: + ctypes.windll.kernel32.SetConsoleTitleW(f"Twitch Drops Miner v{__version__} (by DevilXD)") +except AttributeError: + # ensure we're on windows and there was no import problems + print("Only Windows supported!") + quit() +assert sys.platform == "win32" + + +def terminate() -> NoReturn: + forever = threading.Event() + print("\nApplication Terminated.\nClose the console window to exit the application.") + forever.wait() + raise RuntimeError("Uh oh") # this will never run, solely for MyPy + + +# handle extra stackable '-v' parameter, that switches the logging level +logging_level = logging.ERROR +if len(sys.argv) > 1: + arg = sys.argv[1] + if arg == ("-v", "-v1"): + logging_level = logging.WARNING + elif arg in ("-vv", "-v2"): + logging_level = logging.INFO + elif arg in ("-vvv", "-v3"): + logging_level = logging.DEBUG +# handle logging stuff +logger = logging.getLogger("TwitchDrops") +handler = logging.StreamHandler(sys.stdout) +handler.setFormatter(logging.Formatter("{levelname}: {message}", style='{')) +logger.addHandler(handler) +logger.setLevel(logging_level) +# handle settings +if not os.path.isfile(SETTINGS_PATH): + default = { + "username": "YourTwitchUsername", + "channels": ["Channel1", "Channel2", "Channel3"], + } + with open(SETTINGS_PATH, 'w', encoding="utf8") as file: + json.dump(default, file, indent=4) + print( + f"File '{SETTINGS_PATH}' created.\n" + "Please modify default settings as necessary, then relaunch the application." + ) + terminate() +with open(SETTINGS_PATH, 'r', encoding="utf8") as file: + settings: Dict[str, Any] = json.load(file) +required_fields = ["channels"] +for field_name in required_fields: + if field_name not in settings: + print(f"Field '{field_name}' is a required field in '{SETTINGS_PATH}'") + terminate() +# asyncio loop +loop = asyncio.get_event_loop() +# client init +client = Twitch(settings.get("username"), settings.get("password")) +# main task and it's close event +main_task = loop.create_task(client.run(settings["channels"])) +close_event = threading.Event() + + +def clean_exit(code: int): + if code not in (CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT): + # filter only events we want + return False + # cancel the main task - this triggers the cleanup + main_task.cancel() + # wait until cleanup completes + close_event.wait() + # tell OS that we're free to exit now + return True + + +# ensures clean exit upon closing the console +win32api.SetConsoleCtrlHandler(clean_exit, True) +try: + loop.run_until_complete(main_task) +except (asyncio.CancelledError, KeyboardInterrupt): + # KeyboardInterrupt causes run_until_complete to exit, but without cancelling the task. + # The loop stops and thus the task gets frozen, until the loop runs again. + # Because we don't want anything from there to actually run during cleanup, + # we need to explicitly cancel the task ourselves here. + main_task.cancel() + # cancel all other tasks + for task in asyncio.all_tasks(loop): + if not task.done(): + task.cancel() + # main_task was cancelled due to program shutting down - do the cleanup + loop.run_until_complete(client.close()) + # notify we're free to exit + close_event.set() +except Exception: + print("Fatal error encountered:\n") + traceback.print_exc() + terminate() +finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() diff --git a/pickaxe.ico b/pickaxe.ico new file mode 100644 index 0000000000000000000000000000000000000000..a6ecd185456fa679f653a369529bceb31c87fe1d GIT binary patch literal 67646 zcmeHQZKxf^8Qyrs8$yUdYzU=zZbK>gA)z3}ij?z1tVk(E`h^6MB7_ob@Q3Y>()3&; zv^E6E4-rHpNN7b``$vk1h*yfVg%G5O2$gaOsgzL4MT%U?<(@vz?wNDutot$Z?#%A& zIdj8g_uZL!=XsxJ_G@-_Pm=7zzs=1A|8FG6_wP%NBuR1{37fbv-OkmbzsWbwPGahe zgTI6On~QX5Yg^R5GwuHe?(c0&=dyO9{+;RnP?;ueZjbtRqW{ZnYFyP;)W3E8uYq%3 zq6P66^>0=Gcft1^K?pwu5%q6P|DS}UyTTInZ$*C|^EA#O9!pU##-rMf!7}f3%LHiP#^aG^*CY<)EOw_+p|LfrA4AL!q($ofP)W1~!-+`OkY`E-W*{J_#^#2byxxbHE zN-#(L7xe!d_~`ZhoNXV7`p@)#4_q9w<*)DMqyDM>m%+o~zH2GLAN4;Q^!zM|rZ#m} zT>oDn{+n$0TDGz)tp7^y_o^SSsQ&LD?xh{mQQDd;S5SYB;Rib(-$XN)WmZuC(=PdI z!xuDq2EV?+OK_X8j8y%~zGCODwW2_)0~f%6YHO#J4{tcd=6|GHP>d%(-*k2d?k znY^|pwq+YOctR;l?fE&p-uwaYqXJznNCFhJ6N;`TIR1tBg8JS{okqZW8xrqPP2$tb zJYDCzC|}Ty-e$<_e2(Aw_w|Z6ui6I8FAFvKPLrQN(ueVVabEw;a`J_+} z9xp_56NT-CtpA&cDd>0Z%laR1zc19|+hsoMwhP+}RsZJ@PcO&yf%|Tu7O&T7T$L|u zGerG)ztmUl?*w*Ut4etNa+SVydBQd&IDOXP{2kHxuZ|zz`M`Y`=|iCoZ#BgyY}coj zw!S$>zpv-}n}Gdop$5FaITrOl8~itR$~1z0PYSi?J|e#6)A2ND-*!LsQT~!1OP|)M zUjy!)J%>yD=s-`%NKATK>DkYa~}8a zag9}orKyHZ>ObY0gSPyE>q`yze!>UN26&oiD6RVm@b>kZ?>MlV&gaSV(|Dd|S>NfI z{?C@h5&CxUY&)Mv^#RUV_4bZZ&-A}5be)Uvf#Z9GJoC2$yv|(`;!=IH&E-2q-1cD` zIDSdUvzK$e&Y2Kbo4TvEzNO>t%R2Zyfh0Nae}H=4j{Azgp6LHk8(b((TrPq8Spmm9 z?zeKT6JS&OV_Qq^JNLQNq%B{-y&p;PTwgSoTM}0f^#7$Kt`O>g+lK}GUPYbO=CKIy zwYA@~ofnY2p96jL72xJrAJ1F1e6{+!&;oq6@kq<_e)(5Ev}wa9us4XP%Uo;PwxqrQr;fSlI-ljV#t0f%r`%J3*Geu=W+y=~dAaU$Q zVK-fN_AN32R@0b@%5hAS=l06^+Q?@)RlhTCZ2K56vtKLRrb~WzwzF#-0bbL%^m6Z` zos&pL-`>e#x?-zC7v+KZbPl&3f7SZqUEo!13Ncs3qvzd#|HqLQ&hM)2q9w}e*Z!%m zqH-Z^17_auNsjxUqTZn{V?q16Uar=8{mX0kgSN4u4v!Og9A2M++hWwq18b9acwW?f zb+KN+b)I`0`_J&#==YIO|7Qi5f_=vaSb2Q2;YgL`K60cB&Gj~g*i{T%zK%o=DzEAE zHmla*=XoUEKi5xH)#)k^xQ-%8j{A(Mll#3qSHO|y+Q#PnyY(?Xk3qNLPrvt5Sc1F< z1lAXY{Jo;b+~v9uztY8d9KgeCM&(~E8-3%z2fx6`eV^BF3Zv=L`8ym<;uGN4<9wt^ z+xfb{a-alHaQA?~DY@4tcR|f#E8$z$xN5!8wyXS%f z#jjv_-9`Jl4`8TzH!g^4Kw#we`K}50`Zwx0xklDGYq~CoLmdyEYT9XcG z`SNYsz{hX3@HgTVCYz;`?=`rNSr`ki`<=gs1)s-j@N_y30Bq84GjiWI8vAywZ=Jsy zzRUWzxBIagJiOQLWGoQvO3(S^@TYY;bd_Jjcai>oLLG14+Jf&ZHJF}nn=9b?0@8L3 zt|G3ujdPlU-1o_&WlYbZyf^KqcsQO|#--Zc+A#oFIKIz!Fjd<&OTN3{{GEN5{ogRI zciV~s?Nr?ZDDKBDIjHglJZB6!QrPt6`6}Nbb!FOoNxZn z#t|f5yXD(9u|Gh)V2}HXyOP+Pj_awmOqal<&;P69+S_>m7;ZJh^mh8T))IUgJG7^jZ^1;iZ0F$*|MpQ}2ZAI$iXq~Gcw zmwboE=`&GZ+S4zv5;MO=_?XXjY<&Yy(L4aXuc~$S6KB8i0$-a*%D3&3=ll)ZSi39_ zynN;o#6GX{08FOm0BSTwYCMM)K7NQ~tGO!wDe5}whdLLlx(+_MKPi6emW}2Jpq)pM zIIdmSPFem-tE!W)xS_>4OFcx-2+;<|95_}}rfuDJ&gqEzzT=JnPI(VzdL2lOSk~&E z5amxJQNKEk%kuf2Z|{C*%&}D7+<{x$V}Q|l0Df;ldJU;eSJUm@ztQE5+#WcFK76On zR0~zEQWN;&91iLHE!75>!9LgPfwaaD!L#Xjpsr2fcU0hua||T&cXZ!e4}NUlnBc$* zf@{tTHF+kWYs71Th51&nzl7vM``-jk+0wy-ZnoC}>*(3-D3{Jh^~YTK!?(IM37gfqa>g69ghm(#%dlIfF4Efh)T4rs>AVM&VLH- z*_a1Pd*u?EJb+ZCUDG^jUpRiyE25SdEc7v%CoJg)95?vp5>waP(4foBNIyopiqzzJ zU0J{2^WPxf)9S5gj(|g#&msKw%cUFW(&QZsX}{q4g>%rD_)Q{? zJ#Y+hVNBU&oY7Y?1K|9-NKzer)Z;s9ya(Z!wAFlr;t1o`Dh9yu?~#1y=*lPi!>dR< zRy={kV+F@4;<2JiSi?N<_#Togt-A8VaZC0O9y3(`Xgp`tkiuRU17FLae^tNZ@q^bJ z>>K=MTjC2g^Rdxt(7`;yzewHz2_P!?_&yTPf&WDM1d02^$4IXr@f$sr-#2& z|E&4VS@|h>BbA@R*-M7XPT}k(gED(acsuj_5$)}{O;fWoc;i`q25&rTZ#^x)l_)$J zJDW3mkmh6baEfP0dv9)!G8@fg{h#J!H$gL+y)&~jI6DcNP4hE2+c|rj07jah!6SU# z-U9s0&fw;4!q4(i4}Y70uiJ#bU=vo_-ViGYkFZjERAlcGZkEq%gu7_(5WZG_ZXX@7 zJk`qbzc#{(X78jn)k*W$a`vs6O|{be;Wk$g9v8qn*&bpAYuW#6IeU=Vdr7K7f?8?$ zEZnqKYNzUDUdHs7g_q{1b{5WrUd99DZ>QnT?Nlw)XFiEgHb~tjQK?$USN3Ka?t~<6 zXT*J?PvK=7Hk0ubPG)Z< zTRA%+53|29RGq^EA+QIt+{_+oHd@!=fRKgeLBK+D2!5eC1i#Q6f?q5^TW=t>)9#jm x$n5!sshxH>mYCWz4rlf>hOH#~%Opf!S<6Z&4z}0HfQ}gvErytjaO&9o;{TtQN4fw2 literal 0 HcmV?d00001 diff --git a/twitch.py b/twitch.py new file mode 100644 index 0000000..620c4fa --- /dev/null +++ b/twitch.py @@ -0,0 +1,340 @@ +from __future__ import annotations + +import os +import asyncio +import logging +from yarl import URL +from getpass import getpass +from functools import partial +from typing import Any, Optional, Union, List, Dict, cast + +try: + import aiohttp +except ImportError: + raise ImportError("You have to run 'python -m pip install aiohttp' first") + +from channel import Channel +from websocket import Websocket +from inventory import DropsCampaign +from exceptions import LoginException, CaptchaRequired, IncorrectCredentials +from constants import ( + CLIENT_ID, + USER_AGENT, + COOKIES_PATH, + GQL_URL, + GQL_OPERATIONS, + GQLOperation, + WebsocketTopic, + get_topic, +) + + +logger = logging.getLogger("TwitchDrops") + + +class Twitch: + def __init__( + self, + username: Optional[str] = None, + password: Optional[str] = None, + *, + loop: Optional[asyncio.AbstractEventLoop] = None, + ): + self.username: Optional[str] = username + self.password: Optional[str] = password + # Cookies, session and auth + cookie_jar = aiohttp.CookieJar() + if os.path.isfile(COOKIES_PATH): + cookie_jar.load(COOKIES_PATH) + self._session = aiohttp.ClientSession( + cookie_jar=cookie_jar, headers={"User-Agent": USER_AGENT}, loop=loop + ) + self._access_token: Optional[str] = None + self._user_id: Optional[int] = None + self._is_logged_in = asyncio.Event() + # Websocket + self.websocket = Websocket(self) + # Storing, watching and changing channels + self.channels: Dict[int, Channel] = {} + self._watching_channel: Optional[Channel] = None + self._watching_task: Optional[asyncio.Task[Any]] = None + self._channel_change = asyncio.Event() + # Inventory + self.inventory: List[DropsCampaign] = [] + + def wait_until_login(self): + return self._is_logged_in.wait() + + async def close(self): + print("Exiting...") + self._session.cookie_jar.save(COOKIES_PATH) # type: ignore + self.stop_watching() + await self._session.close() + await self.websocket.close() + await asyncio.sleep(1) # allows aiohttp to safely close the session + + @property + def currently_watching(self) -> Optional[Channel]: + return self._watching_channel + + async def run(self, channels: List[str] = []): + """ + Main method that runs the whole client. + + Here, we manage several things, specifically: + • Fetching the drops inventory to make sure that everything we can claim, is claimed + • Selecting a stream to watch, and watching it + • Changing the stream that's being watched if necessary + """ + # Start our websocket connection - shouldn't require task tracking + asyncio.create_task(self.websocket.connect()) + # Claim the drops we can + self.inventory = await self.get_inventory() + games = set() + for campaign in self.inventory: + if campaign.status == "UPCOMING": + # we have no use in processing upcoming campaigns here + continue + for drop in campaign.timed_drops.values(): + if drop.can_earn: + games.add(campaign.game) + if drop.can_claim: + await drop.claim() + # Fetch information about all channels we're supposed to handle + for channel_name in channels: + channel: Channel = await Channel(self, channel_name) # type: ignore + self.channels[channel.id] = channel + # Sub to these channel updates + topics: List[WebsocketTopic] = [] + for channel_id in self.channels: + topics.append( + get_topic( + "VideoPlayback", channel_id, partial(self.process_stream_state, channel_id) + ) + ) + await self.websocket.add_topics(topics) + + # Repeat: Change into a channel we can watch, then reset the flag + self._channel_change.set() + refresh_channels = False # we're entering having fresh channel data already + while True: + # wait for the change channel signal + await self._channel_change.wait() + for channel in self.channels.values(): + if ( + channel.stream is not None # steam online + and channel.stream.drops_enabled # drops are enabled + and channel.stream.game in games # streams a game we can earn drops in + ): + self.watch(channel) + refresh_channels = True + self._channel_change.clear() + break + else: + # there's no available channel to watch + if refresh_channels: + # refresh the status of all channels, + # to make sure that our websocket didn't miss anything til this point + print("No suitable channel to watch, refreshing...") + for channel in self.channels.values(): + await channel.get_stream() + await asyncio.sleep(0.5) + refresh_channels = False + continue + print("No suitable channel to watch, retrying in 120 seconds") + await asyncio.sleep(120) + + def watch(self, channel: Channel): + if self._watching_task is not None: + self._watching_task.cancel() + + async def watcher(channel: Channel): + op = GQL_OPERATIONS["ChannelPointsContext"].with_variables( + {"channelLogin": channel.name} + ) + i = 0 + while True: + await channel._send_watch() + if i == 0: + # ensure every 30 minutes that we don't have unclaimed points bonus + response = await self.gql_request(op) + channel_data: Dict[str, Any] = response["data"]["community"]["channel"] + claim_available: Dict[str, Any] = ( + channel_data["self"]["communityPoints"]["availableClaim"] + ) + if claim_available: + await self.claim_points(channel_data["id"], claim_available["id"]) + logger.info("Claimed bonus points") + i = (i + 1) % 30 + await asyncio.sleep(59) + + print(f"Watching: {channel.name}") + self._watching_channel = channel + self._watching_task = asyncio.create_task(watcher(channel)) + + def stop_watching(self): + if self._watching_task is not None: + logger.warning("Watching stopped.") + self._watching_task.cancel() + self._watching_task = None + self._watching_channel = None + + async def process_stream_state(self, channel_id: int, message: Dict[str, Any]): + msg_type = message["type"] + channel = self.channels.get(channel_id) + if channel is None: + logger.error(f"Stream state change for a non-existing channel: {channel_id}") + return + if msg_type == "stream-down": + logger.info(f"{channel.name} goes OFFLINE") + channel.set_offline() + if self._watching_channel is not None and self._watching_channel.id == channel_id: + # change the channel if we're currently watching it + self._channel_change.set() + elif msg_type == "stream-up": + logger.info(f"{channel.name} goes ONLINE") + + # stream_up is sent before the stream actually goes online, so just wait a bit + # and check if it's actually online by then + async def online_delay(channel: Channel): + await asyncio.sleep(10) + await channel.check_online() + + asyncio.create_task(online_delay(channel)) + elif msg_type == "viewcount": + if channel.stream is None: + # check if we've got a view count for a stream that just started + await channel.check_online() + if channel.stream is not None: + viewers = message["viewers"] + channel.stream.viewer_count = viewers + logger.info(f"{channel.name} viewers: {viewers}") + else: + logger.error(f"Channel viewcount update for an offline stream: {channel.name}") + + async def _login(self) -> str: + logger.debug("Login flow started") + if self.username is None: + self.username = input("Username: ") + if self.password is None: + self.password = getpass() + if not self.password: + # catch early empty pass + raise IncorrectCredentials() + + payload: Dict[str, Any] = { + "username": self.username, + "password": self.password, + "client_id": CLIENT_ID, + "undelete_user": False, + "remember_me": True, + } + + for attempt in range(10): + async with self._session.post( + "https://passport.twitch.tv/login", json=payload + ) as response: + login_response = await response.json() + + # Feed this back in to avoid running into CAPTCHA if possible + if "captcha_proof" in login_response: + payload["captcha"] = {"proof": login_response["captcha_proof"]} + + # Error handling + if "error_code" in login_response: + error_code = login_response["error_code"] + logger.debug(f"Login error code: {error_code}") + if error_code == 1000: + # we've failed bois + logger.debug("Login failed due to CAPTCHA") + raise CaptchaRequired() + elif error_code == 3001: + # wrong password you dummy + logger.debug("Login failed due to incorrect login or pass") + print(f"Incorrect username or password.\nUsername: {self.username}") + self.password = getpass() + if not self.password: + raise IncorrectCredentials() + elif error_code in ( + 3011, # Authy token needed + 3012, # Invalid authy token + 3022, # Email code needed + 3023, # Invalid email code + ): + # 2FA handling + email = error_code in (3022, 3023) + logger.debug("2FA token required") + token = input("2FA token: ") + if email: + # email code + payload["twitchguard_code"] = token + else: + # authy token + payload["authy_token"] = token + continue + else: + raise LoginException(login_response["error"]) + + # Success handling + if "access_token" in login_response: + # we're in bois + self._access_token = login_response["access_token"] + logger.debug(f"Access token: {self._access_token}") + break + + if self._access_token is None: + # this means we've ran out of retries + raise LoginException("Ran out of login retries") + return self._access_token + + async def check_login(self) -> None: + if self._access_token is not None and self._user_id is not None: + # we're all good + return + # looks like we're missing something + print("Logging in") + jar = self._session.cookie_jar + cookie = jar.filter_cookies("https://twitch.tv") # type: ignore + if not cookie: + # no cookie - login + await self._login() + # store our auth token inside the cookie + cookie["auth-token"] = cast(str, self._access_token) + elif self._access_token is None: + # have cookie - get our access token + self._access_token = cookie["auth-token"].value + logger.debug("Session restored from cookie") + # validate our access token, by obtaining user_id + async with self._session.get( + "https://id.twitch.tv/oauth2/validate", + headers={"Authorization": f"OAuth {self._access_token}"} + ) as response: + validate_response = await response.json() + self._user_id = cookie["persistent"] = validate_response["user_id"] + self._is_logged_in.set() + print(f"Login successful, User ID: {self._user_id}") + # update our cookie + jar.update_cookies(cookie, URL("https://twitch.tv")) + + async def gql_request(self, op: GQLOperation) -> Dict[str, Any]: + await self.check_login() + headers = { + "Authorization": f"OAuth {self._access_token}", + "Client-Id": CLIENT_ID, + } + logger.debug(f"GQL Request: {op}") + async with self._session.post(GQL_URL, json=op, headers=headers) as response: + response_json = await response.json() + logger.debug(f"GQL Response: {response_json}") + return response_json + + async def get_inventory(self) -> List[DropsCampaign]: + response = await self.gql_request(GQL_OPERATIONS["Inventory"]) + inventory = response["data"]["currentUser"]["inventory"] + return [DropsCampaign(self, data) for data in inventory["dropCampaignsInProgress"]] + + async def claim_points(self, channel_id: Union[str, int], claim_id: str): + variables = {"input": {"channelID": str(channel_id), "claimID": claim_id}} + await self.gql_request( + GQL_OPERATIONS["ClaimCommunityPoints"].with_variables(variables) + ) diff --git a/websocket.py b/websocket.py new file mode 100644 index 0000000..7c4b232 --- /dev/null +++ b/websocket.py @@ -0,0 +1,216 @@ +from __future__ import annotations + +import json +import random +import string +import asyncio +import logging +from typing import Any, Optional, Dict, Tuple, Set, Iterable, TYPE_CHECKING + +from websockets.exceptions import ConnectionClosed, ConnectionClosedOK +from websockets.client import WebSocketClientProtocol, connect as websocket_connect +from exceptions import MinerException + +from inventory import TimedDrop +from constants import WEBSOCKET_URL, PING_INTERVAL, WebsocketTopic, get_topic + +if TYPE_CHECKING: + from twitch import Twitch + + +logger = logging.getLogger("TwitchDrops") + + +class Websocket: + def __init__(self, twitch: Twitch): + self._twitch = twitch + self._ws: Optional[WebSocketClientProtocol] = None + self.connected = asyncio.Event() # set when there's an active websocket connection + self.reconnect = asyncio.Event() # set when the websocket needs to reconnect + self._send_queue: asyncio.Queue[Tuple[str, Dict[str, Any]]] = asyncio.Queue() + self._recv_dict: Dict[str, asyncio.Future[Any]] = {} + self._topics: Set[WebsocketTopic] = set() + self._ping_task: Optional[asyncio.Task[Any]] = None + + async def connect(self): + # ensure we're logged in before connecting + await self._twitch.wait_until_login() + assert self._twitch._user_id is not None + logger.info("Connecting to Websocket") + # Listen to our events of choice + user_id = self._twitch._user_id + # Defer topics via a task so we can proceed to the actual websocket connection + asyncio.create_task( + self.add_topics([ + get_topic("UserDrops", user_id, self.process_drops), + get_topic("UserCommunityPoints", user_id, self.process_points), + ]) + ) + # Connect/Reconnect loop + async for websocket in websocket_connect(WEBSOCKET_URL, ssl=True, ping_interval=None): + websocket.BACKOFF_MAX = 3 * 60 # type: ignore # 3 minutes + self._ws = websocket + self.reconnect.clear() + self.connected.set() + logger.info("Websocket Connected") + self._ping_task = asyncio.create_task(self._ping_loop()) + try: + while not self.reconnect.is_set(): + # Process receive + try: + # Wait up to 0.5s for a message we're supposed to receive + raw_message = await asyncio.wait_for(websocket.recv(), timeout=0.5) + except asyncio.TimeoutError: + # nothing - skip handling + pass + else: + # we've got something to process + message = json.loads(raw_message) + logger.debug(f"Websocket received: {message}") + msg_type = message["type"] + # handle the simple PING case + if msg_type == "PONG": + ping_future = self._recv_dict.pop("PING", None) + if ping_future is not None and not ping_future.done(): + ping_future.set_result(message) + elif msg_type == "RESPONSE": + self._recv_dict.pop(message["nonce"]).set_result(message) + elif msg_type == "RECONNECT": + # We've received a reconnect request + logger.warning("Received a Websocket Reconnect Request") + self.reconnect.set() + elif msg_type == "MESSAGE": + # request the assigned topic to process the response + target_topic = message["data"]["topic"] + for topic in self._topics: + if target_topic == topic: + await topic.process(json.loads(message["data"]["message"])) + break + else: + logger.error(f"Received unknown websocket payload: {message}") + + # Early exit if needed + if self.reconnect.is_set(): + break + + # Process send + while not self._send_queue.empty(): + nonce, message = self._send_queue.get_nowait() + if nonce != "PING": + message["nonce"] = nonce + await websocket.send(json.dumps(message, separators=(',', ':'))) + logger.debug(f"Websocket sent: {message}") + # A reconnect was requested + continue + except ConnectionClosed as exc: + self.connected.clear() + if self._ping_task is not None: + self._ping_task.cancel() + self._ping_task = None + if isinstance(exc, ConnectionClosedOK): + if exc.rcvd_then_sent: + # server closed the connection, not us - reconnect + logger.warning("Websocket Disconnected - Reconnecting") + continue + # we closed it - exit + break + # otherwise, reconnect + logger.warning("Websocket Closed - Reconnecting") + continue + except Exception: + logger.exception("Exception in Websocket - Reconnecting") + continue + + async def close(self): + self.connected.clear() + if self._ping_task is not None: + self._ping_task.cancel() + if self._ws is not None: + await self._ws.close() + + async def _ping_loop(self): + await self.connected.wait() + ping_every = PING_INTERVAL.total_seconds() + while self.connected.is_set(): + try: + await asyncio.wait_for(self.send({"type": "PING"}), timeout=10) + except asyncio.TimeoutError: + # per documentation, if there's no response for a PING, reconnect to the websocket + logger.warning("Websocket got no response to PING - reconnect") + self.reconnect.set() + break + await asyncio.sleep(ping_every) + + def create_nonce(self, length: int = 30) -> str: + available_chars = string.ascii_letters + string.digits + return ''.join(random.choices(available_chars, k=length)) + + def send(self, message: Dict[str, Any]) -> asyncio.Future[Dict[str, Any]]: + logger.debug(f"Websocket sending: {message}") + msg_type = message["type"] + if msg_type == "PING": + nonce = "PING" + else: + nonce = self.create_nonce() + self._send_queue.put_nowait((nonce, message)) + future: asyncio.Future[Dict[str, Any]] = asyncio.get_running_loop().create_future() + self._recv_dict[nonce] = future + return future + + async def add_topics(self, topics: Iterable[WebsocketTopic]): + # ensure no topics end up duplicated + topics = set(topics) + topics.difference_update(self._topics) + if not topics: + # none left to add + return + self._topics.update(topics) + if len(self._topics) >= 50: + # TODO: Handle multiple connections (up to 10) since one allows only up to 50 topics + raise MinerException("Too many topics") + await self.connected.wait() + assert self._twitch._access_token is not None + auth_token = self._twitch._access_token + topics_list = list(map(str, topics)) + logger.info(f"Listening for: {', '.join(topics_list)}") + await self.send( + { + "type": "LISTEN", + "data": { + "topics": topics_list, + "auth_token": auth_token, + } + } + ) + + async def process_drops(self, message: Dict[str, Any]): + drop_id = message["data"]["drop_id"] + drop: Optional[TimedDrop] = None + for campaign in self._twitch.inventory: + drop = campaign.get_drop(drop_id) + if drop is not None: + break + else: + logger.error(f"Drop with ID of {drop_id} not found!") + return + drop.update(message) + msg_type = message["type"] + if msg_type == "drop-progress": + print( + f"Drop progress: {drop.progress:4.0%} ({drop.remaining_minutes} minutes remaining)" + ) + elif msg_type == "drop-claim": + await drop.claim() + print(f"Claimed drop: {', '.join(drop.rewards)}") + + async def process_points(self, message: Dict[str, Any]): + msg_type = message["type"] + if msg_type == "points-earned": + points = message["data"]["point_gain"]["total_points"] + balance = message["data"]["balance"]["balance"] + print(f"Earned points for watching: {points:3}, total: {balance}") + elif msg_type == "claim-available": + claim_data = message["data"]["claim"] + points = claim_data["point_gain"]["total_points"] + await self._twitch.claim_points(claim_data["channel_id"], claim_data["id"]) + print(f"Claimed bonus points: {points}")