diff --git a/channel.py b/channel.py index 0550aa1..3ecce8b 100644 --- a/channel.py +++ b/channel.py @@ -1,14 +1,11 @@ from __future__ import annotations -import re import asyncio import logging -from base64 import b64encode -from functools import cached_property from typing import Any, SupportsInt, TYPE_CHECKING -from utils import invalidate_cache, json_minify, Game -from exceptions import MinerException, RequestException +from utils import Game +from exceptions import MinerException from constants import CALL, GQL_OPERATIONS, ONLINE_DELAY, URLType if TYPE_CHECKING: @@ -21,7 +18,9 @@ logger = logging.getLogger("TwitchDrops") class Stream: - __slots__ = ("channel", "broadcast_id", "viewers", "drops_enabled", "game", "title") + __slots__ = ( + "channel", "broadcast_id", "viewers", "drops_enabled", "game", "title", "_stream_url" + ) def __init__( self, @@ -38,6 +37,7 @@ class Stream: self.drops_enabled: bool = False self.game: Game | None = Game(game) if game else None self.title: str = title + self._stream_url: URLType | None = None @classmethod def from_get_stream(cls, channel: Channel, data: JsonType) -> Stream: @@ -70,6 +70,29 @@ class Stream: return self.broadcast_id == other.broadcast_id return NotImplemented + async def get_stream_url(self) -> URLType: + if self._stream_url is not None: + return self._stream_url + # get the stream playback access token from GQL + playback_token_response: JsonType = await self.channel._twitch.gql_request( + GQL_OPERATIONS["PlaybackAccessToken"].with_variables({"login": self.channel._login}) + ) + token_data: JsonType = playback_token_response["data"]["streamPlaybackAccessToken"] + token_value = token_data["value"] + token_signature = token_data["signature"] + # using the token, query Twitch for a list of all available stream qualities + async with self.channel._twitch.request( + "GET", + URLType( + "https://usher.ttvnw.net/api/channel/hls/" + f"{self.channel._login}.m3u8?sig={token_signature}&token={token_value}" + ), + ) as qualities_response: + available_qualities: str = await qualities_response.text() + # pick the last URL from the list, usually with the lowest quality stream + self._stream_url = URLType(available_qualities.strip().split("\n")[-1]) + return self._stream_url + class Channel: def __init__( @@ -86,7 +109,6 @@ class Channel: self.id: int = int(id) self._login: str = login self._display_name: str | None = display_name - self._spade_url: URLType | None = None self.points: int | None = None self._stream: Stream | None = None self._pending_stream_up: asyncio.Task[Any] | None = None @@ -215,34 +237,6 @@ class Channel: self._pending_stream_up = None self._gui_channels.remove(self) - async def get_spade_url(self) -> URLType: - """ - To get this monstrous thing, you have to walk a chain of requests. - Streamer page (HTML) --parse-> Streamer Settings (JavaScript) --parse-> Spade URL - - For mobile view, spade_url is available immediately from the page, skipping step #2. - """ - SETTINGS_PATTERN: str = ( - r'src="(https://[\w.]+/config/settings\.[0-9a-f]{32}\.js)"' - ) - SPADE_PATTERN: str = ( - r'"spade_?url": ?"(https://video-edge-[.\w\-/]+\.ts(?:\?allow_stream=true)?)"' - ) - async with self._twitch.request("GET", self.url) as response1: - streamer_html: str = await response1.text(encoding="utf8") - match = re.search(SPADE_PATTERN, streamer_html, re.I) - if not match: - match = re.search(SETTINGS_PATTERN, streamer_html, re.I) - if not match: - raise MinerException("Error while spade_url extraction: step #1") - streamer_settings = match.group(1) - async with self._twitch.request("GET", streamer_settings) as response2: - settings_js: str = await response2.text(encoding="utf8") - match = re.search(SPADE_PATTERN, settings_js, re.I) - if not match: - raise MinerException("Error while spade_url extraction: step #2") - return URLType(match.group(1)) - async def get_stream(self) -> Stream | None: try: response: JsonType = await self._twitch.gql_request( @@ -283,7 +277,6 @@ class Channel: """ old_stream = self._stream self._stream = await self.get_stream() - invalidate_cache(self, "_payload") if trigger_events: self._twitch.on_channel_update(self, old_stream, self._stream) return self._stream is not None @@ -328,7 +321,6 @@ class Channel: if self.online: old_stream = self._stream self._stream = None - invalidate_cache(self, "_payload") self._twitch.on_channel_update(self, old_stream, self._stream) needs_display = False if needs_display: @@ -354,42 +346,26 @@ class Channel: # so if we're not calling it, we need to do it ourselves self.display() - @cached_property - def _payload(self) -> JsonType: - assert self._stream is not None - payload = [ - { - "event": "minute-watched", - "properties": { - "broadcast_id": str(self._stream.broadcast_id), - "channel_id": str(self.id), - "channel": self._login, - "hidden": False, - "live": True, - "location": "channel", - "logged_in": True, - "muted": False, - "player": "site", - "user_id": self._twitch._auth_state.user_id, - } - } - ] - return {"data": (b64encode(json_minify(payload).encode("utf8"))).decode("utf8")} - async def send_watch(self) -> bool: """ This uses the encoded payload on spade url to simulate watching the stream. Optimally, send every 60 seconds to advance drops. """ - if not self.online: - return False - if self._spade_url is None: - self._spade_url = await self.get_spade_url() - logger.debug(f"Sending minute-watched to {self.name}") - try: - async with self._twitch.request( - "POST", self._spade_url, data=self._payload - ) as response: - return response.status == 204 - except RequestException: + if self._stream is None: return False + # get the stream url + stream_url = await self._stream.get_stream_url() + # fetch a list of chunks available to download for the stream + # NOTE: the CDN is configured to forcibly disconnect shortly after serving the list, + # if we don't do it yourselves. Lets help it by actually doing it ourselves instead. + async with self._twitch.request( + "GET", stream_url, headers={"Connection": "close"} + ) as chunks_response: + available_chunks: str = await chunks_response.text() + # the list contains ~10-13 chunks of the stream at 2s intervals, + # let's pick the last chunk URL available + stream_chunk_url: URLType = URLType(available_chunks.strip().split("\n")[-1]) + # sending a HEAD request is enough to advance the drops, + # without downloading the actual stream data + async with self._twitch.request("HEAD", stream_chunk_url) as head_response: + return head_response.status == 200 diff --git a/constants.py b/constants.py index bad2318..d57cc81 100644 --- a/constants.py +++ b/constants.py @@ -116,7 +116,7 @@ DEFAULT_LANG = "English" PING_INTERVAL = timedelta(minutes=3) PING_TIMEOUT = timedelta(seconds=10) ONLINE_DELAY = timedelta(seconds=120) -WATCH_INTERVAL = timedelta(seconds=59) +WATCH_INTERVAL = timedelta(seconds=20) # Strings WINDOW_TITLE = f"Twitch Drops Miner v{__version__} (by DevilXD)" # Logging @@ -310,6 +310,18 @@ GQL_OPERATIONS: dict[str, GQLOperation] = { "channelID": ..., # channel ID as a str }, ), + # retuns stream playback access token + "PlaybackAccessToken": GQLOperation( + "PlaybackAccessToken", + "3093517e37e4f4cb48906155bcd894150aef92617939236d2508f3375ab732ce", + variables={ + "isLive": True, + "login": ..., # channel login + "isVod": False, + "vodID": "", + "playerType": "site" + }, + ), # returns live channels for a particular game "GameDirectory": GQLOperation( "DirectoryPage_Game", diff --git a/gui.py b/gui.py index b7837f0..5ae0dc4 100644 --- a/gui.py +++ b/gui.py @@ -700,6 +700,9 @@ class CampaignProgress: self._timer_task.cancel() self._timer_task = None + def is_counting(self) -> bool: + return self._timer_task is not None + def display(self, drop: TimedDrop | None, *, countdown: bool = True, subone: bool = False): self._drop = drop vars_drop = self._vars["drop"] diff --git a/inventory.py b/inventory.py index 0cb2dd4..03639aa 100644 --- a/inventory.py +++ b/inventory.py @@ -1,13 +1,14 @@ from __future__ import annotations import re +import logging from itertools import chain from typing import TYPE_CHECKING from functools import cached_property from datetime import datetime, timedelta, timezone from channel import Channel -from constants import GQL_OPERATIONS, URLType +from constants import CALL, GQL_OPERATIONS, URLType from utils import timestamp, invalidate_cache, Game if TYPE_CHECKING: @@ -18,6 +19,7 @@ if TYPE_CHECKING: from gui import GUIManager, InventoryOverview +logger = logging.getLogger("TwitchDrops") DIMS_PATTERN = re.compile(r'-\d+x\d+(?=\.(?:jpg|png|gif)$)', re.I) @@ -242,14 +244,26 @@ class TimedDrop(BaseDrop): def update_minutes(self, minutes: int): self.current_minutes = minutes self._on_minutes_changed() + self.display() def display(self, *, countdown: bool = True, subone: bool = False): self._manager.display_drop(self, countdown=countdown, subone=subone) def bump_minutes(self): + # this may get called more often than once every minute + # we can detect this by checking if the GUI progress display is currently counting down + # if we haven't finished counting down the last minute, then we can ignore this call + if self._manager.progress.is_counting(): + return if self.current_minutes < self.required_minutes: self.current_minutes += 1 self._on_minutes_changed() + drop_text = ( + f"{self.name} ({self.campaign.game}, " + f"{self.current_minutes}/{self.required_minutes})" + ) + logger.log(CALL, f"Drop progress from active search: {drop_text}") + self.display() class DropsCampaign: diff --git a/twitch.py b/twitch.py index 3529c4f..f559a4a 100644 --- a/twitch.py +++ b/twitch.py @@ -430,7 +430,6 @@ class Twitch: self.watching_channel: AwaitableValue[Channel] = AwaitableValue() self._watching_task: asyncio.Task[None] | None = None self._watching_restart = asyncio.Event() - self._drop_update: asyncio.Future[bool] | None = None # Websocket self.websocket = WebsocketPool(self) # Maintenance task @@ -487,7 +486,6 @@ class Twitch: cookie_jar.save(COOKIES_PATH) await self._session.close() self._session = None - self._drop_update = None self._drops.clear() self.channels.clear() self.inventory.clear() @@ -843,72 +841,20 @@ class Twitch: channel: Channel = await self.watching_channel.get() succeeded: bool = await channel.send_watch() if not succeeded: - # this usually means the campaign expired in the middle of mining - # NOTE: the maintenance task should switch the channel right after this happens - await self._watch_sleep(60) + logger.log(CALL, f"Watch requested failed for channel: {channel.name}") + await self._watch_sleep(interval) continue - last_watch = time() - self._drop_update = asyncio.Future() - use_active: bool = False - try: - handled: bool = await asyncio.wait_for(self._drop_update, timeout=10) - except asyncio.TimeoutError: - # there was no websocket update within 10s - handled = False - use_active = True - logger.log(CALL, "No drop update from the websocket received") - self._drop_update = None - if not handled: - # websocket update timed out, or the update was for an unrelated drop - if not use_active: - # we need to use GQL to get the current progress - context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"]) - drop_data: JsonType | None = ( - context["data"]["currentUser"]["dropCurrentSession"] - ) - if drop_data is not None: - drop = self._drops.get(drop_data["dropID"]) - if drop is None: - use_active = True - # usually this means there was a campaign changed between reloads - logger.info("Missing drop detected, reloading...") - self.change_state(State.INVENTORY_FETCH) - elif not drop.can_earn(channel): - # we can't earn this drop in the current watching channel - use_active = True - drop_text = ( - f"{drop.name} ({drop.campaign.game}, " - f"{drop.current_minutes}/{drop.required_minutes})" - ) - logger.log(CALL, f"Current drop returned mismach: {drop_text}") - else: - drop.update_minutes(drop_data["currentMinutesWatched"]) - drop.display() - drop_text = ( - f"{drop.name} ({drop.campaign.game}, " - f"{drop.current_minutes}/{drop.required_minutes})" - ) - logger.log(CALL, f"Drop progress from GQL: {drop_text}") - else: - use_active = True - logger.log(CALL, "Current drop returned as none") - if use_active: - # Sometimes, even GQL fails to give us the correct drop. - # In that case, we can use the locally cached inventory to try - # and put together the drop that we're actually mining right now - # NOTE: get_active_drop uses the watching channel by default, - # so there's no point to pass it here - if (drop := self.get_active_drop()) is not None: - drop.bump_minutes() - drop.display() - drop_text = ( - f"{drop.name} ({drop.campaign.game}, " - f"{drop.current_minutes}/{drop.required_minutes})" - ) - logger.log(CALL, f"Drop progress from active search: {drop_text}") - else: - logger.log(CALL, "No active drop could be determined") - await self._watch_sleep(last_watch + interval - time()) + # Drop progress isn't always returned by the websocket, but we can "pretend" + # the progress is constantly advancing, by simply incrementing the minutes + # watched ourselves. Once the websocket "wakes up" to return proper progress, + # it'll update the display automatically. + # NOTE: get_active_drop uses the watching channel by default, + # so there's no point to pass it here + if (drop := self.get_active_drop()) is not None: + drop.bump_minutes() + else: + logger.log(CALL, "No active drop could be determined") + await self._watch_sleep(interval) @task_wrapper async def _maintenance_task(self) -> None: @@ -1179,22 +1125,9 @@ class Twitch: else: drop_text = "" logger.log(CALL, f"Drop update from websocket: {drop_text}") - if self._drop_update is None: - # we aren't actually waiting for a progress update right now, so we can just - # ignore the event this time - return - elif drop is not None and drop.can_earn(self.watching_channel.get_with_default(None)): + if drop is not None and drop.can_earn(self.watching_channel.get_with_default(None)): # the received payload is for the drop we expected drop.update_minutes(message["data"]["current_progress_min"]) - drop.display() - # Let the watch loop know we've handled it here - self._drop_update.set_result(True) - else: - # Sometimes, the drop update we receive doesn't actually match what we're mining. - # This is a Twitch bug workaround: signal the watch loop to use GQL - # to get the current drop progress instead. - self._drop_update.set_result(False) - self._drop_update = None @task_wrapper async def process_notifications(self, user_id: int, message: JsonType):