Implement channel status bulk-check logic

This commit is contained in:
DevilXD
2024-09-15 16:28:50 +02:00
parent 06d495c026
commit b9dfc75398
2 changed files with 72 additions and 13 deletions

View File

@@ -14,7 +14,7 @@ from constants import CALL, GQL_OPERATIONS, ONLINE_DELAY, URLType
if TYPE_CHECKING:
from twitch import Twitch
from gui import ChannelList
from constants import JsonType
from constants import JsonType, GQLOperation
logger = logging.getLogger("TwitchDrops")
@@ -173,6 +173,10 @@ class Channel:
def __hash__(self) -> int:
return self.id
@property
def stream_gql(self) -> GQLOperation:
return GQL_OPERATIONS["GetStreamInfo"].with_variables({"channel": self._login})
@property
def name(self) -> str:
if self._display_name is not None:
@@ -245,11 +249,25 @@ class Channel:
self._pending_stream_up = None
self._gui_channels.remove(self)
def external_update(self, channel_data: JsonType, available_drops: list[JsonType]):
"""
Update stream information based on data provided externally.
Used for bulk-updates of channel statuses during reload.
"""
if not channel_data["stream"]:
self._stream = None
return
stream = Stream.from_get_stream(self, channel_data)
if not stream.drops_enabled:
stream.drops_enabled = any(
bool(campaign["timeBasedDrops"]) for campaign in available_drops
)
self._stream = stream
async def get_stream(self) -> Stream | None:
try:
response: JsonType = await self._twitch.gql_request(
GQL_OPERATIONS["GetStreamInfo"].with_variables({"channel": self._login})
)
response: JsonType = await self._twitch.gql_request(self.stream_gql)
except MinerException as exc:
raise MinerException(f"Channel: {self._login}") from exc
channel_data: JsonType | None = response["data"]["user"]
@@ -277,7 +295,7 @@ class Channel:
)
return stream
async def update_stream(self, *, trigger_events: bool) -> bool:
async def update_stream(self) -> bool:
"""
Fetches the current channel stream, and if one exists,
updates it's game, title, tags and viewers. Updates channel status in general.
@@ -287,8 +305,7 @@ class Channel:
"""
old_stream = self._stream
self._stream = await self.get_stream()
if trigger_events:
self._twitch.on_channel_update(self, old_stream, self._stream)
self._twitch.on_channel_update(self, old_stream, self._stream)
return self._stream is not None
async def _online_delay(self):
@@ -298,7 +315,7 @@ class Channel:
"""
await asyncio.sleep(ONLINE_DELAY.total_seconds())
self._pending_stream_up = None # for 'display' to work properly
await self.update_stream(trigger_events=True) # triggers 'display' via the event
await self.update_stream()
def check_online(self):
"""

View File

@@ -744,11 +744,7 @@ class Twitch:
# remove all ACL channels that already exist from the other set
acl_channels.difference_update(new_channels)
# use the other set to set them online if possible
# if acl_channels:
# await asyncio.gather(*(
# channel.update_stream(trigger_events=False)
# for channel in acl_channels
# ))
await self.bulk_check_online(acl_channels)
# finally, add them as new channels
new_channels.update(acl_channels)
for game in no_acl:
@@ -1610,3 +1606,49 @@ class Twitch:
{"input": {"channelID": str(channel_id), "claimID": claim_id}}
)
)
async def bulk_check_online(self, channels: abc.Iterable[Channel]):
"""
Utilize batch GQL requests to check ONLINE status for a lot of channels at once.
Also handles the drops_enabled check.
"""
acl_streams_map: dict[int, JsonType] = {}
stream_gql_ops: list[GQLOperation] = [channel.stream_gql for channel in channels]
if not stream_gql_ops:
# shortcut for nothing to process
# NOTE: Have to do this here, becase "channels" can be any iterable
return
for coro in asyncio.as_completed([
self.gql_request(stream_gql_chunk)
for stream_gql_chunk in chunk(stream_gql_ops, 20)
]):
response_list: list[JsonType] = await coro
for response_json in response_list:
channel_data: JsonType = response_json["data"]["user"]
acl_streams_map[int(channel_data["id"])] = channel_data
# for all channels with an active stream, check the available drops as well
acl_available_drops_map: dict[int, list[JsonType]] = {}
available_gql_ops: list[GQLOperation] = [
GQL_OPERATIONS["AvailableDrops"].with_variables({"channelID": str(channel_id)})
for channel_id, channel_data in acl_streams_map.items()
if channel_data["stream"] is not None # only do this for ONLINE channels
]
for coro in asyncio.as_completed([
self.gql_request(available_gql_chunk)
for available_gql_chunk in chunk(available_gql_ops, 20)
]):
response_list = await coro
for response_json in response_list:
available_info: JsonType = response_json["data"]["channel"]
acl_available_drops_map[int(available_info["id"])] = (
available_info["viewerDropCampaigns"] or []
)
for channel in channels:
channel_id = channel.id
if channel_id not in acl_streams_map:
continue
channel_data = acl_streams_map[channel_id]
if channel_data["stream"] is None:
continue
available_drops: list[JsonType] = acl_available_drops_map[channel_id]
channel.external_update(channel_data, available_drops)