Skip to content

Commit

Permalink
Sliding sync minor performance speed up using new table (#17787)
Browse files Browse the repository at this point in the history
Use the new tables to work out which rooms have changed.

---------

Co-authored-by: Eric Eastwood <[email protected]>
  • Loading branch information
erikjohnston and MadLittleMods authored Oct 8, 2024
1 parent e2610de commit 4e90221
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
1 change: 1 addition & 0 deletions changelog.d/17787.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Sliding sync minor performance speed up using new table.
27 changes: 18 additions & 9 deletions synapse/handlers/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
Requester,
SlidingSyncStreamToken,
StateMap,
StrCollection,
StreamKeyType,
StreamToken,
)
Expand Down Expand Up @@ -293,7 +294,6 @@ async def handle_room(room_id: str) -> None:
# to record rooms as having updates even if there might not actually
# be anything new for the user (e.g. due to event filters, events
# having happened after the user left, etc).
unsent_room_ids = []
if from_token:
# The set of rooms that the client (may) care about, but aren't
# in any list range (or subscribed to).
Expand All @@ -305,15 +305,24 @@ async def handle_room(room_id: str) -> None:
# TODO: Replace this with something faster. When we land the
# sliding sync tables that record the most recent event
# positions we can use that.
missing_event_map_by_room = (
await self.store.get_room_events_stream_for_rooms(
room_ids=missing_rooms,
from_key=to_token.room_key,
to_key=from_token.stream_token.room_key,
limit=1,
unsent_room_ids: StrCollection
if await self.store.have_finished_sliding_sync_background_jobs():
unsent_room_ids = await (
self.store.get_rooms_that_have_updates_since_sliding_sync_table(
room_ids=missing_rooms,
from_key=from_token.stream_token.room_key,
)
)
)
unsent_room_ids = list(missing_event_map_by_room)
else:
missing_event_map_by_room = (
await self.store.get_room_events_stream_for_rooms(
room_ids=missing_rooms,
from_key=to_token.room_key,
to_key=from_token.stream_token.room_key,
limit=1,
)
)
unsent_room_ids = list(missing_event_map_by_room)

new_connection_state.rooms.record_unsent_rooms(
unsent_room_ids, from_token.stream_token.room_key
Expand Down
42 changes: 42 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,48 @@ def get_rooms_that_changed(
if self._events_stream_cache.has_entity_changed(room_id, from_id)
}

async def get_rooms_that_have_updates_since_sliding_sync_table(
self,
room_ids: StrCollection,
from_key: RoomStreamToken,
) -> StrCollection:
"""Return the rooms that probably have had updates since the given
token (changes that are > `from_key`)."""
# If the stream change cache is valid for the stream token, we can just
# use the result of that.
if from_key.stream >= self._events_stream_cache.get_earliest_known_position():
return self._events_stream_cache.get_entities_changed(
room_ids, from_key.stream
)

def get_rooms_that_have_updates_since_sliding_sync_table_txn(
txn: LoggingTransaction,
) -> StrCollection:
sql = """
SELECT room_id
FROM sliding_sync_joined_rooms
WHERE {clause}
AND event_stream_ordering > ?
"""

results: Set[str] = set()
for batch in batch_iter(room_ids, 1000):
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", batch
)

args.append(from_key.stream)
txn.execute(sql.format(clause=clause), args)

results.update(row[0] for row in txn)

return results

return await self.db_pool.runInteraction(
"get_rooms_that_have_updates_since_sliding_sync_table",
get_rooms_that_have_updates_since_sliding_sync_table_txn,
)

async def paginate_room_events_by_stream_ordering(
self,
*,
Expand Down

0 comments on commit 4e90221

Please sign in to comment.