Fix Twitch watch events via GQL (#45)

based on the original author's fix 40000cf295
This commit is contained in:
Fengqing Liu
2026-04-29 22:18:25 +10:00
committed by GitHub
parent 93e8268c9e
commit 22b9a75882
10 changed files with 190 additions and 17 deletions

View File

@@ -174,16 +174,16 @@ lang/ # Translation JSON files (19 languages)
### Drop Mining Mechanism
The application sends periodic "watch" payloads to a spade URL every ~20 seconds:
The application sends periodic "watch" payloads through Twitch GraphQL `sendSpadeEvents`:
- Payload contains minute-watched events with channel/broadcast IDs
- Payload contains gzip/base64-encoded minute-watched events with channel/broadcast IDs
- Twitch reports progress via websocket (User.Drops topic)
- If websocket updates stop, fallback to GQL CurrentDrop query
- Extrapolation via "bump minutes" when no updates received
### GraphQL Operations
Defined in `src/config/operations.py` as `GQL_OPERATIONS`:
Persisted operations are defined in `src/config/operations.py` as `GQL_OPERATIONS`; raw GraphQL payloads such as `sendSpadeEvents` use `GQLQuery`:
- **Inventory** - Fetch in-progress campaigns and claimed benefits
- **Campaigns** - List available active/upcoming campaigns

View File

@@ -18,7 +18,7 @@ No more tab juggling, channel switching, or missing rewards — just set it, for
## ✨ Features
- 🚀 **Streamless Mining** — Earn drops without streaming video (save bandwidth)
- 🚀 **Streamless Mining** — Earn drops without streaming video by sending Twitch GraphQL watch events
- 🔍 **Automatic Campaign Discovery** — Detects new drop events automatically
- ⚙️ **Auto Channel Switching** — Always mines the best available stream
- 💾 **Persistent Login** — OAuth login saved via cookies

View File

@@ -18,7 +18,7 @@ from src.utils import ExponentialBackoff, RateLimiter
if TYPE_CHECKING:
from src.api.http_client import HTTPClient
from src.auth import _AuthState
from src.config import ClientInfo, GQLOperation, JsonType
from src.config import ClientInfo, GQLRequest, JsonType
logger = logging.getLogger("TwitchDrops")
@@ -63,18 +63,18 @@ class GQLClient:
self._qgl_limiter = RateLimiter(capacity=5, window=1)
@overload
async def request(self, ops: GQLOperation) -> JsonType: ...
async def request(self, ops: GQLRequest) -> JsonType: ...
@overload
async def request(self, ops: list[GQLOperation]) -> list[JsonType]: ...
async def request(self, ops: list[GQLRequest]) -> list[JsonType]: ...
async def request(self, ops: GQLOperation | list[GQLOperation]) -> JsonType | list[JsonType]:
async def request(self, ops: GQLRequest | list[GQLRequest]) -> JsonType | list[JsonType]:
"""
Execute one or more GraphQL operations.
Parameters
----------
ops : GQLOperation | list[GQLOperation]
ops : GQLRequest | list[GQLRequest]
Single operation or list of operations to execute
Returns

View File

@@ -24,6 +24,8 @@ from .constants import (
WEBSOCKET_TOPICS,
WS_TOPICS_LIMIT,
GQLOperation,
GQLQuery,
GQLRequest,
JsonType,
State,
TopicProcess,
@@ -52,6 +54,8 @@ __all__ = [
"URLType",
"TopicProcess",
"GQLOperation",
"GQLQuery",
"GQLRequest",
"MAX_INT",
"MAX_EXTRA_MINUTES",
"BASE_TOPICS",

View File

@@ -37,6 +37,7 @@ FILE_FORMATTER = logging.Formatter(
JsonType = dict[str, Any]
URLType = NewType("URLType", str)
TopicProcess: TypeAlias = "abc.Callable[[int, JsonType], Any]"
GQLRequest: TypeAlias = "GQLOperation | GQLQuery"
# Core constants
MAX_INT = sys.maxsize
@@ -99,6 +100,22 @@ class GQLOperation(JsonType):
return modified
class GQLQuery(JsonType):
"""Raw GraphQL query operation with gzip/base64 encoded spade events."""
def __init__(self, query: str, g64data: str):
super().__init__(
query=query,
variables={
"input": {
"data": g64data,
"repository": "twilight",
"encoding": "GZIP_B64",
}
},
)
class WebsocketTopic:
"""Represents a websocket topic subscription."""

View File

@@ -38,7 +38,7 @@ from src.websocket import WebsocketPool
if TYPE_CHECKING:
from src.config import ClientInfo, GQLOperation, JsonType
from src.config import ClientInfo, GQLRequest, JsonType
from src.config.settings import Settings
from src.models.channel import Stream
from src.models.drop import TimedDrop
@@ -597,9 +597,7 @@ class Twitch:
await self._auth_state.validate()
return self._auth_state
async def gql_request(
self, ops: GQLOperation | list[GQLOperation]
) -> JsonType | list[JsonType]:
async def gql_request(self, ops: GQLRequest | list[GQLRequest]) -> JsonType | list[JsonType]:
"""
Execute GraphQL request(s).

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
import gzip
import json
import logging
import re
@@ -11,11 +12,11 @@ from typing import TYPE_CHECKING, Any, SupportsInt, cast
import aiohttp
from yarl import URL
from src.config.constants import CALL, ONLINE_DELAY, GQLOperation, JsonType, URLType
from src.config.constants import CALL, ONLINE_DELAY, GQLOperation, GQLQuery, JsonType, URLType
from src.config.operations import GQL_OPERATIONS
from src.exceptions import MinerException, RequestException
from src.models.game import Game
from src.utils.json_utils import json_minify
from src.utils.json_utils import isonow, json_minify
if TYPE_CHECKING:
@@ -65,6 +66,36 @@ class Stream:
]
return {"data": (b64encode(json_minify(payload).encode("utf8"))).decode("utf8")}
@property
def _gql_payload(self) -> GQLQuery:
payload = [
{
"event": "minute-watched",
"properties": {
"broadcast_id": str(self.broadcast_id),
"channel_id": str(self.channel.id),
"channel": self.channel._login,
"client_time": isonow(),
"game": self.game.name if self.game is not None else "",
"game_id": str(self.game.id) if self.game is not None else "",
"hidden": False,
"is_live": True,
"live": True,
"logged_in": True,
"minutes_logged": 1,
"muted": False,
"user_id": self.channel._twitch._auth_state.user_id,
},
}
]
return GQLQuery(
(
"\n mutation SendEvents($input: SendSpadeEventsInput!) "
"{\n sendSpadeEvents(input: $input) {\n statusCode\n}\n}\n"
),
b64encode(gzip.compress(json_minify(payload).encode("utf8"))).decode("utf8"),
)
@classmethod
def from_get_stream(cls, channel: Channel, channel_data: JsonType) -> Stream:
stream = channel_data["stream"]
@@ -419,7 +450,7 @@ class Channel:
self.display()
# NOTE: This is currently unused.
async def _send_watch(self) -> bool:
async def _send_watch_playlist(self) -> bool:
"""
This performs a HEAD request on the stream's current playlist,
to simulate watching the stream.
@@ -471,7 +502,8 @@ class Channel:
async with self._twitch.request("HEAD", stream_chunk_url) as head_response:
return head_response.status == 200
async def send_watch(self) -> bool:
# NOTE: This is currently unused.
async def _send_watch_spade(self) -> bool:
if self._stream is None:
return False
if self._spade_url is None:
@@ -483,3 +515,12 @@ class Channel:
return response.status == 204
except RequestException:
return False
async def send_watch(self) -> bool:
if self._stream is None:
return False
try:
watch_response: JsonType = await self._twitch.gql_request(self._stream._gql_payload)
return watch_response["data"]["sendSpadeEvents"]["statusCode"] == 204
except RequestException:
return False

View File

@@ -17,6 +17,7 @@ from .backoff import ExponentialBackoff
# JSON utilities
from .json_utils import (
SERIALIZE_ENV,
isonow,
json_load,
json_minify,
json_save,
@@ -47,6 +48,7 @@ __all__ = [
"deduplicate",
# JSON utilities
"json_minify",
"isonow",
"json_load",
"json_save",
"merge_json",

View File

@@ -31,6 +31,11 @@ def json_minify(data: JsonType | list[JsonType]) -> str:
return json.dumps(data, separators=(",", ":"))
def isonow() -> str:
"""Return the current UTC time in Twitch's expected ISO-8601 format."""
return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
def _serialize(obj: Any) -> Any:
"""
Custom JSON encoder for special types.

View File

@@ -0,0 +1,106 @@
import base64
import gzip
import json
import unittest
from unittest.mock import AsyncMock, MagicMock
from src.config.constants import GQLQuery
from src.exceptions import RequestException
from src.models.channel import Channel, Stream
def _decode_gql_events(operation: GQLQuery):
encoded = operation["variables"]["input"]["data"]
return json.loads(gzip.decompress(base64.b64decode(encoded)).decode("utf8"))
class TestGQLWatchEvents(unittest.IsolatedAsyncioTestCase):
def test_gql_query_wraps_gzip_base64_payload(self):
event_payload = [{"event": "minute-watched", "properties": {"channel": "test"}}]
compressed = base64.b64encode(gzip.compress(json.dumps(event_payload).encode("utf8"))).decode(
"utf8"
)
operation = GQLQuery("mutation Example { ok }", compressed)
self.assertEqual(operation["query"], "mutation Example { ok }")
self.assertEqual(operation["variables"]["input"]["repository"], "twilight")
self.assertEqual(operation["variables"]["input"]["encoding"], "GZIP_B64")
self.assertEqual(_decode_gql_events(operation), event_payload)
def test_stream_gql_payload_contains_minute_watched_event(self):
twitch = MagicMock()
twitch._auth_state.user_id = 12345
channel = MagicMock(spec=Channel)
channel.id = 67890
channel._login = "example_channel"
channel._twitch = twitch
stream = Stream(
channel,
id=24680,
game={"id": "13579", "name": "Example Game"},
viewers=100,
title="Example Stream",
)
operation = stream._gql_payload
events = _decode_gql_events(operation)
self.assertIn("mutation SendEvents", operation["query"])
self.assertEqual(len(events), 1)
self.assertEqual(events[0]["event"], "minute-watched")
properties = events[0]["properties"]
self.assertEqual(properties["broadcast_id"], "24680")
self.assertEqual(properties["channel_id"], "67890")
self.assertEqual(properties["channel"], "example_channel")
self.assertEqual(properties["game"], "Example Game")
self.assertEqual(properties["game_id"], "13579")
self.assertEqual(properties["minutes_logged"], 1)
self.assertEqual(properties["user_id"], 12345)
self.assertRegex(properties["client_time"], r"^\d{4}-\d{2}-\d{2}T.*Z$")
async def test_send_watch_uses_gql_and_returns_true_for_204(self):
twitch = MagicMock()
twitch.gui.channels = MagicMock()
twitch._auth_state.user_id = 12345
twitch.gql_request = AsyncMock(return_value={"data": {"sendSpadeEvents": {"statusCode": 204}}})
channel = Channel(twitch, id=67890, login="example_channel")
channel._stream = Stream(
channel,
id=24680,
game={"id": "13579", "name": "Example Game"},
viewers=100,
title="Example Stream",
)
result = await channel.send_watch()
self.assertTrue(result)
twitch.gql_request.assert_awaited_once()
async def test_send_watch_returns_false_without_stream(self):
twitch = MagicMock()
twitch.gui.channels = MagicMock()
channel = Channel(twitch, id=67890, login="example_channel")
self.assertFalse(await channel.send_watch())
async def test_send_watch_returns_false_when_gql_request_fails(self):
twitch = MagicMock()
twitch.gui.channels = MagicMock()
twitch._auth_state.user_id = 12345
twitch.gql_request = AsyncMock(side_effect=RequestException())
channel = Channel(twitch, id=67890, login="example_channel")
channel._stream = Stream(
channel,
id=24680,
game={"id": "13579", "name": "Example Game"},
viewers=100,
title="Example Stream",
)
self.assertFalse(await channel.send_watch())
if __name__ == "__main__":
unittest.main()