diff --git a/channel.py b/channel.py index 3672bed..81e9a08 100644 --- a/channel.py +++ b/channel.py @@ -4,13 +4,15 @@ import re import json import asyncio import logging +from base64 import b64encode +from functools import cached_property from typing import Any, SupportsInt, cast, TYPE_CHECKING import aiohttp from yarl import URL -from utils import Game -from exceptions import MinerException +from utils import Game, json_minify +from exceptions import MinerException, RequestException from constants import CALL, GQL_OPERATIONS, ONLINE_DELAY, URLType if TYPE_CHECKING: @@ -23,10 +25,6 @@ logger = logging.getLogger("TwitchDrops") class Stream: - __slots__ = ( - "channel", "broadcast_id", "viewers", "drops_enabled", "game", "title", "_stream_url" - ) - def __init__( self, channel: Channel, @@ -44,6 +42,27 @@ class Stream: self.title: str = title self._stream_url: URLType | None = None + @cached_property + def _spade_payload(self) -> JsonType: + payload = [ + { + "event": "minute-watched", + "properties": { + "broadcast_id": str(self.broadcast_id), + "channel_id": str(self.channel.id), + "channel": self.channel._login, + "hidden": False, + "live": True, + "location": "channel", + "logged_in": True, + "muted": False, + "player": "site", + "user_id": self.channel._twitch._auth_state.user_id, + } + } + ] + return {"data": (b64encode(json_minify(payload).encode("utf8"))).decode("utf8")} + @classmethod def from_get_stream(cls, channel: Channel, channel_data: JsonType) -> Stream: stream = channel_data["stream"] @@ -119,6 +138,11 @@ class Stream: class Channel: + __slots__ = ( + "_twitch", "_gui_channels", "id", "_login", "_display_name", "_spade_url", + "_stream", "_pending_stream_up", "acl_based" + ) + def __init__( self, twitch: Twitch, @@ -133,6 +157,7 @@ class Channel: self.id: int = int(id) self._login: str = login self._display_name: str | None = display_name + self._spade_url: URLType | None = None self._stream: Stream | None = None self._pending_stream_up: asyncio.Task[Any] | None = None # ACL-based channels are: @@ -253,6 +278,34 @@ class Channel: self._pending_stream_up = None self._gui_channels.remove(self) + async def get_spade_url(self) -> URLType: + """ + To get this monstrous thing, you have to walk a chain of requests. + Streamer page (HTML) --parse-> Streamer Settings (JavaScript) --parse-> Spade URL + + For mobile view, spade_url is available immediately from the page, skipping step #2. + """ + SETTINGS_PATTERN: str = ( + r'src="(https://[\w.]+/config/settings\.[0-9a-f]{32}\.js)"' + ) + SPADE_PATTERN: str = ( + r'"spade_?url": ?"(https://video-edge-[.\w\-/]+\.ts(?:\?allow_stream=true)?)"' + ) + async with self._twitch.request("GET", self.url) as response1: + streamer_html: str = await response1.text(encoding="utf8") + match = re.search(SPADE_PATTERN, streamer_html, re.I) + if not match: + match = re.search(SETTINGS_PATTERN, streamer_html, re.I) + if not match: + raise MinerException("Error while spade_url extraction: step #1") + streamer_settings = match.group(1) + async with self._twitch.request("GET", streamer_settings) as response2: + settings_js: str = await response2.text(encoding="utf8") + match = re.search(SPADE_PATTERN, settings_js, re.I) + if not match: + raise MinerException("Error while spade_url extraction: step #2") + return URLType(match.group(1)) + def external_update(self, channel_data: JsonType, available_drops: list[JsonType]): """ Update stream information based on data provided externally. @@ -354,7 +407,8 @@ class Channel: if needs_display: self.display() - async def send_watch(self) -> bool: + # NOTE: This is currently unused. + async def _send_watch(self) -> bool: """ This performs a HEAD request on the stream's current playlist, to simulate watching the stream. @@ -405,3 +459,16 @@ class Channel: # without downloading the actual stream data async with self._twitch.request("HEAD", stream_chunk_url) as head_response: return head_response.status == 200 + + async def send_watch(self) -> bool: + if self._stream is None: + return False + if self._spade_url is None: + self._spade_url = await self.get_spade_url() + try: + async with self._twitch.request( + "POST", self._spade_url, data=self._stream._spade_payload + ) as response: + return response.status == 204 + except RequestException: + return False diff --git a/constants.py b/constants.py index 080f8f3..74d8435 100644 --- a/constants.py +++ b/constants.py @@ -125,7 +125,7 @@ DEFAULT_LANG = "English" PING_INTERVAL = timedelta(minutes=3) PING_TIMEOUT = timedelta(seconds=10) ONLINE_DELAY = timedelta(seconds=120) -WATCH_INTERVAL = timedelta(seconds=20) +WATCH_INTERVAL = timedelta(seconds=59) # Strings WINDOW_TITLE = f"Twitch Drops Miner v{__version__} (by DevilXD)" # Logging diff --git a/gui.py b/gui.py index 264954e..d310ec4 100644 --- a/gui.py +++ b/gui.py @@ -621,6 +621,7 @@ class _ProgressVars(TypedDict): class CampaignProgress: BAR_LENGTH = 420 + ALMOST_DONE_SECONDS = 10 def __init__(self, manager: GUIManager, master: ttk.Widget): self._manager = manager @@ -685,17 +686,19 @@ class CampaignProgress: variable=self._vars["drop"]["progress"], ).grid(column=0, row=10, columnspan=2) self._drop: TimedDrop | None = None + self._seconds: int = 0 self._timer_task: asyncio.Task[None] | None = None self.display(None) - @staticmethod - def _divmod(minutes: int, seconds: int) -> tuple[int, int]: - if seconds < 60 and minutes > 0: + def _divmod(self, minutes: int) -> tuple[int, int]: + if self._seconds < 60 and minutes > 0: minutes -= 1 hours, minutes = divmod(minutes, 60) return (hours, minutes) - def _update_time(self, seconds: int): + def _update_time(self, seconds: int | None = None): + if seconds is not None: + self._seconds = seconds drop = self._drop if drop is not None: drop_minutes = drop.remaining_minutes @@ -705,23 +708,22 @@ class CampaignProgress: campaign_minutes = 0 drop_vars: _DropVars = self._vars["drop"] campaign_vars: _CampaignVars = self._vars["campaign"] - dseconds = seconds % 60 - hours, minutes = self._divmod(drop_minutes, seconds) + dseconds = self._seconds % 60 + hours, minutes = self._divmod(drop_minutes) drop_vars["remaining"].set( _("gui", "progress", "remaining").format(time=f"{hours:>2}:{minutes:02}:{dseconds:02}") ) - hours, minutes = self._divmod(campaign_minutes, seconds) + hours, minutes = self._divmod(campaign_minutes) campaign_vars["remaining"].set( _("gui", "progress", "remaining").format(time=f"{hours:>2}:{minutes:02}:{dseconds:02}") ) async def _timer_loop(self): - seconds = 60 - self._update_time(seconds) - while seconds > 0: + self._update_time(60) + while self._seconds > 0: await asyncio.sleep(1) - seconds -= 1 - self._update_time(seconds) + self._seconds -= 1 + self._update_time() self._timer_task = None def start_timer(self): @@ -739,8 +741,9 @@ class CampaignProgress: self._timer_task.cancel() self._timer_task = None - def is_counting(self) -> bool: - return self._timer_task is not None + def minute_almost_done(self) -> bool: + # already or almost done + return self._timer_task is None or self._seconds <= self.ALMOST_DONE_SECONDS def display(self, drop: TimedDrop | None, *, countdown: bool = True, subone: bool = False): self._drop = drop diff --git a/twitch.py b/twitch.py index 07f908b..174fc0a 100644 --- a/twitch.py +++ b/twitch.py @@ -885,13 +885,17 @@ class Twitch: # if the channel isn't online anymore, we stop watching it self.stop_watching() continue + # logger.log(CALL, f"Sending watch payload to: {channel.name}") succeeded: bool = await channel.send_watch() + last_sent: float = time() if not succeeded: logger.log(CALL, f"Watch requested failed for channel: {channel.name}") - elif not self.gui.progress.is_counting(): - # If the previous update was more than 60s ago, and the progress tracker + # wait ~20 seconds for a progress update + await asyncio.sleep(20) + if self.gui.progress.minute_almost_done(): + # If the previous update was more than ~60s ago, and the progress tracker # isn't counting down anymore, that means Twitch has temporarily - # stopped reporting drops progress. To ensure the timer keeps at least somewhat + # stopped reporting drop's progress. To ensure the timer keeps at least somewhat # accurate time, we can use GQL to query for the current drop, # or even "pretend" mining as a last resort option. handled: bool = False @@ -932,7 +936,7 @@ class Twitch: handled = True else: logger.log(CALL, "No active drop could be determined") - await self._watch_sleep(interval) + await self._watch_sleep(interval - min(time() - last_sent, interval)) @task_wrapper(critical=True) async def _maintenance_task(self) -> None: