Fix asyncio.as_completed not cancelling its tasks

This commit is contained in:
DevilXD
2024-09-21 10:10:16 +02:00
parent c08199da67
commit 8ee49bebf0

105
twitch.py
View File

@@ -1458,18 +1458,21 @@ class Twitch:
if c["status"] in applicable_statuses # that are currently not expired
}
# fetch detailed data for each campaign, in chunks
# specifically use an intermediate list per a Python bug
# https://github.com/python/cpython/issues/88342
status_update(_("gui", "status", "fetching_campaigns"))
for chunk_coro in asyncio.as_completed(
[
self.fetch_campaigns(campaigns_chunk)
for campaigns_chunk in chunk(available_campaigns.items(), 20)
]
):
chunk_campaigns_data = await chunk_coro
# merge the inventory and campaigns datas together
inventory_data = self._merge_data(inventory_data, chunk_campaigns_data)
fetch_campaigns_tasks: list[asyncio.Task[Any]] = [
asyncio.create_task(self.fetch_campaigns(campaigns_chunk))
for campaigns_chunk in chunk(available_campaigns.items(), 20)
]
try:
for coro in asyncio.as_completed(fetch_campaigns_tasks):
chunk_campaigns_data = await coro
# merge the inventory and campaigns datas together
inventory_data = self._merge_data(inventory_data, chunk_campaigns_data)
except Exception:
# asyncio.as_completed doesn't cancel tasks on errors
for task in fetch_campaigns_tasks:
task.cancel()
raise
if self.settings.dump:
# dump the campaigns data to the dump file
@@ -1521,20 +1524,26 @@ class Twitch:
status_update(
_("gui", "status", "adding_campaigns").format(counter=f"(0/{len(campaigns)})")
)
for i, coro in enumerate(
asyncio.as_completed([
asyncio.create_task(self.gui.inv.add_campaign(campaign))
for campaign in campaigns
]),
start=1,
):
await coro
status_update(
_("gui", "status", "adding_campaigns").format(counter=f"({i}/{len(campaigns)})")
)
# this is needed here explicitly, because cache reads from disk don't raise this
if self.gui.close_requested:
raise ExitRequest()
add_campaign_tasks: list[asyncio.Task[None]] = [
asyncio.create_task(self.gui.inv.add_campaign(campaign))
for campaign in campaigns
]
try:
for i, coro in enumerate(asyncio.as_completed(add_campaign_tasks), start=1):
await coro
status_update(
_("gui", "status", "adding_campaigns").format(
counter=f"({i}/{len(campaigns)})"
)
)
# this is needed here explicitly, because cache reads from disk don't raise this
if self.gui.close_requested:
raise ExitRequest()
except Exception:
# asyncio.as_completed doesn't cancel tasks on errors
for task in add_campaign_tasks:
task.cancel()
raise
self._mnt_triggers.extend(sorted(switch_triggers))
# trim out all triggers that we're already past
now = datetime.now(timezone.utc)
@@ -1615,14 +1624,21 @@ class Twitch:
# shortcut for nothing to process
# NOTE: Have to do this here, becase "channels" can be any iterable
return
for coro in asyncio.as_completed([
self.gql_request(stream_gql_chunk)
stream_gql_tasks: list[asyncio.Task[list[JsonType]]] = [
asyncio.create_task(self.gql_request(stream_gql_chunk))
for stream_gql_chunk in chunk(stream_gql_ops, 20)
]):
response_list: list[JsonType] = await coro
for response_json in response_list:
channel_data: JsonType = response_json["data"]["user"]
acl_streams_map[int(channel_data["id"])] = channel_data
]
try:
for coro in asyncio.as_completed(stream_gql_tasks):
response_list: list[JsonType] = await coro
for response_json in response_list:
channel_data: JsonType = response_json["data"]["user"]
acl_streams_map[int(channel_data["id"])] = channel_data
except Exception:
# asyncio.as_completed doesn't cancel tasks on errors
for task in stream_gql_tasks:
task.cancel()
raise
# for all channels with an active stream, check the available drops as well
acl_available_drops_map: dict[int, list[JsonType]] = {}
available_gql_ops: list[GQLOperation] = [
@@ -1630,16 +1646,23 @@ class Twitch:
for channel_id, channel_data in acl_streams_map.items()
if channel_data["stream"] is not None # only do this for ONLINE channels
]
for coro in asyncio.as_completed([
self.gql_request(available_gql_chunk)
available_gql_tasks: list[asyncio.Task[list[JsonType]]] = [
asyncio.create_task(self.gql_request(available_gql_chunk))
for available_gql_chunk in chunk(available_gql_ops, 20)
]):
response_list = await coro
for response_json in response_list:
available_info: JsonType = response_json["data"]["channel"]
acl_available_drops_map[int(available_info["id"])] = (
available_info["viewerDropCampaigns"] or []
)
]
try:
for coro in asyncio.as_completed(available_gql_tasks):
response_list = await coro
for response_json in response_list:
available_info: JsonType = response_json["data"]["channel"]
acl_available_drops_map[int(available_info["id"])] = (
available_info["viewerDropCampaigns"] or []
)
except Exception:
# asyncio.as_completed doesn't cancel tasks on errors
for task in available_gql_tasks:
task.cancel()
raise
for channel in channels:
channel_id = channel.id
if channel_id not in acl_streams_map: