mirror of
https://github.com/rangermix/TwitchDropsMiner.git
synced 2026-05-26 15:13:32 +00:00
Implement channel watching using the new method
This commit is contained in:
116
channel.py
116
channel.py
@@ -1,14 +1,11 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import asyncio
|
||||
import logging
|
||||
from base64 import b64encode
|
||||
from functools import cached_property
|
||||
from typing import Any, SupportsInt, TYPE_CHECKING
|
||||
|
||||
from utils import invalidate_cache, json_minify, Game
|
||||
from exceptions import MinerException, RequestException
|
||||
from utils import Game
|
||||
from exceptions import MinerException
|
||||
from constants import CALL, GQL_OPERATIONS, ONLINE_DELAY, URLType
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -21,7 +18,9 @@ logger = logging.getLogger("TwitchDrops")
|
||||
|
||||
|
||||
class Stream:
|
||||
__slots__ = ("channel", "broadcast_id", "viewers", "drops_enabled", "game", "title")
|
||||
__slots__ = (
|
||||
"channel", "broadcast_id", "viewers", "drops_enabled", "game", "title", "_stream_url"
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -38,6 +37,7 @@ class Stream:
|
||||
self.drops_enabled: bool = False
|
||||
self.game: Game | None = Game(game) if game else None
|
||||
self.title: str = title
|
||||
self._stream_url: URLType | None = None
|
||||
|
||||
@classmethod
|
||||
def from_get_stream(cls, channel: Channel, data: JsonType) -> Stream:
|
||||
@@ -70,6 +70,29 @@ class Stream:
|
||||
return self.broadcast_id == other.broadcast_id
|
||||
return NotImplemented
|
||||
|
||||
async def get_stream_url(self) -> URLType:
|
||||
if self._stream_url is not None:
|
||||
return self._stream_url
|
||||
# get the stream playback access token from GQL
|
||||
playback_token_response: JsonType = await self.channel._twitch.gql_request(
|
||||
GQL_OPERATIONS["PlaybackAccessToken"].with_variables({"login": self.channel._login})
|
||||
)
|
||||
token_data: JsonType = playback_token_response["data"]["streamPlaybackAccessToken"]
|
||||
token_value = token_data["value"]
|
||||
token_signature = token_data["signature"]
|
||||
# using the token, query Twitch for a list of all available stream qualities
|
||||
async with self.channel._twitch.request(
|
||||
"GET",
|
||||
URLType(
|
||||
"https://usher.ttvnw.net/api/channel/hls/"
|
||||
f"{self.channel._login}.m3u8?sig={token_signature}&token={token_value}"
|
||||
),
|
||||
) as qualities_response:
|
||||
available_qualities: str = await qualities_response.text()
|
||||
# pick the last URL from the list, usually with the lowest quality stream
|
||||
self._stream_url = URLType(available_qualities.strip().split("\n")[-1])
|
||||
return self._stream_url
|
||||
|
||||
|
||||
class Channel:
|
||||
def __init__(
|
||||
@@ -86,7 +109,6 @@ class Channel:
|
||||
self.id: int = int(id)
|
||||
self._login: str = login
|
||||
self._display_name: str | None = display_name
|
||||
self._spade_url: URLType | None = None
|
||||
self.points: int | None = None
|
||||
self._stream: Stream | None = None
|
||||
self._pending_stream_up: asyncio.Task[Any] | None = None
|
||||
@@ -215,34 +237,6 @@ class Channel:
|
||||
self._pending_stream_up = None
|
||||
self._gui_channels.remove(self)
|
||||
|
||||
async def get_spade_url(self) -> URLType:
|
||||
"""
|
||||
To get this monstrous thing, you have to walk a chain of requests.
|
||||
Streamer page (HTML) --parse-> Streamer Settings (JavaScript) --parse-> Spade URL
|
||||
|
||||
For mobile view, spade_url is available immediately from the page, skipping step #2.
|
||||
"""
|
||||
SETTINGS_PATTERN: str = (
|
||||
r'src="(https://[\w.]+/config/settings\.[0-9a-f]{32}\.js)"'
|
||||
)
|
||||
SPADE_PATTERN: str = (
|
||||
r'"spade_?url": ?"(https://video-edge-[.\w\-/]+\.ts(?:\?allow_stream=true)?)"'
|
||||
)
|
||||
async with self._twitch.request("GET", self.url) as response1:
|
||||
streamer_html: str = await response1.text(encoding="utf8")
|
||||
match = re.search(SPADE_PATTERN, streamer_html, re.I)
|
||||
if not match:
|
||||
match = re.search(SETTINGS_PATTERN, streamer_html, re.I)
|
||||
if not match:
|
||||
raise MinerException("Error while spade_url extraction: step #1")
|
||||
streamer_settings = match.group(1)
|
||||
async with self._twitch.request("GET", streamer_settings) as response2:
|
||||
settings_js: str = await response2.text(encoding="utf8")
|
||||
match = re.search(SPADE_PATTERN, settings_js, re.I)
|
||||
if not match:
|
||||
raise MinerException("Error while spade_url extraction: step #2")
|
||||
return URLType(match.group(1))
|
||||
|
||||
async def get_stream(self) -> Stream | None:
|
||||
try:
|
||||
response: JsonType = await self._twitch.gql_request(
|
||||
@@ -283,7 +277,6 @@ class Channel:
|
||||
"""
|
||||
old_stream = self._stream
|
||||
self._stream = await self.get_stream()
|
||||
invalidate_cache(self, "_payload")
|
||||
if trigger_events:
|
||||
self._twitch.on_channel_update(self, old_stream, self._stream)
|
||||
return self._stream is not None
|
||||
@@ -328,7 +321,6 @@ class Channel:
|
||||
if self.online:
|
||||
old_stream = self._stream
|
||||
self._stream = None
|
||||
invalidate_cache(self, "_payload")
|
||||
self._twitch.on_channel_update(self, old_stream, self._stream)
|
||||
needs_display = False
|
||||
if needs_display:
|
||||
@@ -354,42 +346,26 @@ class Channel:
|
||||
# so if we're not calling it, we need to do it ourselves
|
||||
self.display()
|
||||
|
||||
@cached_property
|
||||
def _payload(self) -> JsonType:
|
||||
assert self._stream is not None
|
||||
payload = [
|
||||
{
|
||||
"event": "minute-watched",
|
||||
"properties": {
|
||||
"broadcast_id": str(self._stream.broadcast_id),
|
||||
"channel_id": str(self.id),
|
||||
"channel": self._login,
|
||||
"hidden": False,
|
||||
"live": True,
|
||||
"location": "channel",
|
||||
"logged_in": True,
|
||||
"muted": False,
|
||||
"player": "site",
|
||||
"user_id": self._twitch._auth_state.user_id,
|
||||
}
|
||||
}
|
||||
]
|
||||
return {"data": (b64encode(json_minify(payload).encode("utf8"))).decode("utf8")}
|
||||
|
||||
async def send_watch(self) -> bool:
|
||||
"""
|
||||
This uses the encoded payload on spade url to simulate watching the stream.
|
||||
Optimally, send every 60 seconds to advance drops.
|
||||
"""
|
||||
if not self.online:
|
||||
return False
|
||||
if self._spade_url is None:
|
||||
self._spade_url = await self.get_spade_url()
|
||||
logger.debug(f"Sending minute-watched to {self.name}")
|
||||
try:
|
||||
async with self._twitch.request(
|
||||
"POST", self._spade_url, data=self._payload
|
||||
) as response:
|
||||
return response.status == 204
|
||||
except RequestException:
|
||||
if self._stream is None:
|
||||
return False
|
||||
# get the stream url
|
||||
stream_url = await self._stream.get_stream_url()
|
||||
# fetch a list of chunks available to download for the stream
|
||||
# NOTE: the CDN is configured to forcibly disconnect shortly after serving the list,
|
||||
# if we don't do it yourselves. Lets help it by actually doing it ourselves instead.
|
||||
async with self._twitch.request(
|
||||
"GET", stream_url, headers={"Connection": "close"}
|
||||
) as chunks_response:
|
||||
available_chunks: str = await chunks_response.text()
|
||||
# the list contains ~10-13 chunks of the stream at 2s intervals,
|
||||
# let's pick the last chunk URL available
|
||||
stream_chunk_url: URLType = URLType(available_chunks.strip().split("\n")[-1])
|
||||
# sending a HEAD request is enough to advance the drops,
|
||||
# without downloading the actual stream data
|
||||
async with self._twitch.request("HEAD", stream_chunk_url) as head_response:
|
||||
return head_response.status == 200
|
||||
|
||||
14
constants.py
14
constants.py
@@ -116,7 +116,7 @@ DEFAULT_LANG = "English"
|
||||
PING_INTERVAL = timedelta(minutes=3)
|
||||
PING_TIMEOUT = timedelta(seconds=10)
|
||||
ONLINE_DELAY = timedelta(seconds=120)
|
||||
WATCH_INTERVAL = timedelta(seconds=59)
|
||||
WATCH_INTERVAL = timedelta(seconds=20)
|
||||
# Strings
|
||||
WINDOW_TITLE = f"Twitch Drops Miner v{__version__} (by DevilXD)"
|
||||
# Logging
|
||||
@@ -310,6 +310,18 @@ GQL_OPERATIONS: dict[str, GQLOperation] = {
|
||||
"channelID": ..., # channel ID as a str
|
||||
},
|
||||
),
|
||||
# retuns stream playback access token
|
||||
"PlaybackAccessToken": GQLOperation(
|
||||
"PlaybackAccessToken",
|
||||
"3093517e37e4f4cb48906155bcd894150aef92617939236d2508f3375ab732ce",
|
||||
variables={
|
||||
"isLive": True,
|
||||
"login": ..., # channel login
|
||||
"isVod": False,
|
||||
"vodID": "",
|
||||
"playerType": "site"
|
||||
},
|
||||
),
|
||||
# returns live channels for a particular game
|
||||
"GameDirectory": GQLOperation(
|
||||
"DirectoryPage_Game",
|
||||
|
||||
3
gui.py
3
gui.py
@@ -700,6 +700,9 @@ class CampaignProgress:
|
||||
self._timer_task.cancel()
|
||||
self._timer_task = None
|
||||
|
||||
def is_counting(self) -> bool:
|
||||
return self._timer_task is not None
|
||||
|
||||
def display(self, drop: TimedDrop | None, *, countdown: bool = True, subone: bool = False):
|
||||
self._drop = drop
|
||||
vars_drop = self._vars["drop"]
|
||||
|
||||
16
inventory.py
16
inventory.py
@@ -1,13 +1,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import logging
|
||||
from itertools import chain
|
||||
from typing import TYPE_CHECKING
|
||||
from functools import cached_property
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from channel import Channel
|
||||
from constants import GQL_OPERATIONS, URLType
|
||||
from constants import CALL, GQL_OPERATIONS, URLType
|
||||
from utils import timestamp, invalidate_cache, Game
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -18,6 +19,7 @@ if TYPE_CHECKING:
|
||||
from gui import GUIManager, InventoryOverview
|
||||
|
||||
|
||||
logger = logging.getLogger("TwitchDrops")
|
||||
DIMS_PATTERN = re.compile(r'-\d+x\d+(?=\.(?:jpg|png|gif)$)', re.I)
|
||||
|
||||
|
||||
@@ -242,14 +244,26 @@ class TimedDrop(BaseDrop):
|
||||
def update_minutes(self, minutes: int):
|
||||
self.current_minutes = minutes
|
||||
self._on_minutes_changed()
|
||||
self.display()
|
||||
|
||||
def display(self, *, countdown: bool = True, subone: bool = False):
|
||||
self._manager.display_drop(self, countdown=countdown, subone=subone)
|
||||
|
||||
def bump_minutes(self):
|
||||
# this may get called more often than once every minute
|
||||
# we can detect this by checking if the GUI progress display is currently counting down
|
||||
# if we haven't finished counting down the last minute, then we can ignore this call
|
||||
if self._manager.progress.is_counting():
|
||||
return
|
||||
if self.current_minutes < self.required_minutes:
|
||||
self.current_minutes += 1
|
||||
self._on_minutes_changed()
|
||||
drop_text = (
|
||||
f"{self.name} ({self.campaign.game}, "
|
||||
f"{self.current_minutes}/{self.required_minutes})"
|
||||
)
|
||||
logger.log(CALL, f"Drop progress from active search: {drop_text}")
|
||||
self.display()
|
||||
|
||||
|
||||
class DropsCampaign:
|
||||
|
||||
95
twitch.py
95
twitch.py
@@ -430,7 +430,6 @@ class Twitch:
|
||||
self.watching_channel: AwaitableValue[Channel] = AwaitableValue()
|
||||
self._watching_task: asyncio.Task[None] | None = None
|
||||
self._watching_restart = asyncio.Event()
|
||||
self._drop_update: asyncio.Future[bool] | None = None
|
||||
# Websocket
|
||||
self.websocket = WebsocketPool(self)
|
||||
# Maintenance task
|
||||
@@ -487,7 +486,6 @@ class Twitch:
|
||||
cookie_jar.save(COOKIES_PATH)
|
||||
await self._session.close()
|
||||
self._session = None
|
||||
self._drop_update = None
|
||||
self._drops.clear()
|
||||
self.channels.clear()
|
||||
self.inventory.clear()
|
||||
@@ -843,72 +841,20 @@ class Twitch:
|
||||
channel: Channel = await self.watching_channel.get()
|
||||
succeeded: bool = await channel.send_watch()
|
||||
if not succeeded:
|
||||
# this usually means the campaign expired in the middle of mining
|
||||
# NOTE: the maintenance task should switch the channel right after this happens
|
||||
await self._watch_sleep(60)
|
||||
logger.log(CALL, f"Watch requested failed for channel: {channel.name}")
|
||||
await self._watch_sleep(interval)
|
||||
continue
|
||||
last_watch = time()
|
||||
self._drop_update = asyncio.Future()
|
||||
use_active: bool = False
|
||||
try:
|
||||
handled: bool = await asyncio.wait_for(self._drop_update, timeout=10)
|
||||
except asyncio.TimeoutError:
|
||||
# there was no websocket update within 10s
|
||||
handled = False
|
||||
use_active = True
|
||||
logger.log(CALL, "No drop update from the websocket received")
|
||||
self._drop_update = None
|
||||
if not handled:
|
||||
# websocket update timed out, or the update was for an unrelated drop
|
||||
if not use_active:
|
||||
# we need to use GQL to get the current progress
|
||||
context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"])
|
||||
drop_data: JsonType | None = (
|
||||
context["data"]["currentUser"]["dropCurrentSession"]
|
||||
)
|
||||
if drop_data is not None:
|
||||
drop = self._drops.get(drop_data["dropID"])
|
||||
if drop is None:
|
||||
use_active = True
|
||||
# usually this means there was a campaign changed between reloads
|
||||
logger.info("Missing drop detected, reloading...")
|
||||
self.change_state(State.INVENTORY_FETCH)
|
||||
elif not drop.can_earn(channel):
|
||||
# we can't earn this drop in the current watching channel
|
||||
use_active = True
|
||||
drop_text = (
|
||||
f"{drop.name} ({drop.campaign.game}, "
|
||||
f"{drop.current_minutes}/{drop.required_minutes})"
|
||||
)
|
||||
logger.log(CALL, f"Current drop returned mismach: {drop_text}")
|
||||
else:
|
||||
drop.update_minutes(drop_data["currentMinutesWatched"])
|
||||
drop.display()
|
||||
drop_text = (
|
||||
f"{drop.name} ({drop.campaign.game}, "
|
||||
f"{drop.current_minutes}/{drop.required_minutes})"
|
||||
)
|
||||
logger.log(CALL, f"Drop progress from GQL: {drop_text}")
|
||||
else:
|
||||
use_active = True
|
||||
logger.log(CALL, "Current drop returned as none")
|
||||
if use_active:
|
||||
# Sometimes, even GQL fails to give us the correct drop.
|
||||
# In that case, we can use the locally cached inventory to try
|
||||
# and put together the drop that we're actually mining right now
|
||||
# NOTE: get_active_drop uses the watching channel by default,
|
||||
# so there's no point to pass it here
|
||||
if (drop := self.get_active_drop()) is not None:
|
||||
drop.bump_minutes()
|
||||
drop.display()
|
||||
drop_text = (
|
||||
f"{drop.name} ({drop.campaign.game}, "
|
||||
f"{drop.current_minutes}/{drop.required_minutes})"
|
||||
)
|
||||
logger.log(CALL, f"Drop progress from active search: {drop_text}")
|
||||
else:
|
||||
logger.log(CALL, "No active drop could be determined")
|
||||
await self._watch_sleep(last_watch + interval - time())
|
||||
# Drop progress isn't always returned by the websocket, but we can "pretend"
|
||||
# the progress is constantly advancing, by simply incrementing the minutes
|
||||
# watched ourselves. Once the websocket "wakes up" to return proper progress,
|
||||
# it'll update the display automatically.
|
||||
# NOTE: get_active_drop uses the watching channel by default,
|
||||
# so there's no point to pass it here
|
||||
if (drop := self.get_active_drop()) is not None:
|
||||
drop.bump_minutes()
|
||||
else:
|
||||
logger.log(CALL, "No active drop could be determined")
|
||||
await self._watch_sleep(interval)
|
||||
|
||||
@task_wrapper
|
||||
async def _maintenance_task(self) -> None:
|
||||
@@ -1179,22 +1125,9 @@ class Twitch:
|
||||
else:
|
||||
drop_text = "<Unknown>"
|
||||
logger.log(CALL, f"Drop update from websocket: {drop_text}")
|
||||
if self._drop_update is None:
|
||||
# we aren't actually waiting for a progress update right now, so we can just
|
||||
# ignore the event this time
|
||||
return
|
||||
elif drop is not None and drop.can_earn(self.watching_channel.get_with_default(None)):
|
||||
if drop is not None and drop.can_earn(self.watching_channel.get_with_default(None)):
|
||||
# the received payload is for the drop we expected
|
||||
drop.update_minutes(message["data"]["current_progress_min"])
|
||||
drop.display()
|
||||
# Let the watch loop know we've handled it here
|
||||
self._drop_update.set_result(True)
|
||||
else:
|
||||
# Sometimes, the drop update we receive doesn't actually match what we're mining.
|
||||
# This is a Twitch bug workaround: signal the watch loop to use GQL
|
||||
# to get the current drop progress instead.
|
||||
self._drop_update.set_result(False)
|
||||
self._drop_update = None
|
||||
|
||||
@task_wrapper
|
||||
async def process_notifications(self, user_id: int, message: JsonType):
|
||||
|
||||
Reference in New Issue
Block a user