Implement channel watching using the new method

This commit is contained in:
DevilXD
2024-06-22 18:16:14 +02:00
parent c7f8d56946
commit b47b2089c0
5 changed files with 91 additions and 153 deletions

View File

@@ -1,14 +1,11 @@
from __future__ import annotations
import re
import asyncio
import logging
from base64 import b64encode
from functools import cached_property
from typing import Any, SupportsInt, TYPE_CHECKING
from utils import invalidate_cache, json_minify, Game
from exceptions import MinerException, RequestException
from utils import Game
from exceptions import MinerException
from constants import CALL, GQL_OPERATIONS, ONLINE_DELAY, URLType
if TYPE_CHECKING:
@@ -21,7 +18,9 @@ logger = logging.getLogger("TwitchDrops")
class Stream:
__slots__ = ("channel", "broadcast_id", "viewers", "drops_enabled", "game", "title")
__slots__ = (
"channel", "broadcast_id", "viewers", "drops_enabled", "game", "title", "_stream_url"
)
def __init__(
self,
@@ -38,6 +37,7 @@ class Stream:
self.drops_enabled: bool = False
self.game: Game | None = Game(game) if game else None
self.title: str = title
self._stream_url: URLType | None = None
@classmethod
def from_get_stream(cls, channel: Channel, data: JsonType) -> Stream:
@@ -70,6 +70,29 @@ class Stream:
return self.broadcast_id == other.broadcast_id
return NotImplemented
async def get_stream_url(self) -> URLType:
if self._stream_url is not None:
return self._stream_url
# get the stream playback access token from GQL
playback_token_response: JsonType = await self.channel._twitch.gql_request(
GQL_OPERATIONS["PlaybackAccessToken"].with_variables({"login": self.channel._login})
)
token_data: JsonType = playback_token_response["data"]["streamPlaybackAccessToken"]
token_value = token_data["value"]
token_signature = token_data["signature"]
# using the token, query Twitch for a list of all available stream qualities
async with self.channel._twitch.request(
"GET",
URLType(
"https://usher.ttvnw.net/api/channel/hls/"
f"{self.channel._login}.m3u8?sig={token_signature}&token={token_value}"
),
) as qualities_response:
available_qualities: str = await qualities_response.text()
# pick the last URL from the list, usually with the lowest quality stream
self._stream_url = URLType(available_qualities.strip().split("\n")[-1])
return self._stream_url
class Channel:
def __init__(
@@ -86,7 +109,6 @@ class Channel:
self.id: int = int(id)
self._login: str = login
self._display_name: str | None = display_name
self._spade_url: URLType | None = None
self.points: int | None = None
self._stream: Stream | None = None
self._pending_stream_up: asyncio.Task[Any] | None = None
@@ -215,34 +237,6 @@ class Channel:
self._pending_stream_up = None
self._gui_channels.remove(self)
async def get_spade_url(self) -> URLType:
"""
To get this monstrous thing, you have to walk a chain of requests.
Streamer page (HTML) --parse-> Streamer Settings (JavaScript) --parse-> Spade URL
For mobile view, spade_url is available immediately from the page, skipping step #2.
"""
SETTINGS_PATTERN: str = (
r'src="(https://[\w.]+/config/settings\.[0-9a-f]{32}\.js)"'
)
SPADE_PATTERN: str = (
r'"spade_?url": ?"(https://video-edge-[.\w\-/]+\.ts(?:\?allow_stream=true)?)"'
)
async with self._twitch.request("GET", self.url) as response1:
streamer_html: str = await response1.text(encoding="utf8")
match = re.search(SPADE_PATTERN, streamer_html, re.I)
if not match:
match = re.search(SETTINGS_PATTERN, streamer_html, re.I)
if not match:
raise MinerException("Error while spade_url extraction: step #1")
streamer_settings = match.group(1)
async with self._twitch.request("GET", streamer_settings) as response2:
settings_js: str = await response2.text(encoding="utf8")
match = re.search(SPADE_PATTERN, settings_js, re.I)
if not match:
raise MinerException("Error while spade_url extraction: step #2")
return URLType(match.group(1))
async def get_stream(self) -> Stream | None:
try:
response: JsonType = await self._twitch.gql_request(
@@ -283,7 +277,6 @@ class Channel:
"""
old_stream = self._stream
self._stream = await self.get_stream()
invalidate_cache(self, "_payload")
if trigger_events:
self._twitch.on_channel_update(self, old_stream, self._stream)
return self._stream is not None
@@ -328,7 +321,6 @@ class Channel:
if self.online:
old_stream = self._stream
self._stream = None
invalidate_cache(self, "_payload")
self._twitch.on_channel_update(self, old_stream, self._stream)
needs_display = False
if needs_display:
@@ -354,42 +346,26 @@ class Channel:
# so if we're not calling it, we need to do it ourselves
self.display()
@cached_property
def _payload(self) -> JsonType:
assert self._stream is not None
payload = [
{
"event": "minute-watched",
"properties": {
"broadcast_id": str(self._stream.broadcast_id),
"channel_id": str(self.id),
"channel": self._login,
"hidden": False,
"live": True,
"location": "channel",
"logged_in": True,
"muted": False,
"player": "site",
"user_id": self._twitch._auth_state.user_id,
}
}
]
return {"data": (b64encode(json_minify(payload).encode("utf8"))).decode("utf8")}
async def send_watch(self) -> bool:
"""
This uses the encoded payload on spade url to simulate watching the stream.
Optimally, send every 60 seconds to advance drops.
"""
if not self.online:
return False
if self._spade_url is None:
self._spade_url = await self.get_spade_url()
logger.debug(f"Sending minute-watched to {self.name}")
try:
async with self._twitch.request(
"POST", self._spade_url, data=self._payload
) as response:
return response.status == 204
except RequestException:
if self._stream is None:
return False
# get the stream url
stream_url = await self._stream.get_stream_url()
# fetch a list of chunks available to download for the stream
# NOTE: the CDN is configured to forcibly disconnect shortly after serving the list,
# if we don't do it yourselves. Lets help it by actually doing it ourselves instead.
async with self._twitch.request(
"GET", stream_url, headers={"Connection": "close"}
) as chunks_response:
available_chunks: str = await chunks_response.text()
# the list contains ~10-13 chunks of the stream at 2s intervals,
# let's pick the last chunk URL available
stream_chunk_url: URLType = URLType(available_chunks.strip().split("\n")[-1])
# sending a HEAD request is enough to advance the drops,
# without downloading the actual stream data
async with self._twitch.request("HEAD", stream_chunk_url) as head_response:
return head_response.status == 200

View File

@@ -116,7 +116,7 @@ DEFAULT_LANG = "English"
PING_INTERVAL = timedelta(minutes=3)
PING_TIMEOUT = timedelta(seconds=10)
ONLINE_DELAY = timedelta(seconds=120)
WATCH_INTERVAL = timedelta(seconds=59)
WATCH_INTERVAL = timedelta(seconds=20)
# Strings
WINDOW_TITLE = f"Twitch Drops Miner v{__version__} (by DevilXD)"
# Logging
@@ -310,6 +310,18 @@ GQL_OPERATIONS: dict[str, GQLOperation] = {
"channelID": ..., # channel ID as a str
},
),
# retuns stream playback access token
"PlaybackAccessToken": GQLOperation(
"PlaybackAccessToken",
"3093517e37e4f4cb48906155bcd894150aef92617939236d2508f3375ab732ce",
variables={
"isLive": True,
"login": ..., # channel login
"isVod": False,
"vodID": "",
"playerType": "site"
},
),
# returns live channels for a particular game
"GameDirectory": GQLOperation(
"DirectoryPage_Game",

3
gui.py
View File

@@ -700,6 +700,9 @@ class CampaignProgress:
self._timer_task.cancel()
self._timer_task = None
def is_counting(self) -> bool:
return self._timer_task is not None
def display(self, drop: TimedDrop | None, *, countdown: bool = True, subone: bool = False):
self._drop = drop
vars_drop = self._vars["drop"]

View File

@@ -1,13 +1,14 @@
from __future__ import annotations
import re
import logging
from itertools import chain
from typing import TYPE_CHECKING
from functools import cached_property
from datetime import datetime, timedelta, timezone
from channel import Channel
from constants import GQL_OPERATIONS, URLType
from constants import CALL, GQL_OPERATIONS, URLType
from utils import timestamp, invalidate_cache, Game
if TYPE_CHECKING:
@@ -18,6 +19,7 @@ if TYPE_CHECKING:
from gui import GUIManager, InventoryOverview
logger = logging.getLogger("TwitchDrops")
DIMS_PATTERN = re.compile(r'-\d+x\d+(?=\.(?:jpg|png|gif)$)', re.I)
@@ -242,14 +244,26 @@ class TimedDrop(BaseDrop):
def update_minutes(self, minutes: int):
self.current_minutes = minutes
self._on_minutes_changed()
self.display()
def display(self, *, countdown: bool = True, subone: bool = False):
self._manager.display_drop(self, countdown=countdown, subone=subone)
def bump_minutes(self):
# this may get called more often than once every minute
# we can detect this by checking if the GUI progress display is currently counting down
# if we haven't finished counting down the last minute, then we can ignore this call
if self._manager.progress.is_counting():
return
if self.current_minutes < self.required_minutes:
self.current_minutes += 1
self._on_minutes_changed()
drop_text = (
f"{self.name} ({self.campaign.game}, "
f"{self.current_minutes}/{self.required_minutes})"
)
logger.log(CALL, f"Drop progress from active search: {drop_text}")
self.display()
class DropsCampaign:

View File

@@ -430,7 +430,6 @@ class Twitch:
self.watching_channel: AwaitableValue[Channel] = AwaitableValue()
self._watching_task: asyncio.Task[None] | None = None
self._watching_restart = asyncio.Event()
self._drop_update: asyncio.Future[bool] | None = None
# Websocket
self.websocket = WebsocketPool(self)
# Maintenance task
@@ -487,7 +486,6 @@ class Twitch:
cookie_jar.save(COOKIES_PATH)
await self._session.close()
self._session = None
self._drop_update = None
self._drops.clear()
self.channels.clear()
self.inventory.clear()
@@ -843,72 +841,20 @@ class Twitch:
channel: Channel = await self.watching_channel.get()
succeeded: bool = await channel.send_watch()
if not succeeded:
# this usually means the campaign expired in the middle of mining
# NOTE: the maintenance task should switch the channel right after this happens
await self._watch_sleep(60)
logger.log(CALL, f"Watch requested failed for channel: {channel.name}")
await self._watch_sleep(interval)
continue
last_watch = time()
self._drop_update = asyncio.Future()
use_active: bool = False
try:
handled: bool = await asyncio.wait_for(self._drop_update, timeout=10)
except asyncio.TimeoutError:
# there was no websocket update within 10s
handled = False
use_active = True
logger.log(CALL, "No drop update from the websocket received")
self._drop_update = None
if not handled:
# websocket update timed out, or the update was for an unrelated drop
if not use_active:
# we need to use GQL to get the current progress
context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"])
drop_data: JsonType | None = (
context["data"]["currentUser"]["dropCurrentSession"]
)
if drop_data is not None:
drop = self._drops.get(drop_data["dropID"])
if drop is None:
use_active = True
# usually this means there was a campaign changed between reloads
logger.info("Missing drop detected, reloading...")
self.change_state(State.INVENTORY_FETCH)
elif not drop.can_earn(channel):
# we can't earn this drop in the current watching channel
use_active = True
drop_text = (
f"{drop.name} ({drop.campaign.game}, "
f"{drop.current_minutes}/{drop.required_minutes})"
)
logger.log(CALL, f"Current drop returned mismach: {drop_text}")
else:
drop.update_minutes(drop_data["currentMinutesWatched"])
drop.display()
drop_text = (
f"{drop.name} ({drop.campaign.game}, "
f"{drop.current_minutes}/{drop.required_minutes})"
)
logger.log(CALL, f"Drop progress from GQL: {drop_text}")
else:
use_active = True
logger.log(CALL, "Current drop returned as none")
if use_active:
# Sometimes, even GQL fails to give us the correct drop.
# In that case, we can use the locally cached inventory to try
# and put together the drop that we're actually mining right now
# NOTE: get_active_drop uses the watching channel by default,
# so there's no point to pass it here
if (drop := self.get_active_drop()) is not None:
drop.bump_minutes()
drop.display()
drop_text = (
f"{drop.name} ({drop.campaign.game}, "
f"{drop.current_minutes}/{drop.required_minutes})"
)
logger.log(CALL, f"Drop progress from active search: {drop_text}")
else:
logger.log(CALL, "No active drop could be determined")
await self._watch_sleep(last_watch + interval - time())
# Drop progress isn't always returned by the websocket, but we can "pretend"
# the progress is constantly advancing, by simply incrementing the minutes
# watched ourselves. Once the websocket "wakes up" to return proper progress,
# it'll update the display automatically.
# NOTE: get_active_drop uses the watching channel by default,
# so there's no point to pass it here
if (drop := self.get_active_drop()) is not None:
drop.bump_minutes()
else:
logger.log(CALL, "No active drop could be determined")
await self._watch_sleep(interval)
@task_wrapper
async def _maintenance_task(self) -> None:
@@ -1179,22 +1125,9 @@ class Twitch:
else:
drop_text = "<Unknown>"
logger.log(CALL, f"Drop update from websocket: {drop_text}")
if self._drop_update is None:
# we aren't actually waiting for a progress update right now, so we can just
# ignore the event this time
return
elif drop is not None and drop.can_earn(self.watching_channel.get_with_default(None)):
if drop is not None and drop.can_earn(self.watching_channel.get_with_default(None)):
# the received payload is for the drop we expected
drop.update_minutes(message["data"]["current_progress_min"])
drop.display()
# Let the watch loop know we've handled it here
self._drop_update.set_result(True)
else:
# Sometimes, the drop update we receive doesn't actually match what we're mining.
# This is a Twitch bug workaround: signal the watch loop to use GQL
# to get the current drop progress instead.
self._drop_update.set_result(False)
self._drop_update = None
@task_wrapper
async def process_notifications(self, user_id: int, message: JsonType):