From 22b9a7588295fb4b3fb66ff2867062663c9cf4e8 Mon Sep 17 00:00:00 2001 From: Fengqing Liu Date: Wed, 29 Apr 2026 22:18:25 +1000 Subject: [PATCH] Fix Twitch watch events via GQL (#45) based on the original author's fix https://github.com/DevilXD/TwitchDropsMiner/commit/40000cf2954c3ab731e7201d17a1d600e772259c --- AGENTS.md | 6 +- README.md | 2 +- src/api/gql_client.py | 10 ++-- src/config/__init__.py | 4 ++ src/config/constants.py | 17 ++++++ src/core/client.py | 6 +- src/models/channel.py | 49 +++++++++++++-- src/utils/__init__.py | 2 + src/utils/json_utils.py | 5 ++ tests/test_gql_watch_events.py | 106 +++++++++++++++++++++++++++++++++ 10 files changed, 190 insertions(+), 17 deletions(-) create mode 100644 tests/test_gql_watch_events.py diff --git a/AGENTS.md b/AGENTS.md index ecbfba8..af7072d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -174,16 +174,16 @@ lang/ # Translation JSON files (19 languages) ### Drop Mining Mechanism -The application sends periodic "watch" payloads to a spade URL every ~20 seconds: +The application sends periodic "watch" payloads through Twitch GraphQL `sendSpadeEvents`: -- Payload contains minute-watched events with channel/broadcast IDs +- Payload contains gzip/base64-encoded minute-watched events with channel/broadcast IDs - Twitch reports progress via websocket (User.Drops topic) - If websocket updates stop, fallback to GQL CurrentDrop query - Extrapolation via "bump minutes" when no updates received ### GraphQL Operations -Defined in `src/config/operations.py` as `GQL_OPERATIONS`: +Persisted operations are defined in `src/config/operations.py` as `GQL_OPERATIONS`; raw GraphQL payloads such as `sendSpadeEvents` use `GQLQuery`: - **Inventory** - Fetch in-progress campaigns and claimed benefits - **Campaigns** - List available active/upcoming campaigns diff --git a/README.md b/README.md index f14225e..d426b37 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ No more tab juggling, channel switching, or missing rewards — just set it, for ## ✨ Features -- 🚀 **Streamless Mining** — Earn drops without streaming video (save bandwidth) +- 🚀 **Streamless Mining** — Earn drops without streaming video by sending Twitch GraphQL watch events - 🔍 **Automatic Campaign Discovery** — Detects new drop events automatically - ⚙️ **Auto Channel Switching** — Always mines the best available stream - 💾 **Persistent Login** — OAuth login saved via cookies diff --git a/src/api/gql_client.py b/src/api/gql_client.py index a0fa446..7710d32 100644 --- a/src/api/gql_client.py +++ b/src/api/gql_client.py @@ -18,7 +18,7 @@ from src.utils import ExponentialBackoff, RateLimiter if TYPE_CHECKING: from src.api.http_client import HTTPClient from src.auth import _AuthState - from src.config import ClientInfo, GQLOperation, JsonType + from src.config import ClientInfo, GQLRequest, JsonType logger = logging.getLogger("TwitchDrops") @@ -63,18 +63,18 @@ class GQLClient: self._qgl_limiter = RateLimiter(capacity=5, window=1) @overload - async def request(self, ops: GQLOperation) -> JsonType: ... + async def request(self, ops: GQLRequest) -> JsonType: ... @overload - async def request(self, ops: list[GQLOperation]) -> list[JsonType]: ... + async def request(self, ops: list[GQLRequest]) -> list[JsonType]: ... - async def request(self, ops: GQLOperation | list[GQLOperation]) -> JsonType | list[JsonType]: + async def request(self, ops: GQLRequest | list[GQLRequest]) -> JsonType | list[JsonType]: """ Execute one or more GraphQL operations. Parameters ---------- - ops : GQLOperation | list[GQLOperation] + ops : GQLRequest | list[GQLRequest] Single operation or list of operations to execute Returns diff --git a/src/config/__init__.py b/src/config/__init__.py index 5ec8c8f..12518f2 100644 --- a/src/config/__init__.py +++ b/src/config/__init__.py @@ -24,6 +24,8 @@ from .constants import ( WEBSOCKET_TOPICS, WS_TOPICS_LIMIT, GQLOperation, + GQLQuery, + GQLRequest, JsonType, State, TopicProcess, @@ -52,6 +54,8 @@ __all__ = [ "URLType", "TopicProcess", "GQLOperation", + "GQLQuery", + "GQLRequest", "MAX_INT", "MAX_EXTRA_MINUTES", "BASE_TOPICS", diff --git a/src/config/constants.py b/src/config/constants.py index b54d761..48f7fe2 100644 --- a/src/config/constants.py +++ b/src/config/constants.py @@ -37,6 +37,7 @@ FILE_FORMATTER = logging.Formatter( JsonType = dict[str, Any] URLType = NewType("URLType", str) TopicProcess: TypeAlias = "abc.Callable[[int, JsonType], Any]" +GQLRequest: TypeAlias = "GQLOperation | GQLQuery" # Core constants MAX_INT = sys.maxsize @@ -99,6 +100,22 @@ class GQLOperation(JsonType): return modified +class GQLQuery(JsonType): + """Raw GraphQL query operation with gzip/base64 encoded spade events.""" + + def __init__(self, query: str, g64data: str): + super().__init__( + query=query, + variables={ + "input": { + "data": g64data, + "repository": "twilight", + "encoding": "GZIP_B64", + } + }, + ) + + class WebsocketTopic: """Represents a websocket topic subscription.""" diff --git a/src/core/client.py b/src/core/client.py index 5f1939f..beb85ec 100644 --- a/src/core/client.py +++ b/src/core/client.py @@ -38,7 +38,7 @@ from src.websocket import WebsocketPool if TYPE_CHECKING: - from src.config import ClientInfo, GQLOperation, JsonType + from src.config import ClientInfo, GQLRequest, JsonType from src.config.settings import Settings from src.models.channel import Stream from src.models.drop import TimedDrop @@ -597,9 +597,7 @@ class Twitch: await self._auth_state.validate() return self._auth_state - async def gql_request( - self, ops: GQLOperation | list[GQLOperation] - ) -> JsonType | list[JsonType]: + async def gql_request(self, ops: GQLRequest | list[GQLRequest]) -> JsonType | list[JsonType]: """ Execute GraphQL request(s). diff --git a/src/models/channel.py b/src/models/channel.py index e11b7b2..3049be6 100644 --- a/src/models/channel.py +++ b/src/models/channel.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import gzip import json import logging import re @@ -11,11 +12,11 @@ from typing import TYPE_CHECKING, Any, SupportsInt, cast import aiohttp from yarl import URL -from src.config.constants import CALL, ONLINE_DELAY, GQLOperation, JsonType, URLType +from src.config.constants import CALL, ONLINE_DELAY, GQLOperation, GQLQuery, JsonType, URLType from src.config.operations import GQL_OPERATIONS from src.exceptions import MinerException, RequestException from src.models.game import Game -from src.utils.json_utils import json_minify +from src.utils.json_utils import isonow, json_minify if TYPE_CHECKING: @@ -65,6 +66,36 @@ class Stream: ] return {"data": (b64encode(json_minify(payload).encode("utf8"))).decode("utf8")} + @property + def _gql_payload(self) -> GQLQuery: + payload = [ + { + "event": "minute-watched", + "properties": { + "broadcast_id": str(self.broadcast_id), + "channel_id": str(self.channel.id), + "channel": self.channel._login, + "client_time": isonow(), + "game": self.game.name if self.game is not None else "", + "game_id": str(self.game.id) if self.game is not None else "", + "hidden": False, + "is_live": True, + "live": True, + "logged_in": True, + "minutes_logged": 1, + "muted": False, + "user_id": self.channel._twitch._auth_state.user_id, + }, + } + ] + return GQLQuery( + ( + "\n mutation SendEvents($input: SendSpadeEventsInput!) " + "{\n sendSpadeEvents(input: $input) {\n statusCode\n}\n}\n" + ), + b64encode(gzip.compress(json_minify(payload).encode("utf8"))).decode("utf8"), + ) + @classmethod def from_get_stream(cls, channel: Channel, channel_data: JsonType) -> Stream: stream = channel_data["stream"] @@ -419,7 +450,7 @@ class Channel: self.display() # NOTE: This is currently unused. - async def _send_watch(self) -> bool: + async def _send_watch_playlist(self) -> bool: """ This performs a HEAD request on the stream's current playlist, to simulate watching the stream. @@ -471,7 +502,8 @@ class Channel: async with self._twitch.request("HEAD", stream_chunk_url) as head_response: return head_response.status == 200 - async def send_watch(self) -> bool: + # NOTE: This is currently unused. + async def _send_watch_spade(self) -> bool: if self._stream is None: return False if self._spade_url is None: @@ -483,3 +515,12 @@ class Channel: return response.status == 204 except RequestException: return False + + async def send_watch(self) -> bool: + if self._stream is None: + return False + try: + watch_response: JsonType = await self._twitch.gql_request(self._stream._gql_payload) + return watch_response["data"]["sendSpadeEvents"]["statusCode"] == 204 + except RequestException: + return False diff --git a/src/utils/__init__.py b/src/utils/__init__.py index 8ad16b4..f83e885 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -17,6 +17,7 @@ from .backoff import ExponentialBackoff # JSON utilities from .json_utils import ( SERIALIZE_ENV, + isonow, json_load, json_minify, json_save, @@ -47,6 +48,7 @@ __all__ = [ "deduplicate", # JSON utilities "json_minify", + "isonow", "json_load", "json_save", "merge_json", diff --git a/src/utils/json_utils.py b/src/utils/json_utils.py index 6f72581..1705b47 100644 --- a/src/utils/json_utils.py +++ b/src/utils/json_utils.py @@ -31,6 +31,11 @@ def json_minify(data: JsonType | list[JsonType]) -> str: return json.dumps(data, separators=(",", ":")) +def isonow() -> str: + """Return the current UTC time in Twitch's expected ISO-8601 format.""" + return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") + + def _serialize(obj: Any) -> Any: """ Custom JSON encoder for special types. diff --git a/tests/test_gql_watch_events.py b/tests/test_gql_watch_events.py new file mode 100644 index 0000000..1d53a9b --- /dev/null +++ b/tests/test_gql_watch_events.py @@ -0,0 +1,106 @@ +import base64 +import gzip +import json +import unittest +from unittest.mock import AsyncMock, MagicMock + +from src.config.constants import GQLQuery +from src.exceptions import RequestException +from src.models.channel import Channel, Stream + + +def _decode_gql_events(operation: GQLQuery): + encoded = operation["variables"]["input"]["data"] + return json.loads(gzip.decompress(base64.b64decode(encoded)).decode("utf8")) + + +class TestGQLWatchEvents(unittest.IsolatedAsyncioTestCase): + def test_gql_query_wraps_gzip_base64_payload(self): + event_payload = [{"event": "minute-watched", "properties": {"channel": "test"}}] + compressed = base64.b64encode(gzip.compress(json.dumps(event_payload).encode("utf8"))).decode( + "utf8" + ) + + operation = GQLQuery("mutation Example { ok }", compressed) + + self.assertEqual(operation["query"], "mutation Example { ok }") + self.assertEqual(operation["variables"]["input"]["repository"], "twilight") + self.assertEqual(operation["variables"]["input"]["encoding"], "GZIP_B64") + self.assertEqual(_decode_gql_events(operation), event_payload) + + def test_stream_gql_payload_contains_minute_watched_event(self): + twitch = MagicMock() + twitch._auth_state.user_id = 12345 + channel = MagicMock(spec=Channel) + channel.id = 67890 + channel._login = "example_channel" + channel._twitch = twitch + stream = Stream( + channel, + id=24680, + game={"id": "13579", "name": "Example Game"}, + viewers=100, + title="Example Stream", + ) + + operation = stream._gql_payload + events = _decode_gql_events(operation) + + self.assertIn("mutation SendEvents", operation["query"]) + self.assertEqual(len(events), 1) + self.assertEqual(events[0]["event"], "minute-watched") + properties = events[0]["properties"] + self.assertEqual(properties["broadcast_id"], "24680") + self.assertEqual(properties["channel_id"], "67890") + self.assertEqual(properties["channel"], "example_channel") + self.assertEqual(properties["game"], "Example Game") + self.assertEqual(properties["game_id"], "13579") + self.assertEqual(properties["minutes_logged"], 1) + self.assertEqual(properties["user_id"], 12345) + self.assertRegex(properties["client_time"], r"^\d{4}-\d{2}-\d{2}T.*Z$") + + async def test_send_watch_uses_gql_and_returns_true_for_204(self): + twitch = MagicMock() + twitch.gui.channels = MagicMock() + twitch._auth_state.user_id = 12345 + twitch.gql_request = AsyncMock(return_value={"data": {"sendSpadeEvents": {"statusCode": 204}}}) + channel = Channel(twitch, id=67890, login="example_channel") + channel._stream = Stream( + channel, + id=24680, + game={"id": "13579", "name": "Example Game"}, + viewers=100, + title="Example Stream", + ) + + result = await channel.send_watch() + + self.assertTrue(result) + twitch.gql_request.assert_awaited_once() + + async def test_send_watch_returns_false_without_stream(self): + twitch = MagicMock() + twitch.gui.channels = MagicMock() + channel = Channel(twitch, id=67890, login="example_channel") + + self.assertFalse(await channel.send_watch()) + + async def test_send_watch_returns_false_when_gql_request_fails(self): + twitch = MagicMock() + twitch.gui.channels = MagicMock() + twitch._auth_state.user_id = 12345 + twitch.gql_request = AsyncMock(side_effect=RequestException()) + channel = Channel(twitch, id=67890, login="example_channel") + channel._stream = Stream( + channel, + id=24680, + game={"id": "13579", "name": "Example Game"}, + viewers=100, + title="Example Stream", + ) + + self.assertFalse(await channel.send_watch()) + + +if __name__ == "__main__": + unittest.main()