diff --git a/twitch.py b/twitch.py index a9b361f..e321b3a 100644 --- a/twitch.py +++ b/twitch.py @@ -55,6 +55,7 @@ class Twitch: self._state_change = asyncio.Event() self.game: Optional[Game] = None self.inventory: Dict[Game, List[DropsCampaign]] = {} + self._inv_lock = asyncio.Lock() # GUI self.gui = GUIManager(self) # Cookies, session and auth @@ -388,8 +389,8 @@ class Twitch: # ensure every 30 minutes that we don't have unclaimed points bonus await channel.claim_bonus() if i % 60 == 0: - # cleanup channels every hour - self.change_state(State.CHANNELS_CLEANUP) + # refresh inventory and cleanup channels every hour + self.change_state(State.INVENTORY_FETCH) i = (i + 1) % 3600 await self._watch_sleep(last_watch + interval - time()) @@ -483,59 +484,56 @@ class Twitch: if msg_type not in ("drop-progress", "drop-claim"): return drop_id: str = message["data"]["drop_id"] - drop: Optional[TimedDrop] = self.get_drop(drop_id) - if msg_type == "drop-claim": - if drop is None: - logger.error( - f"Received a drop claim ID for a non-existing drop: {drop_id}\n" - f"Drop claim ID: {message['data']['drop_instance_id']}" - ) + async with self._inv_lock: + drop: Optional[TimedDrop] = self.get_drop(drop_id) + if msg_type == "drop-claim": + if drop is None: + logger.error( + f"Received a drop claim ID for a non-existing drop: {drop_id}\n" + f"Drop claim ID: {message['data']['drop_instance_id']}" + ) + return + drop.update_claim(message["data"]["drop_instance_id"]) + campaign = drop.campaign + mined = await drop.claim() + if mined: + claim_text = ( + f"{drop.rewards_text()} " + f"({campaign.claimed_drops}/{campaign.total_drops})" + ) + self.gui.print(f"Claimed drop: {claim_text}") + self.gui.tray.notify(claim_text, "Mined Drop") + else: + logger.error(f"Drop claim failed! Drop ID: {drop_id}") + # About 4-20s after claiming the drop, next drop can be started + # by re-sending the watch payload. We can test for it by fetching the current drop + # via GQL, and then comparing drop IDs. + await asyncio.sleep(4) + for attempt in range(8): + context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"]) + drop_data: JsonType = context["data"]["currentUser"]["dropCurrentSession"] + if drop_data["dropID"] != drop.id: + break + await asyncio.sleep(2) + self.restart_watching() return - drop.update_claim(message["data"]["drop_instance_id"]) - campaign = drop.campaign - mined = await drop.claim() - if mined: - claim_text = ( - f"{drop.rewards_text()} " - f"({campaign.claimed_drops}/{campaign.total_drops})" - ) - self.gui.print(f"Claimed drop: {claim_text}") - self.gui.tray.notify(claim_text, "Mined Drop") + assert msg_type == "drop-progress" + watching_channel = self.watching_channel.get_with_default(None) + if self._drop_update is None: + # we aren't actually waiting for a progress update right now, so we can just + # ignore the event this time + return + elif drop is not None and drop.can_earn(watching_channel): + drop.update_minutes(message["data"]["current_progress_min"]) + drop.display() + # Let the watch loop know we've handled it here + self._drop_update.set_result(True) else: - logger.error(f"Drop claim failed! Drop ID: {drop_id}") - # About 4-20s after claiming the drop, next drop can be started - # by re-sending the watch payload. We can test for it by fetching the current drop - # via GQL, and then comparing drop IDs. - await asyncio.sleep(4) - for attempt in range(8): - context = await self.gql_request(GQL_OPERATIONS["CurrentDrop"]) - drop_data: JsonType = context["data"]["currentUser"]["dropCurrentSession"] - if drop_data["dropID"] != drop.id: - # When two drops are ready to be claimed, we receive an update only for one. - # This means we have to refresh the games list and claim any additional drops - # after each earned drop, claimed or not. - break - await asyncio.sleep(2) - # This also restarts the watch loop as needed here. - self.change_state(State.GAMES_UPDATE) - return - assert msg_type == "drop-progress" - watching_channel = self.watching_channel.get_with_default(None) - if self._drop_update is None: - # we aren't actually waiting for a progress update right now, so we can just - # ignore the event this time - return - elif drop is not None and drop.can_earn(watching_channel): - drop.update_minutes(message["data"]["current_progress_min"]) - drop.display() - # Let the watch loop know we've handled it here - self._drop_update.set_result(True) - else: - # Sometimes, the drop update we receive doesn't actually match what we're mining. - # This is a Twitch bug workaround: signal the watch loop to use GQL - # to get the current drop progress instead. - self._drop_update.set_result(False) - self._drop_update = None + # Sometimes, the drop update we receive doesn't actually match what we're mining. + # This is a Twitch bug workaround: signal the watch loop to use GQL + # to get the current drop progress instead. + self._drop_update.set_result(False) + self._drop_update = None @task_wrapper async def process_points(self, user_id: int, message: JsonType): @@ -771,42 +769,43 @@ class Twitch: return DropsCampaign(self, response["data"]["user"]["dropCampaign"], claimed_benefits) async def fetch_inventory(self) -> None: - # fetch all available campaign IDs, that are currently ACTIVE and account is connected - response = await self.gql_request(GQL_OPERATIONS["Campaigns"]) - data = response["data"]["currentUser"]["dropCampaigns"] or [] - applicable_statuses = ("ACTIVE", "UPCOMING") - available_campaigns: Set[str] = set( - c["id"] for c in data - if c["status"] in applicable_statuses and c["self"]["isAccountConnected"] - ) - # fetch in-progress campaigns (inventory) - response = await self.gql_request(GQL_OPERATIONS["Inventory"]) - inventory = response["data"]["currentUser"]["inventory"] - ongoing_campaigns = inventory["dropCampaignsInProgress"] or [] - # this contains claimed benefit edge IDs, not drop IDs - claimed_benefits: Dict[str, datetime] = { - b["id"]: timestamp(b["lastAwardedAt"]) for b in inventory["gameEventDrops"] - } - campaigns: List[DropsCampaign] = [ - DropsCampaign(self, campaign_data, claimed_benefits) - for campaign_data in ongoing_campaigns - ] - # filter out in-progress campaigns from all available campaigns, - # since we already have all information needed for them - for campaign in campaigns: - available_campaigns.discard(campaign.id) - # add campaigns that remained, that can be earned but are not in-progress yet - for campaign_id in available_campaigns: - campaign = await self.fetch_campaign(campaign_id, claimed_benefits) - if any(drop.can_earn() for drop in campaign.drops): - campaigns.append(campaign) - campaigns.sort(key=lambda c: c.ends_at) - self.inventory.clear() - for campaign in campaigns: - game = campaign.game - if game not in self.inventory: - self.inventory[game] = [] - self.inventory[game].append(campaign) + async with self._inv_lock: + # fetch all available campaign IDs, that are currently ACTIVE and account is connected + response = await self.gql_request(GQL_OPERATIONS["Campaigns"]) + data = response["data"]["currentUser"]["dropCampaigns"] or [] + applicable_statuses = ("ACTIVE", "UPCOMING") + available_campaigns: Set[str] = set( + c["id"] for c in data + if c["status"] in applicable_statuses and c["self"]["isAccountConnected"] + ) + # fetch in-progress campaigns (inventory) + response = await self.gql_request(GQL_OPERATIONS["Inventory"]) + inventory = response["data"]["currentUser"]["inventory"] + ongoing_campaigns = inventory["dropCampaignsInProgress"] or [] + # this contains claimed benefit edge IDs, not drop IDs + claimed_benefits: Dict[str, datetime] = { + b["id"]: timestamp(b["lastAwardedAt"]) for b in inventory["gameEventDrops"] + } + campaigns: List[DropsCampaign] = [ + DropsCampaign(self, campaign_data, claimed_benefits) + for campaign_data in ongoing_campaigns + ] + # filter out in-progress campaigns from all available campaigns, + # since we already have all information needed for them + for campaign in campaigns: + available_campaigns.discard(campaign.id) + # add campaigns that remained, that can be earned but are not in-progress yet + for campaign_id in available_campaigns: + campaign = await self.fetch_campaign(campaign_id, claimed_benefits) + if any(drop.can_earn() for drop in campaign.drops): + campaigns.append(campaign) + campaigns.sort(key=lambda c: c.ends_at) + self.inventory.clear() + for campaign in campaigns: + game = campaign.game + if game not in self.inventory: + self.inventory[game] = [] + self.inventory[game].append(campaign) def get_drop(self, drop_id: str) -> Optional[TimedDrop]: """