Reimplement the old mining method (ref b47b208)

This commit is contained in:
DevilXD
2025-08-17 22:05:44 +02:00
parent 87a30201e4
commit 826fe95f9c
4 changed files with 100 additions and 26 deletions

View File

@@ -4,13 +4,15 @@ import re
import json
import asyncio
import logging
from base64 import b64encode
from functools import cached_property
from typing import Any, SupportsInt, cast, TYPE_CHECKING
import aiohttp
from yarl import URL
from utils import Game
from exceptions import MinerException
from utils import Game, json_minify
from exceptions import MinerException, RequestException
from constants import CALL, GQL_OPERATIONS, ONLINE_DELAY, URLType
if TYPE_CHECKING:
@@ -23,10 +25,6 @@ logger = logging.getLogger("TwitchDrops")
class Stream:
__slots__ = (
"channel", "broadcast_id", "viewers", "drops_enabled", "game", "title", "_stream_url"
)
def __init__(
self,
channel: Channel,
@@ -44,6 +42,27 @@ class Stream:
self.title: str = title
self._stream_url: URLType | None = None
@cached_property
def _spade_payload(self) -> JsonType:
payload = [
{
"event": "minute-watched",
"properties": {
"broadcast_id": str(self.broadcast_id),
"channel_id": str(self.channel.id),
"channel": self.channel._login,
"hidden": False,
"live": True,
"location": "channel",
"logged_in": True,
"muted": False,
"player": "site",
"user_id": self.channel._twitch._auth_state.user_id,
}
}
]
return {"data": (b64encode(json_minify(payload).encode("utf8"))).decode("utf8")}
@classmethod
def from_get_stream(cls, channel: Channel, channel_data: JsonType) -> Stream:
stream = channel_data["stream"]
@@ -119,6 +138,11 @@ class Stream:
class Channel:
__slots__ = (
"_twitch", "_gui_channels", "id", "_login", "_display_name", "_spade_url",
"_stream", "_pending_stream_up", "acl_based"
)
def __init__(
self,
twitch: Twitch,
@@ -133,6 +157,7 @@ class Channel:
self.id: int = int(id)
self._login: str = login
self._display_name: str | None = display_name
self._spade_url: URLType | None = None
self._stream: Stream | None = None
self._pending_stream_up: asyncio.Task[Any] | None = None
# ACL-based channels are:
@@ -253,6 +278,34 @@ class Channel:
self._pending_stream_up = None
self._gui_channels.remove(self)
async def get_spade_url(self) -> URLType:
"""
To get this monstrous thing, you have to walk a chain of requests.
Streamer page (HTML) --parse-> Streamer Settings (JavaScript) --parse-> Spade URL
For mobile view, spade_url is available immediately from the page, skipping step #2.
"""
SETTINGS_PATTERN: str = (
r'src="(https://[\w.]+/config/settings\.[0-9a-f]{32}\.js)"'
)
SPADE_PATTERN: str = (
r'"spade_?url": ?"(https://video-edge-[.\w\-/]+\.ts(?:\?allow_stream=true)?)"'
)
async with self._twitch.request("GET", self.url) as response1:
streamer_html: str = await response1.text(encoding="utf8")
match = re.search(SPADE_PATTERN, streamer_html, re.I)
if not match:
match = re.search(SETTINGS_PATTERN, streamer_html, re.I)
if not match:
raise MinerException("Error while spade_url extraction: step #1")
streamer_settings = match.group(1)
async with self._twitch.request("GET", streamer_settings) as response2:
settings_js: str = await response2.text(encoding="utf8")
match = re.search(SPADE_PATTERN, settings_js, re.I)
if not match:
raise MinerException("Error while spade_url extraction: step #2")
return URLType(match.group(1))
def external_update(self, channel_data: JsonType, available_drops: list[JsonType]):
"""
Update stream information based on data provided externally.
@@ -354,7 +407,8 @@ class Channel:
if needs_display:
self.display()
async def send_watch(self) -> bool:
# NOTE: This is currently unused.
async def _send_watch(self) -> bool:
"""
This performs a HEAD request on the stream's current playlist,
to simulate watching the stream.
@@ -405,3 +459,16 @@ class Channel:
# without downloading the actual stream data
async with self._twitch.request("HEAD", stream_chunk_url) as head_response:
return head_response.status == 200
async def send_watch(self) -> bool:
if self._stream is None:
return False
if self._spade_url is None:
self._spade_url = await self.get_spade_url()
try:
async with self._twitch.request(
"POST", self._spade_url, data=self._stream._spade_payload
) as response:
return response.status == 204
except RequestException:
return False

View File

@@ -125,7 +125,7 @@ DEFAULT_LANG = "English"
PING_INTERVAL = timedelta(minutes=3)
PING_TIMEOUT = timedelta(seconds=10)
ONLINE_DELAY = timedelta(seconds=120)
WATCH_INTERVAL = timedelta(seconds=20)
WATCH_INTERVAL = timedelta(seconds=59)
# Strings
WINDOW_TITLE = f"Twitch Drops Miner v{__version__} (by DevilXD)"
# Logging

31
gui.py
View File

@@ -621,6 +621,7 @@ class _ProgressVars(TypedDict):
class CampaignProgress:
BAR_LENGTH = 420
ALMOST_DONE_SECONDS = 10
def __init__(self, manager: GUIManager, master: ttk.Widget):
self._manager = manager
@@ -685,17 +686,19 @@ class CampaignProgress:
variable=self._vars["drop"]["progress"],
).grid(column=0, row=10, columnspan=2)
self._drop: TimedDrop | None = None
self._seconds: int = 0
self._timer_task: asyncio.Task[None] | None = None
self.display(None)
@staticmethod
def _divmod(minutes: int, seconds: int) -> tuple[int, int]:
if seconds < 60 and minutes > 0:
def _divmod(self, minutes: int) -> tuple[int, int]:
if self._seconds < 60 and minutes > 0:
minutes -= 1
hours, minutes = divmod(minutes, 60)
return (hours, minutes)
def _update_time(self, seconds: int):
def _update_time(self, seconds: int | None = None):
if seconds is not None:
self._seconds = seconds
drop = self._drop
if drop is not None:
drop_minutes = drop.remaining_minutes
@@ -705,23 +708,22 @@ class CampaignProgress:
campaign_minutes = 0
drop_vars: _DropVars = self._vars["drop"]
campaign_vars: _CampaignVars = self._vars["campaign"]
dseconds = seconds % 60
hours, minutes = self._divmod(drop_minutes, seconds)
dseconds = self._seconds % 60
hours, minutes = self._divmod(drop_minutes)
drop_vars["remaining"].set(
_("gui", "progress", "remaining").format(time=f"{hours:>2}:{minutes:02}:{dseconds:02}")
)
hours, minutes = self._divmod(campaign_minutes, seconds)
hours, minutes = self._divmod(campaign_minutes)
campaign_vars["remaining"].set(
_("gui", "progress", "remaining").format(time=f"{hours:>2}:{minutes:02}:{dseconds:02}")
)
async def _timer_loop(self):
seconds = 60
self._update_time(seconds)
while seconds > 0:
self._update_time(60)
while self._seconds > 0:
await asyncio.sleep(1)
seconds -= 1
self._update_time(seconds)
self._seconds -= 1
self._update_time()
self._timer_task = None
def start_timer(self):
@@ -739,8 +741,9 @@ class CampaignProgress:
self._timer_task.cancel()
self._timer_task = None
def is_counting(self) -> bool:
return self._timer_task is not None
def minute_almost_done(self) -> bool:
# already or almost done
return self._timer_task is None or self._seconds <= self.ALMOST_DONE_SECONDS
def display(self, drop: TimedDrop | None, *, countdown: bool = True, subone: bool = False):
self._drop = drop

View File

@@ -885,13 +885,17 @@ class Twitch:
# if the channel isn't online anymore, we stop watching it
self.stop_watching()
continue
# logger.log(CALL, f"Sending watch payload to: {channel.name}")
succeeded: bool = await channel.send_watch()
last_sent: float = time()
if not succeeded:
logger.log(CALL, f"Watch requested failed for channel: {channel.name}")
elif not self.gui.progress.is_counting():
# If the previous update was more than 60s ago, and the progress tracker
# wait ~20 seconds for a progress update
await asyncio.sleep(20)
if self.gui.progress.minute_almost_done():
# If the previous update was more than ~60s ago, and the progress tracker
# isn't counting down anymore, that means Twitch has temporarily
# stopped reporting drops progress. To ensure the timer keeps at least somewhat
# stopped reporting drop's progress. To ensure the timer keeps at least somewhat
# accurate time, we can use GQL to query for the current drop,
# or even "pretend" mining as a last resort option.
handled: bool = False
@@ -932,7 +936,7 @@ class Twitch:
handled = True
else:
logger.log(CALL, "No active drop could be determined")
await self._watch_sleep(interval)
await self._watch_sleep(interval - min(time() - last_sent, interval))
@task_wrapper(critical=True)
async def _maintenance_task(self) -> None: