Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding sync minor performance speed up using new table #17787

Merged
merged 5 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17787.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Sliding sync minor performance speed up using new table.
27 changes: 18 additions & 9 deletions synapse/handlers/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
Requester,
SlidingSyncStreamToken,
StateMap,
StrCollection,
StreamKeyType,
StreamToken,
)
Expand Down Expand Up @@ -293,7 +294,6 @@ async def handle_room(room_id: str) -> None:
# to record rooms as having updates even if there might not actually
# be anything new for the user (e.g. due to event filters, events
# having happened after the user left, etc).
unsent_room_ids = []
if from_token:
# The set of rooms that the client (may) care about, but aren't
# in any list range (or subscribed to).
Expand All @@ -305,15 +305,24 @@ async def handle_room(room_id: str) -> None:
# TODO: Replace this with something faster. When we land the
# sliding sync tables that record the most recent event
# positions we can use that.
missing_event_map_by_room = (
await self.store.get_room_events_stream_for_rooms(
room_ids=missing_rooms,
from_key=to_token.room_key,
to_key=from_token.stream_token.room_key,
limit=1,
unsent_room_ids: StrCollection
if await self.store.have_finished_sliding_sync_background_jobs():
unsent_room_ids = await (
self.store.get_rooms_that_have_updates_since_sliding_sync_table(
room_ids=missing_rooms,
from_key=from_token.stream_token.room_key,
)
)
)
unsent_room_ids = list(missing_event_map_by_room)
else:
missing_event_map_by_room = (
await self.store.get_room_events_stream_for_rooms(
room_ids=missing_rooms,
from_key=to_token.room_key,
to_key=from_token.stream_token.room_key,
limit=1,
)
)
unsent_room_ids = list(missing_event_map_by_room)

new_connection_state.rooms.record_unsent_rooms(
unsent_room_ids, from_token.stream_token.room_key
Expand Down
42 changes: 42 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,48 @@ def get_rooms_that_changed(
if self._events_stream_cache.has_entity_changed(room_id, from_id)
}

async def get_rooms_that_have_updates_since_sliding_sync_table(
self,
room_ids: StrCollection,
from_key: RoomStreamToken,
) -> StrCollection:
"""Return the rooms that probably have had updates since the given
token (changes that are > `from_key`)."""
# If the stream change cache is valid for the stream token, we can just
# use the result of that.
if from_key.stream >= self._events_stream_cache.get_earliest_known_position():
return self._events_stream_cache.get_entities_changed(
room_ids, from_key.stream
)
Comment on lines +764 to +766
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reference, the logic in get_entities_changed(...) was fixed in #17767 to abide by the inclusiveness of the token range that we expect (> from_token and <= to_token)


def get_rooms_that_have_updates_since_sliding_sync_table_txn(
txn: LoggingTransaction,
) -> StrCollection:
sql = """
SELECT room_id
FROM sliding_sync_joined_rooms
WHERE {clause}
AND event_stream_ordering > ?
"""

results: Set[str] = set()
for batch in batch_iter(room_ids, 1000):
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", batch
)

args.append(from_key.stream)
txn.execute(sql.format(clause=clause), args)

results.update(row[0] for row in txn)

return results

return await self.db_pool.runInteraction(
"get_rooms_that_have_updates_since_sliding_sync_table",
get_rooms_that_have_updates_since_sliding_sync_table_txn,
)

async def paginate_room_events_by_stream_ordering(
self,
*,
Expand Down
Loading