From 8ee49bebf0adc7a115eeeb53723034c75fa8fed8 Mon Sep 17 00:00:00 2001 From: DevilXD <4180725+DevilXD@users.noreply.github.com> Date: Sat, 21 Sep 2024 10:10:16 +0200 Subject: [PATCH] Fix asyncio.as_completed not cancelling its tasks --- twitch.py | 105 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 64 insertions(+), 41 deletions(-) diff --git a/twitch.py b/twitch.py index 57484b0..a142099 100644 --- a/twitch.py +++ b/twitch.py @@ -1458,18 +1458,21 @@ class Twitch: if c["status"] in applicable_statuses # that are currently not expired } # fetch detailed data for each campaign, in chunks - # specifically use an intermediate list per a Python bug - # https://github.com/python/cpython/issues/88342 status_update(_("gui", "status", "fetching_campaigns")) - for chunk_coro in asyncio.as_completed( - [ - self.fetch_campaigns(campaigns_chunk) - for campaigns_chunk in chunk(available_campaigns.items(), 20) - ] - ): - chunk_campaigns_data = await chunk_coro - # merge the inventory and campaigns datas together - inventory_data = self._merge_data(inventory_data, chunk_campaigns_data) + fetch_campaigns_tasks: list[asyncio.Task[Any]] = [ + asyncio.create_task(self.fetch_campaigns(campaigns_chunk)) + for campaigns_chunk in chunk(available_campaigns.items(), 20) + ] + try: + for coro in asyncio.as_completed(fetch_campaigns_tasks): + chunk_campaigns_data = await coro + # merge the inventory and campaigns datas together + inventory_data = self._merge_data(inventory_data, chunk_campaigns_data) + except Exception: + # asyncio.as_completed doesn't cancel tasks on errors + for task in fetch_campaigns_tasks: + task.cancel() + raise if self.settings.dump: # dump the campaigns data to the dump file @@ -1521,20 +1524,26 @@ class Twitch: status_update( _("gui", "status", "adding_campaigns").format(counter=f"(0/{len(campaigns)})") ) - for i, coro in enumerate( - asyncio.as_completed([ - asyncio.create_task(self.gui.inv.add_campaign(campaign)) - for campaign in campaigns - ]), - start=1, - ): - await coro - status_update( - _("gui", "status", "adding_campaigns").format(counter=f"({i}/{len(campaigns)})") - ) - # this is needed here explicitly, because cache reads from disk don't raise this - if self.gui.close_requested: - raise ExitRequest() + add_campaign_tasks: list[asyncio.Task[None]] = [ + asyncio.create_task(self.gui.inv.add_campaign(campaign)) + for campaign in campaigns + ] + try: + for i, coro in enumerate(asyncio.as_completed(add_campaign_tasks), start=1): + await coro + status_update( + _("gui", "status", "adding_campaigns").format( + counter=f"({i}/{len(campaigns)})" + ) + ) + # this is needed here explicitly, because cache reads from disk don't raise this + if self.gui.close_requested: + raise ExitRequest() + except Exception: + # asyncio.as_completed doesn't cancel tasks on errors + for task in add_campaign_tasks: + task.cancel() + raise self._mnt_triggers.extend(sorted(switch_triggers)) # trim out all triggers that we're already past now = datetime.now(timezone.utc) @@ -1615,14 +1624,21 @@ class Twitch: # shortcut for nothing to process # NOTE: Have to do this here, becase "channels" can be any iterable return - for coro in asyncio.as_completed([ - self.gql_request(stream_gql_chunk) + stream_gql_tasks: list[asyncio.Task[list[JsonType]]] = [ + asyncio.create_task(self.gql_request(stream_gql_chunk)) for stream_gql_chunk in chunk(stream_gql_ops, 20) - ]): - response_list: list[JsonType] = await coro - for response_json in response_list: - channel_data: JsonType = response_json["data"]["user"] - acl_streams_map[int(channel_data["id"])] = channel_data + ] + try: + for coro in asyncio.as_completed(stream_gql_tasks): + response_list: list[JsonType] = await coro + for response_json in response_list: + channel_data: JsonType = response_json["data"]["user"] + acl_streams_map[int(channel_data["id"])] = channel_data + except Exception: + # asyncio.as_completed doesn't cancel tasks on errors + for task in stream_gql_tasks: + task.cancel() + raise # for all channels with an active stream, check the available drops as well acl_available_drops_map: dict[int, list[JsonType]] = {} available_gql_ops: list[GQLOperation] = [ @@ -1630,16 +1646,23 @@ class Twitch: for channel_id, channel_data in acl_streams_map.items() if channel_data["stream"] is not None # only do this for ONLINE channels ] - for coro in asyncio.as_completed([ - self.gql_request(available_gql_chunk) + available_gql_tasks: list[asyncio.Task[list[JsonType]]] = [ + asyncio.create_task(self.gql_request(available_gql_chunk)) for available_gql_chunk in chunk(available_gql_ops, 20) - ]): - response_list = await coro - for response_json in response_list: - available_info: JsonType = response_json["data"]["channel"] - acl_available_drops_map[int(available_info["id"])] = ( - available_info["viewerDropCampaigns"] or [] - ) + ] + try: + for coro in asyncio.as_completed(available_gql_tasks): + response_list = await coro + for response_json in response_list: + available_info: JsonType = response_json["data"]["channel"] + acl_available_drops_map[int(available_info["id"])] = ( + available_info["viewerDropCampaigns"] or [] + ) + except Exception: + # asyncio.as_completed doesn't cancel tasks on errors + for task in available_gql_tasks: + task.cancel() + raise for channel in channels: channel_id = channel.id if channel_id not in acl_streams_map: