From b9dfc75398caac63f9291ef4fff86eff0f4932f6 Mon Sep 17 00:00:00 2001 From: DevilXD <4180725+DevilXD@users.noreply.github.com> Date: Sun, 15 Sep 2024 16:28:50 +0200 Subject: [PATCH] Implement channel status bulk-check logic --- channel.py | 33 +++++++++++++++++++++++++-------- twitch.py | 52 +++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 72 insertions(+), 13 deletions(-) diff --git a/channel.py b/channel.py index 3193cf37..d8c0f600 100644 --- a/channel.py +++ b/channel.py @@ -14,7 +14,7 @@ if TYPE_CHECKING: from twitch import Twitch from gui import ChannelList - from constants import JsonType + from constants import JsonType, GQLOperation logger = logging.getLogger("TwitchDrops") @@ -173,6 +173,10 @@ def __eq__(self, other: object) -> bool: def __hash__(self) -> int: return self.id + @property + def stream_gql(self) -> GQLOperation: + return GQL_OPERATIONS["GetStreamInfo"].with_variables({"channel": self._login}) + @property def name(self) -> str: if self._display_name is not None: @@ -245,11 +249,25 @@ def remove(self): self._pending_stream_up = None self._gui_channels.remove(self) + def external_update(self, channel_data: JsonType, available_drops: list[JsonType]): + """ + Update stream information based on data provided externally. + + Used for bulk-updates of channel statuses during reload. + """ + if not channel_data["stream"]: + self._stream = None + return + stream = Stream.from_get_stream(self, channel_data) + if not stream.drops_enabled: + stream.drops_enabled = any( + bool(campaign["timeBasedDrops"]) for campaign in available_drops + ) + self._stream = stream + async def get_stream(self) -> Stream | None: try: - response: JsonType = await self._twitch.gql_request( - GQL_OPERATIONS["GetStreamInfo"].with_variables({"channel": self._login}) - ) + response: JsonType = await self._twitch.gql_request(self.stream_gql) except MinerException as exc: raise MinerException(f"Channel: {self._login}") from exc channel_data: JsonType | None = response["data"]["user"] @@ -277,7 +295,7 @@ async def get_stream(self) -> Stream | None: ) return stream - async def update_stream(self, *, trigger_events: bool) -> bool: + async def update_stream(self) -> bool: """ Fetches the current channel stream, and if one exists, updates it's game, title, tags and viewers. Updates channel status in general. @@ -287,8 +305,7 @@ async def update_stream(self, *, trigger_events: bool) -> bool: """ old_stream = self._stream self._stream = await self.get_stream() - if trigger_events: - self._twitch.on_channel_update(self, old_stream, self._stream) + self._twitch.on_channel_update(self, old_stream, self._stream) return self._stream is not None async def _online_delay(self): @@ -298,7 +315,7 @@ async def _online_delay(self): """ await asyncio.sleep(ONLINE_DELAY.total_seconds()) self._pending_stream_up = None # for 'display' to work properly - await self.update_stream(trigger_events=True) # triggers 'display' via the event + await self.update_stream() def check_online(self): """ diff --git a/twitch.py b/twitch.py index 3fd866f3..f0917690 100644 --- a/twitch.py +++ b/twitch.py @@ -744,11 +744,7 @@ async def _run(self): # remove all ACL channels that already exist from the other set acl_channels.difference_update(new_channels) # use the other set to set them online if possible - # if acl_channels: - # await asyncio.gather(*( - # channel.update_stream(trigger_events=False) - # for channel in acl_channels - # )) + await self.bulk_check_online(acl_channels) # finally, add them as new channels new_channels.update(acl_channels) for game in no_acl: @@ -1610,3 +1606,49 @@ async def claim_points(self, channel_id: str | int, claim_id: str) -> None: {"input": {"channelID": str(channel_id), "claimID": claim_id}} ) ) + + async def bulk_check_online(self, channels: abc.Iterable[Channel]): + """ + Utilize batch GQL requests to check ONLINE status for a lot of channels at once. + Also handles the drops_enabled check. + """ + acl_streams_map: dict[int, JsonType] = {} + stream_gql_ops: list[GQLOperation] = [channel.stream_gql for channel in channels] + if not stream_gql_ops: + # shortcut for nothing to process + # NOTE: Have to do this here, becase "channels" can be any iterable + return + for coro in asyncio.as_completed([ + self.gql_request(stream_gql_chunk) + for stream_gql_chunk in chunk(stream_gql_ops, 20) + ]): + response_list: list[JsonType] = await coro + for response_json in response_list: + channel_data: JsonType = response_json["data"]["user"] + acl_streams_map[int(channel_data["id"])] = channel_data + # for all channels with an active stream, check the available drops as well + acl_available_drops_map: dict[int, list[JsonType]] = {} + available_gql_ops: list[GQLOperation] = [ + GQL_OPERATIONS["AvailableDrops"].with_variables({"channelID": str(channel_id)}) + for channel_id, channel_data in acl_streams_map.items() + if channel_data["stream"] is not None # only do this for ONLINE channels + ] + for coro in asyncio.as_completed([ + self.gql_request(available_gql_chunk) + for available_gql_chunk in chunk(available_gql_ops, 20) + ]): + response_list = await coro + for response_json in response_list: + available_info: JsonType = response_json["data"]["channel"] + acl_available_drops_map[int(available_info["id"])] = ( + available_info["viewerDropCampaigns"] or [] + ) + for channel in channels: + channel_id = channel.id + if channel_id not in acl_streams_map: + continue + channel_data = acl_streams_map[channel_id] + if channel_data["stream"] is None: + continue + available_drops: list[JsonType] = acl_available_drops_map[channel_id] + channel.external_update(channel_data, available_drops)