From b44100764c49d69d6166432eef007923b6ded055 Mon Sep 17 00:00:00 2001 From: DevilXD <4180725+DevilXD@users.noreply.github.com> Date: Wed, 10 Sep 2025 00:08:03 +0200 Subject: [PATCH] Implement drop progression tracking across all drops in a campaign --- channel.py | 2 +- constants.py | 1 + inventory.py | 213 +++++++++++++++++++++++++++++++-------------------- twitch.py | 60 +++++++-------- 4 files changed, 163 insertions(+), 113 deletions(-) diff --git a/channel.py b/channel.py index 33ff4bb..f6be024 100644 --- a/channel.py +++ b/channel.py @@ -310,7 +310,7 @@ class Channel: return any( ( (campaign := self._twitch._campaigns.get(campaign_data["id"])) is not None - and campaign.can_earn(self) + and campaign.can_earn(self, ignore_channel_status=True) ) for campaign_data in available_drops ) diff --git a/constants.py b/constants.py index 74d8435..87fa56c 100644 --- a/constants.py +++ b/constants.py @@ -113,6 +113,7 @@ URLType = NewType("URLType", str) TopicProcess: TypeAlias = "abc.Callable[[int, JsonType], Any]" # Values MAX_INT = sys.maxsize +MAX_EXTRA_MINUTES = 15 BASE_TOPICS = 2 MAX_WEBSOCKETS = 8 WS_TOPICS_LIMIT = 50 diff --git a/inventory.py b/inventory.py index 632a0e3..6e8e9f8 100644 --- a/inventory.py +++ b/inventory.py @@ -11,16 +11,15 @@ from datetime import datetime, timedelta, timezone from translate import _ from channel import Channel +from utils import timestamp, Game from exceptions import GQLException -from constants import GQL_OPERATIONS, URLType -from utils import timestamp, invalidate_cache, Game +from constants import GQL_OPERATIONS, MAX_EXTRA_MINUTES, URLType, State if TYPE_CHECKING: from collections import abc from twitch import Twitch from constants import JsonType - from gui import GUIManager, InventoryOverview logger = logging.getLogger("TwitchDrops") @@ -100,11 +99,14 @@ class BaseDrop: additional = '' return f"Drop({self.rewards_text()}{additional})" - @cached_property + @property def preconditions_met(self) -> bool: campaign = self.campaign return all(campaign.timed_drops[pid].is_claimed for pid in self.precondition_drops) + def _on_state_changed(self) -> None: + raise NotImplementedError + def _base_earn_conditions(self) -> bool: # define when a drop can be earned or not return ( @@ -122,16 +124,21 @@ class BaseDrop: and self.starts_at <= datetime.now(timezone.utc) < self.ends_at ) - def can_earn(self, channel: Channel | None = None) -> bool: - return self._base_can_earn() and self.campaign._base_can_earn(channel) - - def can_earn_within(self, stamp: datetime) -> bool: + def _can_earn_within(self, stamp: datetime) -> bool: + # NOTE: This does not check the campaign's eligibility or active status return ( self._base_earn_conditions() and self.ends_at > datetime.now(timezone.utc) and self.starts_at < stamp ) + def can_earn( + self, channel: Channel | None = None, ignore_channel_status: bool = False + ) -> bool: + return ( + self._base_can_earn() and self.campaign._base_can_earn(channel, ignore_channel_status) + ) + @property def can_claim(self) -> bool: # https://help.twitch.tv/s/article/mission-based-drops?language=en_US#claiming @@ -143,9 +150,6 @@ class BaseDrop: and datetime.now(timezone.utc) < self.campaign.ends_at + timedelta(hours=24) ) - def _on_claim(self) -> None: - invalidate_cache(self, "preconditions_met") - def update_claim(self, claim_id: str): self.claim_id = claim_id @@ -164,9 +168,6 @@ class BaseDrop: result = await self._claim() if result: self.is_claimed = result - # notify the campaign about claiming - # this will cause it to call our _on_claim, so no need to call it ourselves here - self.campaign._on_claim() claim_text = ( f"{self.campaign.game.name}\n" f"{self.rewards_text()} " @@ -208,7 +209,7 @@ class BaseDrop: return False elif ( data["claimDropRewards"]["status"] - in ["ELIGIBLE_FOR_ALL", "DROP_INSTANCE_ALREADY_CLAIMED"] + in ("ELIGIBLE_FOR_ALL", "DROP_INSTANCE_ALREADY_CLAIMED") ): return True return False @@ -219,13 +220,14 @@ class TimedDrop(BaseDrop): self, campaign: DropsCampaign, data: JsonType, claimed_benefits: dict[str, datetime] ): super().__init__(campaign, data, claimed_benefits) - self._manager: GUIManager = self._twitch.gui - self._gui_inv: InventoryOverview = self._manager.inv - self.current_minutes: int = "self" in data and data["self"]["currentMinutesWatched"] or 0 + self.real_current_minutes: int = ( + "self" in data and data["self"]["currentMinutesWatched"] or 0 + ) self.required_minutes: int = data["requiredMinutesWatched"] + self.extra_current_minutes: int = 0 if self.is_claimed: # claimed drops may report inconsistent current minutes, so we need to overwrite them - self.current_minutes = self.required_minutes + self.real_current_minutes = self.required_minutes def __repr__(self) -> str: if self.is_claimed: @@ -240,11 +242,15 @@ class TimedDrop(BaseDrop): minutes = '' return f"Drop({self.rewards_text()}{minutes}{additional})" - @cached_property + @property + def current_minutes(self) -> int: + return self.real_current_minutes + self.extra_current_minutes + + @property def remaining_minutes(self) -> int: return self.required_minutes - self.current_minutes - @cached_property + @property def total_required_minutes(self) -> int: return self.required_minutes + max( ( @@ -254,7 +260,7 @@ class TimedDrop(BaseDrop): default=0, ) - @cached_property + @property def total_remaining_minutes(self) -> int: return self.remaining_minutes + max( ( @@ -264,7 +270,7 @@ class TimedDrop(BaseDrop): default=0, ) - @cached_property + @property def progress(self) -> float: if self.current_minutes <= 0 or self.required_minutes <= 0: return 0.0 @@ -280,45 +286,55 @@ class TimedDrop(BaseDrop): return math.inf def _base_earn_conditions(self) -> bool: - return super()._base_earn_conditions() and self.required_minutes > 0 + return ( + super()._base_earn_conditions() + and self.required_minutes > 0 + # NOTE: This may be a bad idea, as it invalidates the can_earn status + # and provides no way to recover from this state until the next reload. + and self.extra_current_minutes < MAX_EXTRA_MINUTES + ) - def _on_claim(self) -> None: - result = super()._on_claim() - self._gui_inv.update_drop(self) - return result + def _on_state_changed(self) -> None: + self._twitch.gui.inv.update_drop(self) - def _on_minutes_changed(self) -> None: - invalidate_cache(self, "progress", "remaining_minutes") - self.campaign._on_minutes_changed() - self._gui_inv.update_drop(self) + def _update_real_minutes(self, delta: int) -> None: + if delta == 0 or self.real_current_minutes + delta < 0 or not self.can_earn(): + return + if self.real_current_minutes + delta < self.required_minutes: + self.real_current_minutes += delta + else: + self.real_current_minutes = self.required_minutes + self.extra_current_minutes = 0 + self._on_state_changed() - def _on_total_minutes_changed(self) -> None: - invalidate_cache(self, "total_required_minutes", "total_remaining_minutes") + def _bump_minutes(self, channel: Channel | None) -> bool: + if self.can_earn(channel): + self.extra_current_minutes += 1 + self._on_state_changed() + if self.extra_current_minutes >= MAX_EXTRA_MINUTES: + return True + return False async def claim(self) -> bool: result = await super().claim() if result: - self.current_minutes = self.required_minutes + self.real_current_minutes = self.required_minutes + self.extra_current_minutes = 0 + self._on_state_changed() return result - def update_minutes(self, minutes: int): - if minutes < 0: - return - elif minutes <= self.required_minutes: - self.current_minutes = minutes - else: - self.current_minutes = self.required_minutes - self._on_minutes_changed() - self.display() - def display(self, *, countdown: bool = True, subone: bool = False): - self._manager.display_drop(self, countdown=countdown, subone=subone) + self._twitch.gui.display_drop(self, countdown=countdown, subone=subone) - def bump_minutes(self): - if self.current_minutes < self.required_minutes: - self.current_minutes += 1 - self._on_minutes_changed() - self.display() + def update_minutes(self, new_minutes: int): + delta: int = new_minutes - self.real_current_minutes + if delta == 0: + return + elif self.real_current_minutes + delta < 0: + delta = -self.real_current_minutes + elif self.real_current_minutes + delta > self.required_minutes: + delta = self.required_minutes - self.real_current_minutes + self.campaign._update_real_minutes(delta) class DropsCampaign: @@ -387,27 +403,27 @@ class DropsCampaign: benefit.type.is_badge_or_emote() for drop in self.drops for benefit in drop.benefits ) - @cached_property + @property def finished(self) -> bool: return all(d.is_claimed or d.required_minutes <= 0 for d in self.drops) - @cached_property + @property def claimed_drops(self) -> int: return sum(d.is_claimed for d in self.drops) - @cached_property + @property def remaining_drops(self) -> int: return sum(not d.is_claimed for d in self.drops) - @cached_property + @property def required_minutes(self) -> int: return max(d.total_required_minutes for d in self.drops) - @cached_property + @property def remaining_minutes(self) -> int: return max(d.total_remaining_minutes for d in self.drops) - @cached_property + @property def progress(self) -> float: return sum(d.progress for d in self.drops) / self.total_drops @@ -415,6 +431,44 @@ class DropsCampaign: def availability(self) -> float: return min(d.availability for d in self.drops) + @property + def first_drop(self) -> TimedDrop | None: + drops: list[TimedDrop] = sorted( + (drop for drop in self.drops if drop.can_earn()), + key=lambda d: d.remaining_minutes, + ) + return drops[0] if drops else None + + def _update_real_minutes(self, delta: int) -> None: + for drop in self.drops: + drop._update_real_minutes(delta) + if (first_drop := self.first_drop) is not None: + first_drop.display() + + def _base_can_earn( + self, channel: Channel | None = None, ignore_channel_status: bool = False + ) -> bool: + return ( + self.eligible # account is eligible + and self.active # campaign is active (and valid) + and ( + channel is None or ( # channel isn't specified, + # or there's no ACL, or the channel is in the ACL + (not self.allowed_channels or channel in self.allowed_channels) + # and the channel is live and playing the campaign's game + and ( + ignore_channel_status + or channel.game is not None + and channel.game == self.game + or self.has_badge_or_emote + ) + ) + ) + ) + + def get_drop(self, drop_id: str) -> TimedDrop | None: + return self.timed_drops.get(drop_id) + def preconditions_chain(self) -> set[str]: return set( chain.from_iterable( @@ -422,30 +476,14 @@ class DropsCampaign: ) ) - def _on_claim(self) -> None: - invalidate_cache(self, "finished", "claimed_drops", "remaining_drops") - for drop in self.drops: - drop._on_claim() - - def _on_minutes_changed(self) -> None: - invalidate_cache(self, "progress", "required_minutes", "remaining_minutes") - for drop in self.drops: - drop._on_total_minutes_changed() - - def get_drop(self, drop_id: str) -> TimedDrop | None: - return self.timed_drops.get(drop_id) - - def _base_can_earn(self, channel: Channel | None = None) -> bool: - return ( - self.eligible # account is eligible - and self.active # campaign is active - # channel isn't specified, or there's no ACL, or the channel is in the ACL - and (channel is None or not self.allowed_channels or channel in self.allowed_channels) - ) - - def can_earn(self, channel: Channel | None = None) -> bool: + def can_earn( + self, channel: Channel | None = None, ignore_channel_status: bool = False + ) -> bool: # True if any of the containing drops can be earned - return self._base_can_earn(channel) and any(drop._base_can_earn() for drop in self.drops) + return ( + self._base_can_earn(channel, ignore_channel_status) + and any(drop._base_can_earn() for drop in self.drops) + ) def can_earn_within(self, stamp: datetime) -> bool: # Same as can_earn, but doesn't check the channel @@ -455,5 +493,18 @@ class DropsCampaign: and self._valid and self.ends_at > datetime.now(timezone.utc) and self.starts_at < stamp - and any(drop.can_earn_within(stamp) for drop in self.drops) + and any(drop._can_earn_within(stamp) for drop in self.drops) ) + + def bump_minutes(self, channel: Channel) -> None: + # NOTE: Use a temporary list to ensure all drops are bumped before checking + if any([drop._bump_minutes(channel) for drop in self.drops]): + # Executes if any drop's extra_current_minutes reach MAX_ESTIMATED_MINUTES + # TODO: Figure out a better way to handle this case + logger.warning( + f"At least one of the drops in campaign \"{self.name}({self.game.name})\" " + "has reached the maximum extra minutes limit!" + ) + self._twitch.change_state(State.CHANNEL_SWITCH) + if (first_drop := self.first_drop) is not None: + first_drop.display() diff --git a/twitch.py b/twitch.py index 8295826..d6a24e6 100644 --- a/twitch.py +++ b/twitch.py @@ -812,9 +812,11 @@ class Twitch: for channel in channels.values(): # check if there's any channels we can watch first if self.can_watch(channel): - if (active_drop := self.get_active_drop(channel)) is not None: + if ( + (active_campaign := self.get_active_campaign(channel)) is not None + and (active_drop := active_campaign.first_drop) is not None + ): active_drop.display(countdown=False, subone=True) - del active_drop break self.change_state(State.CHANNEL_SWITCH) del ( @@ -914,25 +916,29 @@ class Twitch: except GQLException: drop_data = None if drop_data is not None: - drop = self._drops.get(drop_data["dropID"]) - if drop is not None and drop.can_earn(channel): - drop.update_minutes(drop_data["currentMinutesWatched"]) - drop_text = ( - f"{drop.name} ({drop.campaign.game}, " - f"{drop.current_minutes}/{drop.required_minutes})" + gql_drop: TimedDrop | None = self._drops.get(drop_data["dropID"]) + if gql_drop is not None and gql_drop.can_earn(channel): + gql_drop.update_minutes(drop_data["currentMinutesWatched"]) + drop_text: str = ( + f"{gql_drop.name} ({gql_drop.campaign.game}, " + f"{gql_drop.current_minutes}/{gql_drop.required_minutes})" ) logger.log(CALL, f"Drop progress from GQL: {drop_text}") handled = True - # Solution 2: If GQL fails, figure out which drop we're most likely mining - # right now, and then bump up the minutes on that drop + # Solution 2: If GQL fails, figure out which campaign we're most likely mining + # right now, and then bump up the minutes on it's drops if not handled: - if (drop := self.get_active_drop(channel)) is not None: - drop.bump_minutes() - drop_text = ( - f"{drop.name} ({drop.campaign.game}, " - f"{drop.current_minutes}/{drop.required_minutes})" - ) + if (active_campaign := self.get_active_campaign(channel)) is not None: + active_campaign.bump_minutes(channel) + # NOTE: This usually gets overwritten below + drop_text = f"Unknown drop ({active_campaign.game})" + if (active_drop := active_campaign.first_drop) is not None: + active_drop.display() + drop_text = ( + f"{active_drop.name} ({active_drop.campaign.game}, " + f"{active_drop.current_minutes}/{active_drop.required_minutes})" + ) logger.log(CALL, f"Drop progress from active search: {drop_text}") handled = True else: @@ -1501,28 +1507,20 @@ class Twitch: self._mnt_task.cancel() self._mnt_task = asyncio.create_task(self._maintenance_task()) - def get_active_drop(self, channel: Channel | None = None) -> TimedDrop | None: + def get_active_campaign(self, channel: Channel | None = None) -> DropsCampaign | None: if not self.wanted_games: return None watching_channel = self.watching_channel.get_with_default(channel) if watching_channel is None: # if we aren't watching anything, we can't earn any drops return None - watching_game: Game | None = watching_channel.game - if watching_game is None: - # if the channel isn't playing anything in particular, we can't determine the drop - return None - drops: list[TimedDrop] = [] + campaigns: list[DropsCampaign] = [] for campaign in self.inventory: - if ( - campaign.game == watching_game - or campaign.has_badge_or_emote - and campaign.can_earn(watching_channel) - ): - drops.extend(drop for drop in campaign.drops if drop.can_earn(watching_channel)) - if drops: - drops.sort(key=lambda d: d.remaining_minutes) - return drops[0] + if campaign.can_earn(watching_channel): + campaigns.append(campaign) + if campaigns: + campaigns.sort(key=lambda c: c.remaining_minutes) + return campaigns[0] return None async def get_live_streams(