diff --git a/changelog.d/9910.bugfix b/changelog.d/9910.bugfix new file mode 100644 index 000000000000..06d523fd46a8 --- /dev/null +++ b/changelog.d/9910.bugfix @@ -0,0 +1 @@ +Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession. diff --git a/changelog.d/9910.feature b/changelog.d/9910.feature new file mode 100644 index 000000000000..54165cce18af --- /dev/null +++ b/changelog.d/9910.feature @@ -0,0 +1 @@ +Improve performance after joining a large room when presence is enabled. diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index de1b14cde39a..4064a2b85913 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -78,7 +78,7 @@ async def _create_association( # TODO(erikj): Add transactions. # TODO(erikj): Check if there is a current association. if not servers: - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) servers = {get_domain_from_id(u) for u in users} if not servers: @@ -270,7 +270,7 @@ async def get_association(self, room_alias: RoomAlias) -> JsonDict: Codes.NOT_FOUND, ) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) extra_servers = {get_domain_from_id(u) for u in users} servers = set(extra_servers) | set(servers) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index d82144d7fa8d..f134f1e234b6 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -103,7 +103,7 @@ async def get_stream( # Send down presence. if event.state_key == auth_user_id: # Send down presence for everyone in the room. - users = await self.state.get_current_users_in_room( + users = await self.store.get_users_in_room( event.room_id ) # type: Iterable[str] else: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 49f8aa25ea7b..393f17c3a346 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -258,7 +258,7 @@ async def get_joined_members(self, requester: Requester, room_id: str) -> dict: "Getting joined members after leaving is not implemented" ) - users_with_profile = await self.state.get_current_users_in_room(room_id) + users_with_profile = await self.store.get_users_in_room_with_profiles(room_id) # If this is an AS, double check that they are allowed to see the members. # This can either be because the AS user is in the room or because there diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ebbc2343343d..8e085dfbec22 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1293,7 +1293,7 @@ async def _on_user_joined_room( remote_host = get_domain_from_id(user_id) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) user_ids = list(filter(self.is_mine_id, users)) states_d = await self.current_state_for_users(user_ids) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a888b794168..fb4823a5cc6e 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1327,7 +1327,7 @@ async def shutdown_room( new_room_id = None logger.info("Shutting down room %r", room_id) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) kicked_users = [] failed_to_kick_users = [] for user_id in users: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a9a3ee05c3f3..0fcc1532da8d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1190,7 +1190,7 @@ async def _generate_sync_entry_for_device_list( # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: - joined_users = await self.state.get_current_users_in_room(room_id) + joined_users = await self.store.get_users_in_room(room_id) newly_joined_or_invited_users.update(joined_users) # TODO: Check that these users are actually new, i.e. either they @@ -1206,7 +1206,7 @@ async def _generate_sync_entry_for_device_list( # Now find users that we no longer track for room_id in newly_left_rooms: - left_users = await self.state.get_current_users_in_room(room_id) + left_users = await self.store.get_users_in_room(room_id) newly_left_users.update(left_users) # Remove any users that we still share a room with. @@ -1361,7 +1361,7 @@ async def _generate_sync_entry_for_presence( extra_users_ids = set(newly_joined_or_invited_users) for room_id in newly_joined_rooms: - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) extra_users_ids.update(users) extra_users_ids.discard(user.to_string()) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index b3bd92d37c02..a1770f620e59 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -213,19 +213,23 @@ async def get_current_state_ids( return ret.state async def get_current_users_in_room( - self, room_id: str, latest_event_ids: Optional[List[str]] = None + self, room_id: str, latest_event_ids: List[str] ) -> Dict[str, ProfileInfo]: """ Get the users who are currently in a room. + Note: This is much slower than using the equivalent method + `DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`, + so this should only be used when wanting the users at a particular point + in the room. + Args: room_id: The ID of the room. latest_event_ids: Precomputed list of latest event IDs. Will be computed if None. Returns: Dictionary of user IDs to their profileinfo. """ - if not latest_event_ids: - latest_event_ids = await self.store.get_latest_event_ids_in_room(room_id) + assert latest_event_ids is not None logger.debug("calling resolve_state_groups from get_current_users_in_room") diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6b68d8720c92..3d98d3f5f822 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -69,6 +69,7 @@ def _invalidate_state_caches( self._attempt_to_invalidate_cache("is_host_joined", (room_id, host)) self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,)) self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,)) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 2a8532f8c1f9..5fc3bb5a7d7b 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -205,8 +205,12 @@ async def get_users_in_room_with_profiles( def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]: sql = """ - SELECT user_id, display_name, avatar_url FROM room_memberships - WHERE room_id = ? AND membership = ? + SELECT state_key, display_name, avatar_url FROM room_memberships as m + INNER JOIN current_state_events as c + ON m.event_id = c.event_id + AND m.room_id = c.room_id + AND m.user_id = c.state_key + WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? """ txn.execute(sql, (room_id, Membership.JOIN)) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 7a082fdd217f..a6bfb4902a1c 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -142,8 +142,6 @@ async def _populate_user_directory_process_rooms(self, progress, batch_size): batch_size (int): Maximum number of state events to process per cycle. """ - state = self.hs.get_state_handler() - # If we don't have progress filed, delete everything. if not progress: await self.delete_all_from_user_dir() @@ -197,7 +195,7 @@ def _get_next_batch(txn): room_id ) - users_with_profile = await state.get_current_users_in_room(room_id) + users_with_profile = await self.get_users_in_room_with_profiles(room_id) user_ids = set(users_with_profile) # Update each user in the user directory.