mirror of
https://github.com/rangermix/TwitchDropsMiner.git
synced 2026-06-06 04:19:39 +00:00
Implement drop progression tracking across all drops in a campaign
This commit is contained in:
@@ -310,7 +310,7 @@ class Channel:
|
||||
return any(
|
||||
(
|
||||
(campaign := self._twitch._campaigns.get(campaign_data["id"])) is not None
|
||||
and campaign.can_earn(self)
|
||||
and campaign.can_earn(self, ignore_channel_status=True)
|
||||
)
|
||||
for campaign_data in available_drops
|
||||
)
|
||||
|
||||
@@ -113,6 +113,7 @@ URLType = NewType("URLType", str)
|
||||
TopicProcess: TypeAlias = "abc.Callable[[int, JsonType], Any]"
|
||||
# Values
|
||||
MAX_INT = sys.maxsize
|
||||
MAX_EXTRA_MINUTES = 15
|
||||
BASE_TOPICS = 2
|
||||
MAX_WEBSOCKETS = 8
|
||||
WS_TOPICS_LIMIT = 50
|
||||
|
||||
213
inventory.py
213
inventory.py
@@ -11,16 +11,15 @@ from datetime import datetime, timedelta, timezone
|
||||
|
||||
from translate import _
|
||||
from channel import Channel
|
||||
from utils import timestamp, Game
|
||||
from exceptions import GQLException
|
||||
from constants import GQL_OPERATIONS, URLType
|
||||
from utils import timestamp, invalidate_cache, Game
|
||||
from constants import GQL_OPERATIONS, MAX_EXTRA_MINUTES, URLType, State
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections import abc
|
||||
|
||||
from twitch import Twitch
|
||||
from constants import JsonType
|
||||
from gui import GUIManager, InventoryOverview
|
||||
|
||||
|
||||
logger = logging.getLogger("TwitchDrops")
|
||||
@@ -100,11 +99,14 @@ class BaseDrop:
|
||||
additional = ''
|
||||
return f"Drop({self.rewards_text()}{additional})"
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def preconditions_met(self) -> bool:
|
||||
campaign = self.campaign
|
||||
return all(campaign.timed_drops[pid].is_claimed for pid in self.precondition_drops)
|
||||
|
||||
def _on_state_changed(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def _base_earn_conditions(self) -> bool:
|
||||
# define when a drop can be earned or not
|
||||
return (
|
||||
@@ -122,16 +124,21 @@ class BaseDrop:
|
||||
and self.starts_at <= datetime.now(timezone.utc) < self.ends_at
|
||||
)
|
||||
|
||||
def can_earn(self, channel: Channel | None = None) -> bool:
|
||||
return self._base_can_earn() and self.campaign._base_can_earn(channel)
|
||||
|
||||
def can_earn_within(self, stamp: datetime) -> bool:
|
||||
def _can_earn_within(self, stamp: datetime) -> bool:
|
||||
# NOTE: This does not check the campaign's eligibility or active status
|
||||
return (
|
||||
self._base_earn_conditions()
|
||||
and self.ends_at > datetime.now(timezone.utc)
|
||||
and self.starts_at < stamp
|
||||
)
|
||||
|
||||
def can_earn(
|
||||
self, channel: Channel | None = None, ignore_channel_status: bool = False
|
||||
) -> bool:
|
||||
return (
|
||||
self._base_can_earn() and self.campaign._base_can_earn(channel, ignore_channel_status)
|
||||
)
|
||||
|
||||
@property
|
||||
def can_claim(self) -> bool:
|
||||
# https://help.twitch.tv/s/article/mission-based-drops?language=en_US#claiming
|
||||
@@ -143,9 +150,6 @@ class BaseDrop:
|
||||
and datetime.now(timezone.utc) < self.campaign.ends_at + timedelta(hours=24)
|
||||
)
|
||||
|
||||
def _on_claim(self) -> None:
|
||||
invalidate_cache(self, "preconditions_met")
|
||||
|
||||
def update_claim(self, claim_id: str):
|
||||
self.claim_id = claim_id
|
||||
|
||||
@@ -164,9 +168,6 @@ class BaseDrop:
|
||||
result = await self._claim()
|
||||
if result:
|
||||
self.is_claimed = result
|
||||
# notify the campaign about claiming
|
||||
# this will cause it to call our _on_claim, so no need to call it ourselves here
|
||||
self.campaign._on_claim()
|
||||
claim_text = (
|
||||
f"{self.campaign.game.name}\n"
|
||||
f"{self.rewards_text()} "
|
||||
@@ -208,7 +209,7 @@ class BaseDrop:
|
||||
return False
|
||||
elif (
|
||||
data["claimDropRewards"]["status"]
|
||||
in ["ELIGIBLE_FOR_ALL", "DROP_INSTANCE_ALREADY_CLAIMED"]
|
||||
in ("ELIGIBLE_FOR_ALL", "DROP_INSTANCE_ALREADY_CLAIMED")
|
||||
):
|
||||
return True
|
||||
return False
|
||||
@@ -219,13 +220,14 @@ class TimedDrop(BaseDrop):
|
||||
self, campaign: DropsCampaign, data: JsonType, claimed_benefits: dict[str, datetime]
|
||||
):
|
||||
super().__init__(campaign, data, claimed_benefits)
|
||||
self._manager: GUIManager = self._twitch.gui
|
||||
self._gui_inv: InventoryOverview = self._manager.inv
|
||||
self.current_minutes: int = "self" in data and data["self"]["currentMinutesWatched"] or 0
|
||||
self.real_current_minutes: int = (
|
||||
"self" in data and data["self"]["currentMinutesWatched"] or 0
|
||||
)
|
||||
self.required_minutes: int = data["requiredMinutesWatched"]
|
||||
self.extra_current_minutes: int = 0
|
||||
if self.is_claimed:
|
||||
# claimed drops may report inconsistent current minutes, so we need to overwrite them
|
||||
self.current_minutes = self.required_minutes
|
||||
self.real_current_minutes = self.required_minutes
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if self.is_claimed:
|
||||
@@ -240,11 +242,15 @@ class TimedDrop(BaseDrop):
|
||||
minutes = ''
|
||||
return f"Drop({self.rewards_text()}{minutes}{additional})"
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def current_minutes(self) -> int:
|
||||
return self.real_current_minutes + self.extra_current_minutes
|
||||
|
||||
@property
|
||||
def remaining_minutes(self) -> int:
|
||||
return self.required_minutes - self.current_minutes
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def total_required_minutes(self) -> int:
|
||||
return self.required_minutes + max(
|
||||
(
|
||||
@@ -254,7 +260,7 @@ class TimedDrop(BaseDrop):
|
||||
default=0,
|
||||
)
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def total_remaining_minutes(self) -> int:
|
||||
return self.remaining_minutes + max(
|
||||
(
|
||||
@@ -264,7 +270,7 @@ class TimedDrop(BaseDrop):
|
||||
default=0,
|
||||
)
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
if self.current_minutes <= 0 or self.required_minutes <= 0:
|
||||
return 0.0
|
||||
@@ -280,45 +286,55 @@ class TimedDrop(BaseDrop):
|
||||
return math.inf
|
||||
|
||||
def _base_earn_conditions(self) -> bool:
|
||||
return super()._base_earn_conditions() and self.required_minutes > 0
|
||||
return (
|
||||
super()._base_earn_conditions()
|
||||
and self.required_minutes > 0
|
||||
# NOTE: This may be a bad idea, as it invalidates the can_earn status
|
||||
# and provides no way to recover from this state until the next reload.
|
||||
and self.extra_current_minutes < MAX_EXTRA_MINUTES
|
||||
)
|
||||
|
||||
def _on_claim(self) -> None:
|
||||
result = super()._on_claim()
|
||||
self._gui_inv.update_drop(self)
|
||||
return result
|
||||
def _on_state_changed(self) -> None:
|
||||
self._twitch.gui.inv.update_drop(self)
|
||||
|
||||
def _on_minutes_changed(self) -> None:
|
||||
invalidate_cache(self, "progress", "remaining_minutes")
|
||||
self.campaign._on_minutes_changed()
|
||||
self._gui_inv.update_drop(self)
|
||||
def _update_real_minutes(self, delta: int) -> None:
|
||||
if delta == 0 or self.real_current_minutes + delta < 0 or not self.can_earn():
|
||||
return
|
||||
if self.real_current_minutes + delta < self.required_minutes:
|
||||
self.real_current_minutes += delta
|
||||
else:
|
||||
self.real_current_minutes = self.required_minutes
|
||||
self.extra_current_minutes = 0
|
||||
self._on_state_changed()
|
||||
|
||||
def _on_total_minutes_changed(self) -> None:
|
||||
invalidate_cache(self, "total_required_minutes", "total_remaining_minutes")
|
||||
def _bump_minutes(self, channel: Channel | None) -> bool:
|
||||
if self.can_earn(channel):
|
||||
self.extra_current_minutes += 1
|
||||
self._on_state_changed()
|
||||
if self.extra_current_minutes >= MAX_EXTRA_MINUTES:
|
||||
return True
|
||||
return False
|
||||
|
||||
async def claim(self) -> bool:
|
||||
result = await super().claim()
|
||||
if result:
|
||||
self.current_minutes = self.required_minutes
|
||||
self.real_current_minutes = self.required_minutes
|
||||
self.extra_current_minutes = 0
|
||||
self._on_state_changed()
|
||||
return result
|
||||
|
||||
def update_minutes(self, minutes: int):
|
||||
if minutes < 0:
|
||||
return
|
||||
elif minutes <= self.required_minutes:
|
||||
self.current_minutes = minutes
|
||||
else:
|
||||
self.current_minutes = self.required_minutes
|
||||
self._on_minutes_changed()
|
||||
self.display()
|
||||
|
||||
def display(self, *, countdown: bool = True, subone: bool = False):
|
||||
self._manager.display_drop(self, countdown=countdown, subone=subone)
|
||||
self._twitch.gui.display_drop(self, countdown=countdown, subone=subone)
|
||||
|
||||
def bump_minutes(self):
|
||||
if self.current_minutes < self.required_minutes:
|
||||
self.current_minutes += 1
|
||||
self._on_minutes_changed()
|
||||
self.display()
|
||||
def update_minutes(self, new_minutes: int):
|
||||
delta: int = new_minutes - self.real_current_minutes
|
||||
if delta == 0:
|
||||
return
|
||||
elif self.real_current_minutes + delta < 0:
|
||||
delta = -self.real_current_minutes
|
||||
elif self.real_current_minutes + delta > self.required_minutes:
|
||||
delta = self.required_minutes - self.real_current_minutes
|
||||
self.campaign._update_real_minutes(delta)
|
||||
|
||||
|
||||
class DropsCampaign:
|
||||
@@ -387,27 +403,27 @@ class DropsCampaign:
|
||||
benefit.type.is_badge_or_emote() for drop in self.drops for benefit in drop.benefits
|
||||
)
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def finished(self) -> bool:
|
||||
return all(d.is_claimed or d.required_minutes <= 0 for d in self.drops)
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def claimed_drops(self) -> int:
|
||||
return sum(d.is_claimed for d in self.drops)
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def remaining_drops(self) -> int:
|
||||
return sum(not d.is_claimed for d in self.drops)
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def required_minutes(self) -> int:
|
||||
return max(d.total_required_minutes for d in self.drops)
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def remaining_minutes(self) -> int:
|
||||
return max(d.total_remaining_minutes for d in self.drops)
|
||||
|
||||
@cached_property
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
return sum(d.progress for d in self.drops) / self.total_drops
|
||||
|
||||
@@ -415,6 +431,44 @@ class DropsCampaign:
|
||||
def availability(self) -> float:
|
||||
return min(d.availability for d in self.drops)
|
||||
|
||||
@property
|
||||
def first_drop(self) -> TimedDrop | None:
|
||||
drops: list[TimedDrop] = sorted(
|
||||
(drop for drop in self.drops if drop.can_earn()),
|
||||
key=lambda d: d.remaining_minutes,
|
||||
)
|
||||
return drops[0] if drops else None
|
||||
|
||||
def _update_real_minutes(self, delta: int) -> None:
|
||||
for drop in self.drops:
|
||||
drop._update_real_minutes(delta)
|
||||
if (first_drop := self.first_drop) is not None:
|
||||
first_drop.display()
|
||||
|
||||
def _base_can_earn(
|
||||
self, channel: Channel | None = None, ignore_channel_status: bool = False
|
||||
) -> bool:
|
||||
return (
|
||||
self.eligible # account is eligible
|
||||
and self.active # campaign is active (and valid)
|
||||
and (
|
||||
channel is None or ( # channel isn't specified,
|
||||
# or there's no ACL, or the channel is in the ACL
|
||||
(not self.allowed_channels or channel in self.allowed_channels)
|
||||
# and the channel is live and playing the campaign's game
|
||||
and (
|
||||
ignore_channel_status
|
||||
or channel.game is not None
|
||||
and channel.game == self.game
|
||||
or self.has_badge_or_emote
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
def get_drop(self, drop_id: str) -> TimedDrop | None:
|
||||
return self.timed_drops.get(drop_id)
|
||||
|
||||
def preconditions_chain(self) -> set[str]:
|
||||
return set(
|
||||
chain.from_iterable(
|
||||
@@ -422,30 +476,14 @@ class DropsCampaign:
|
||||
)
|
||||
)
|
||||
|
||||
def _on_claim(self) -> None:
|
||||
invalidate_cache(self, "finished", "claimed_drops", "remaining_drops")
|
||||
for drop in self.drops:
|
||||
drop._on_claim()
|
||||
|
||||
def _on_minutes_changed(self) -> None:
|
||||
invalidate_cache(self, "progress", "required_minutes", "remaining_minutes")
|
||||
for drop in self.drops:
|
||||
drop._on_total_minutes_changed()
|
||||
|
||||
def get_drop(self, drop_id: str) -> TimedDrop | None:
|
||||
return self.timed_drops.get(drop_id)
|
||||
|
||||
def _base_can_earn(self, channel: Channel | None = None) -> bool:
|
||||
return (
|
||||
self.eligible # account is eligible
|
||||
and self.active # campaign is active
|
||||
# channel isn't specified, or there's no ACL, or the channel is in the ACL
|
||||
and (channel is None or not self.allowed_channels or channel in self.allowed_channels)
|
||||
)
|
||||
|
||||
def can_earn(self, channel: Channel | None = None) -> bool:
|
||||
def can_earn(
|
||||
self, channel: Channel | None = None, ignore_channel_status: bool = False
|
||||
) -> bool:
|
||||
# True if any of the containing drops can be earned
|
||||
return self._base_can_earn(channel) and any(drop._base_can_earn() for drop in self.drops)
|
||||
return (
|
||||
self._base_can_earn(channel, ignore_channel_status)
|
||||
and any(drop._base_can_earn() for drop in self.drops)
|
||||
)
|
||||
|
||||
def can_earn_within(self, stamp: datetime) -> bool:
|
||||
# Same as can_earn, but doesn't check the channel
|
||||
@@ -455,5 +493,18 @@ class DropsCampaign:
|
||||
and self._valid
|
||||
and self.ends_at > datetime.now(timezone.utc)
|
||||
and self.starts_at < stamp
|
||||
and any(drop.can_earn_within(stamp) for drop in self.drops)
|
||||
and any(drop._can_earn_within(stamp) for drop in self.drops)
|
||||
)
|
||||
|
||||
def bump_minutes(self, channel: Channel) -> None:
|
||||
# NOTE: Use a temporary list to ensure all drops are bumped before checking
|
||||
if any([drop._bump_minutes(channel) for drop in self.drops]):
|
||||
# Executes if any drop's extra_current_minutes reach MAX_ESTIMATED_MINUTES
|
||||
# TODO: Figure out a better way to handle this case
|
||||
logger.warning(
|
||||
f"At least one of the drops in campaign \"{self.name}({self.game.name})\" "
|
||||
"has reached the maximum extra minutes limit!"
|
||||
)
|
||||
self._twitch.change_state(State.CHANNEL_SWITCH)
|
||||
if (first_drop := self.first_drop) is not None:
|
||||
first_drop.display()
|
||||
|
||||
60
twitch.py
60
twitch.py
@@ -812,9 +812,11 @@ class Twitch:
|
||||
for channel in channels.values():
|
||||
# check if there's any channels we can watch first
|
||||
if self.can_watch(channel):
|
||||
if (active_drop := self.get_active_drop(channel)) is not None:
|
||||
if (
|
||||
(active_campaign := self.get_active_campaign(channel)) is not None
|
||||
and (active_drop := active_campaign.first_drop) is not None
|
||||
):
|
||||
active_drop.display(countdown=False, subone=True)
|
||||
del active_drop
|
||||
break
|
||||
self.change_state(State.CHANNEL_SWITCH)
|
||||
del (
|
||||
@@ -914,25 +916,29 @@ class Twitch:
|
||||
except GQLException:
|
||||
drop_data = None
|
||||
if drop_data is not None:
|
||||
drop = self._drops.get(drop_data["dropID"])
|
||||
if drop is not None and drop.can_earn(channel):
|
||||
drop.update_minutes(drop_data["currentMinutesWatched"])
|
||||
drop_text = (
|
||||
f"{drop.name} ({drop.campaign.game}, "
|
||||
f"{drop.current_minutes}/{drop.required_minutes})"
|
||||
gql_drop: TimedDrop | None = self._drops.get(drop_data["dropID"])
|
||||
if gql_drop is not None and gql_drop.can_earn(channel):
|
||||
gql_drop.update_minutes(drop_data["currentMinutesWatched"])
|
||||
drop_text: str = (
|
||||
f"{gql_drop.name} ({gql_drop.campaign.game}, "
|
||||
f"{gql_drop.current_minutes}/{gql_drop.required_minutes})"
|
||||
)
|
||||
logger.log(CALL, f"Drop progress from GQL: {drop_text}")
|
||||
handled = True
|
||||
|
||||
# Solution 2: If GQL fails, figure out which drop we're most likely mining
|
||||
# right now, and then bump up the minutes on that drop
|
||||
# Solution 2: If GQL fails, figure out which campaign we're most likely mining
|
||||
# right now, and then bump up the minutes on it's drops
|
||||
if not handled:
|
||||
if (drop := self.get_active_drop(channel)) is not None:
|
||||
drop.bump_minutes()
|
||||
drop_text = (
|
||||
f"{drop.name} ({drop.campaign.game}, "
|
||||
f"{drop.current_minutes}/{drop.required_minutes})"
|
||||
)
|
||||
if (active_campaign := self.get_active_campaign(channel)) is not None:
|
||||
active_campaign.bump_minutes(channel)
|
||||
# NOTE: This usually gets overwritten below
|
||||
drop_text = f"Unknown drop ({active_campaign.game})"
|
||||
if (active_drop := active_campaign.first_drop) is not None:
|
||||
active_drop.display()
|
||||
drop_text = (
|
||||
f"{active_drop.name} ({active_drop.campaign.game}, "
|
||||
f"{active_drop.current_minutes}/{active_drop.required_minutes})"
|
||||
)
|
||||
logger.log(CALL, f"Drop progress from active search: {drop_text}")
|
||||
handled = True
|
||||
else:
|
||||
@@ -1501,28 +1507,20 @@ class Twitch:
|
||||
self._mnt_task.cancel()
|
||||
self._mnt_task = asyncio.create_task(self._maintenance_task())
|
||||
|
||||
def get_active_drop(self, channel: Channel | None = None) -> TimedDrop | None:
|
||||
def get_active_campaign(self, channel: Channel | None = None) -> DropsCampaign | None:
|
||||
if not self.wanted_games:
|
||||
return None
|
||||
watching_channel = self.watching_channel.get_with_default(channel)
|
||||
if watching_channel is None:
|
||||
# if we aren't watching anything, we can't earn any drops
|
||||
return None
|
||||
watching_game: Game | None = watching_channel.game
|
||||
if watching_game is None:
|
||||
# if the channel isn't playing anything in particular, we can't determine the drop
|
||||
return None
|
||||
drops: list[TimedDrop] = []
|
||||
campaigns: list[DropsCampaign] = []
|
||||
for campaign in self.inventory:
|
||||
if (
|
||||
campaign.game == watching_game
|
||||
or campaign.has_badge_or_emote
|
||||
and campaign.can_earn(watching_channel)
|
||||
):
|
||||
drops.extend(drop for drop in campaign.drops if drop.can_earn(watching_channel))
|
||||
if drops:
|
||||
drops.sort(key=lambda d: d.remaining_minutes)
|
||||
return drops[0]
|
||||
if campaign.can_earn(watching_channel):
|
||||
campaigns.append(campaign)
|
||||
if campaigns:
|
||||
campaigns.sort(key=lambda c: c.remaining_minutes)
|
||||
return campaigns[0]
|
||||
return None
|
||||
|
||||
async def get_live_streams(
|
||||
|
||||
Reference in New Issue
Block a user