diff --git a/changelog.d/17789.misc b/changelog.d/17789.misc new file mode 100644 index 00000000000..43ed360ce8f --- /dev/null +++ b/changelog.d/17789.misc @@ -0,0 +1 @@ +Speed up sliding sync when there are many active subscriptions. diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 08e619042b7..a1730b7e05b 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -500,6 +500,16 @@ async def _compute_interested_rooms_new_tables( # depending on the `required_state` requested (see below). partial_state_rooms = await self.store.get_partial_rooms() + # Fetch any rooms that we have not already fetched from the database. + subscription_sliding_sync_rooms = ( + await self.store.get_sliding_sync_room_for_user_batch( + user_id, + sync_config.room_subscriptions.keys() + - room_membership_for_user_map.keys(), + ) + ) + room_membership_for_user_map.update(subscription_sliding_sync_rooms) + for ( room_id, room_subscription, @@ -507,17 +517,11 @@ async def _compute_interested_rooms_new_tables( # Check if we have a membership for the room, but didn't pull it out # above. This could be e.g. a leave that we don't pull out by # default. - current_room_entry = ( - await self.store.get_sliding_sync_room_for_user( - user_id, room_id - ) - ) + current_room_entry = room_membership_for_user_map.get(room_id) if not current_room_entry: # TODO: Handle rooms the user isn't in. continue - room_membership_for_user_map[room_id] = current_room_entry - all_rooms.add(room_id) # Take the superset of the `RoomSyncConfig` for each room. diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 0a62613d347..6f15e513390 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1499,6 +1499,57 @@ def get_sliding_sync_room_for_user_txn( "get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn ) + async def get_sliding_sync_room_for_user_batch( + self, user_id: str, room_ids: StrCollection + ) -> Dict[str, RoomsForUserSlidingSync]: + """Get the sliding sync room entry for the given user and rooms.""" + + if not room_ids: + return {} + + def get_sliding_sync_room_for_user_batch_txn( + txn: LoggingTransaction, + ) -> Dict[str, RoomsForUserSlidingSync]: + clause, args = make_in_list_sql_clause( + self.database_engine, "m.room_id", room_ids + ) + sql = f""" + SELECT m.room_id, m.sender, m.membership, m.membership_event_id, + r.room_version, + m.event_instance_name, m.event_stream_ordering, + m.has_known_state, + COALESCE(j.room_type, m.room_type), + COALESCE(j.is_encrypted, m.is_encrypted) + FROM sliding_sync_membership_snapshots AS m + INNER JOIN rooms AS r USING (room_id) + LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join') + WHERE m.forgotten = 0 + AND {clause} + AND user_id = ? + """ + args.append(user_id) + txn.execute(sql, args) + + return { + row[0]: RoomsForUserSlidingSync( + room_id=row[0], + sender=row[1], + membership=row[2], + event_id=row[3], + room_version_id=row[4], + event_pos=PersistedEventPosition(row[5], row[6]), + has_known_state=bool(row[7]), + room_type=row[8], + is_encrypted=row[9], + ) + for row in txn + } + + return await self.db_pool.runInteraction( + "get_sliding_sync_room_for_user_batch", + get_sliding_sync_room_for_user_batch_txn, + ) + class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__(