Introduce back GQL progress querying;

to ensure more accurate drop progress
This commit is contained in:
DevilXD
2024-06-23 09:40:37 +02:00
parent 2bd2845790
commit 7b0f25d5d0
3 changed files with 45 additions and 27 deletions

View File

@@ -284,8 +284,11 @@ GQL_OPERATIONS: dict[str, GQLOperation] = {
# returns current state of drops (current drop progress)
"CurrentDrop": GQLOperation(
"DropCurrentSessionContext",
"2e4b3630b91552eb05b76a94b6850eb25fe42263b7cf6d06bee6d156dd247c1c",
# no variables needed
"4d06b702d25d652afb9ef835d2a550031f1cf762b193523a92166f40ea3d142b",
variables={
"channelID": ..., # watched channel ID as a str
"channelLogin": "", # always empty string
},
),
# returns all available campaigns
"Campaigns": GQLOperation(

View File

@@ -1,14 +1,13 @@
from __future__ import annotations
import re
import logging
from itertools import chain
from typing import TYPE_CHECKING
from functools import cached_property
from datetime import datetime, timedelta, timezone
from channel import Channel
from constants import CALL, GQL_OPERATIONS, URLType
from constants import GQL_OPERATIONS, URLType
from utils import timestamp, invalidate_cache, Game
if TYPE_CHECKING:
@@ -19,7 +18,6 @@ if TYPE_CHECKING:
from gui import GUIManager, InventoryOverview
logger = logging.getLogger("TwitchDrops")
DIMS_PATTERN = re.compile(r'-\d+x\d+(?=\.(?:jpg|png|gif)$)', re.I)
@@ -250,19 +248,9 @@ class TimedDrop(BaseDrop):
self._manager.display_drop(self, countdown=countdown, subone=subone)
def bump_minutes(self):
# this may get called more often than once every minute
# we can detect this by checking if the GUI progress display is currently counting down
# if we haven't finished counting down the last minute, then we can ignore this call
if self._manager.progress.is_counting():
return
if self.current_minutes < self.required_minutes:
self.current_minutes += 1
self._on_minutes_changed()
drop_text = (
f"{self.name} ({self.campaign.game}, "
f"{self.current_minutes}/{self.required_minutes})"
)
logger.log(CALL, f"Drop progress from active search: {drop_text}")
self.display()

View File

@@ -842,18 +842,45 @@ class Twitch:
succeeded: bool = await channel.send_watch()
if not succeeded:
logger.log(CALL, f"Watch requested failed for channel: {channel.name}")
await self._watch_sleep(interval)
continue
# Drop progress isn't always returned by the websocket, but we can "pretend"
# the progress is constantly advancing, by simply incrementing the minutes
# watched ourselves. Once the websocket "wakes up" to return proper progress,
# it'll update the display automatically.
# NOTE: get_active_drop uses the watching channel by default,
# so there's no point to pass it here
if (drop := self.get_active_drop()) is not None:
drop.bump_minutes()
else:
logger.log(CALL, "No active drop could be determined")
elif not self.gui.progress.is_counting():
# If the previous update was more than 60s ago, and the progress tracker
# isn't counting down anymore, that means Twitch has temporarily
# stopped reporting drops progress. To ensure the timer keeps at least somewhat
# accurate time, we can use GQL to query for the current drop,
# or even "pretend" mining as a last resort option.
handled: bool = False
# Solution 1: use GQL to query for the currently mined drop status
context = await self.gql_request(
GQL_OPERATIONS["CurrentDrop"].with_variables({"channelID": str(channel.id)})
)
drop_data: JsonType | None = (
context["data"]["currentUser"]["dropCurrentSession"]
)
if drop_data is not None:
drop = self._drops.get(drop_data["dropID"])
if drop is not None and drop.can_earn(channel):
drop.update_minutes(drop_data["currentMinutesWatched"])
drop_text = (
f"{drop.name} ({drop.campaign.game}, "
f"{drop.current_minutes}/{drop.required_minutes})"
)
logger.log(CALL, f"Drop progress from GQL: {drop_text}")
handled = True
# Solution 2: If GQL fails, figure out which drop we're most likely mining
# right now, and then bump up the minutes on that drop
if not handled:
if (drop := self.get_active_drop(channel)) is not None:
drop.bump_minutes()
drop_text = (
f"{drop.name} ({drop.campaign.game}, "
f"{drop.current_minutes}/{drop.required_minutes})"
)
logger.log(CALL, f"Drop progress from active search: {drop_text}")
handled = True
else:
logger.log(CALL, "No active drop could be determined")
await self._watch_sleep(interval)
@task_wrapper