From 75e464ba39d56c06960c7454e643490a844e5378 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 31 Jul 2024 15:16:49 -0500
Subject: [PATCH 01/17] Working in Sliding sync

---
 synapse/handlers/sliding_sync.py          |  39 ++++--
 synapse/storage/databases/main/stream.py  | 111 +++++++++++-------
 .../sliding_sync/test_rooms_timeline.py   | 111 +++++++++++++++++-
 3 files changed, 201 insertions(+), 60 deletions(-)

diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 84677665182..6d8ced0e1f9 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -1855,19 +1855,34 @@ async def get_room_sync_data(
             room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
         )
 
-        timeline_events, new_room_key = await self.store.paginate_room_events(
-            room_id=room_id,
-            # The bounds are reversed so we can paginate backwards
-            # (from newer to older events) starting at to_bound.
-            # This ensures we fill the `limit` with the newest events first,
-            from_key=to_bound,
-            to_key=from_bound,
-            direction=Direction.BACKWARDS,
-            # We add one so we can determine if there are enough events to saturate
-            # the limit or not (see `limited`)
-            limit=room_sync_config.timeline_limit + 1,
-            event_filter=None,
+        timeline_events, new_room_key = (
+            await self.store.get_room_events_stream_for_room(
+                room_id=room_id,
+                # The bounds are reversed so we can paginate backwards
+                # (from newer to older events) starting at to_bound.
+                # This ensures we fill the `limit` with the newest events first,
+                from_key=to_bound,
+                to_key=from_bound,
+                direction=Direction.BACKWARDS,
+                # We add one so we can determine if there are enough events to saturate
+                # the limit or not (see `limited`)
+                limit=room_sync_config.timeline_limit + 1,
+            )
         )
+        # timeline_events, new_room_key = await self.store.paginate_room_events(
+        #     room_id=room_id,
+        #     # The bounds are reversed so we can paginate backwards
+        #     # (from newer to older events) starting at to_bound.
+        #     # This ensures we fill the `limit` with the newest events first,
+        #     from_key=to_bound,
+        #     to_key=from_bound,
+        #     direction=Direction.BACKWARDS,
+        #     # We add one so we can determine if there are enough events to saturate
+        #     # the limit or not (see `limited`)
+        #     limit=room_sync_config.timeline_limit + 1,
+        #     event_filter=None,
+        # )
+        logger.info("timeline_events %s", timeline_events)
 
         # We want to return the events in ascending order (the last event is the
         # most recent).
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 4207e73c7f9..7c276f1ab2b 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -59,7 +59,7 @@
 import attr
 from immutabledict import immutabledict
-from typing_extensions import Literal
+from typing_extensions import Literal, assert_never
 
 from twisted.internet import defer
 
@@ -447,7 +447,6 @@ def _filter_results_by_stream(
     The `instance_name` arg is optional to handle historic rows, and is
     interpreted as if it was "master".
     """
-
     if instance_name is None:
         instance_name = "master"
 
@@ -731,65 +730,87 @@ async def get_room_events_stream_for_room(
         self,
         room_id: str,
         from_key: RoomStreamToken,
-        to_key: RoomStreamToken,
+        to_key: Optional[RoomStreamToken] = None,
         limit: int = 0,
-        order: str = "DESC",
+        direction: Direction = Direction.BACKWARDS,
     ) -> Tuple[List[EventBase], RoomStreamToken]:
         """Get new room events in stream ordering since `from_key`.
 
+        When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
+        When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
+
         Args:
             room_id
-            from_key: Token from which no events are returned before
-            to_key: Token from which no events are returned after. (This
-                is typically the current stream token)
+            from_key: The token to stream from (starting point and heading in the given direction)
+            to_key: The token representing the end stream position (end point)
             limit: Maximum number of events to return
-            order: Either "DESC" or "ASC". Determines which events are
-                returned when the result is limited. If "DESC" then the most
-                recent `limit` events are returned, otherwise returns the
-                oldest `limit` events.
+            direction: Indicates whether we are paginating forwards or backwards
+                from `from_key`.
 
         Returns:
-            The list of events (in ascending stream order) and the token from the start
-            of the chunk of events returned.
+            TODO
         """
+        # We should only be working with `stream_ordering` tokens here
+        assert from_key is None or from_key.topological is None
+        assert to_key is None or to_key.topological is None
+
         if from_key == to_key:
             return [], from_key
 
-        has_changed = self._events_stream_cache.has_entity_changed(
-            room_id, from_key.stream
-        )
+        has_changed = True
+        if direction == Direction.FORWARDS:
+            has_changed = self._events_stream_cache.has_entity_changed(
+                room_id, from_key.stream
+            )
+        elif direction == Direction.BACKWARDS:
+            if to_key is not None:
+                has_changed = self._events_stream_cache.has_entity_changed(
+                    room_id, to_key.stream
+                )
+        else:
+            assert_never(direction)
 
         if not has_changed:
             return [], from_key
 
-        def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
-            # To handle tokens with a non-empty instance_map we fetch more
-            # results than necessary and then filter down
-            min_from_id = from_key.stream
-            max_to_id = to_key.get_max_stream_pos()
+        order, from_bound, to_bound = generate_pagination_bounds(
+            direction, from_key, to_key
        )
 
-            sql = """
-                SELECT event_id, instance_name, topological_ordering, stream_ordering
+        bounds = generate_pagination_where_clause(
+            direction=direction,
+            column_names=(None, "stream_ordering"),
+            from_token=from_bound,
+            to_token=to_bound,
+            engine=self.database_engine,
+        )
+
+        def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
+            sql = f"""
+                SELECT event_id, instance_name, stream_ordering
                 FROM events
                 WHERE
                     room_id = ?
                     AND not outlier
-                    AND stream_ordering > ? AND stream_ordering <= ?
-                ORDER BY stream_ordering %s LIMIT ?
-            """ % (
-                order,
-            )
-            txn.execute(sql, (room_id, min_from_id, max_to_id, 2 * limit))
+                    AND {bounds}
+                ORDER BY stream_ordering {order} LIMIT ?
+            """
+            txn.execute(sql, (room_id, 2 * limit))
+
+            logger.info("asdf sql %s", sql)
 
             rows = [
                 _EventDictReturn(event_id, None, stream_ordering)
-                for event_id, instance_name, topological_ordering, stream_ordering in txn
-                if _filter_results(
-                    from_key,
-                    to_key,
-                    instance_name,
-                    topological_ordering,
-                    stream_ordering,
+                for event_id, instance_name, stream_ordering in txn
+                if _filter_results_by_stream(
+                    lower_token=(
+                        to_key if direction == Direction.BACKWARDS else from_key
+                    ),
+                    upper_token=(
+                        from_key if direction == Direction.BACKWARDS else to_key
+                    ),
+                    instance_name=instance_name,
+                    stream_ordering=stream_ordering,
                 )
             ][:limit]
             return rows
@@ -800,17 +821,17 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
             [r.event_id for r in rows], get_prev_content=True
         )
 
-        if order.lower() == "desc":
-            ret.reverse()
-
         if rows:
-            key = RoomStreamToken(stream=min(r.stream_ordering for r in rows))
+            next_key = generate_next_token(
+                direction=direction,
+                last_topo_ordering=None,
+                last_stream_ordering=rows[-1].stream_ordering,
+            )
         else:
-            # Assume we didn't get anything because there was nothing to
-            # get.
-            key = from_key
+            # TODO (erikj): We should work out what to do here instead.
+            next_key = to_key if to_key else from_key
 
-        return ret, key
+        return ret, next_key
 
     async def get_current_state_delta_membership_changes_for_user(
         self,
@@ -1908,7 +1929,7 @@ def _paginate_room_events_txn(
             "bounds": bounds,
             "order": order,
         }
-
+        logger.info("asdf sql %s", sql)
         txn.execute(sql, args)
 
         # Filter the result set.
diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py
index 84a1e0d223a..3b193cc7c9a 100644
--- a/tests/rest/client/sliding_sync/test_rooms_timeline.py
+++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py
@@ -13,12 +13,13 @@
 #
 import logging
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
 from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
 from synapse.rest.client import login, room, sync
 from synapse.server import HomeServer
-from synapse.types import StreamToken
+from synapse.types import StreamToken, StrSequence
 from synapse.util import Clock
 
 from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
@@ -42,6 +43,82 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
         self.storage_controllers = hs.get_storage_controllers()
 
+    def _assertListEqual(
+        self,
+        actual_items: StrSequence,
+        expected_items: StrSequence,
+        message: Optional[str] = None,
+    ):
+        """
+        Like `self.assertListEqual(...)` but with an actually understandable diff message.
+        """
+
+        if actual_items == expected_items:
+            return
+
+        expected_lines: List[str] = []
+        for expected_item in expected_items:
+            is_expected_in_actual = expected_item in actual_items
+            expected_lines.append(
+                "{} {}".format(" " if is_expected_in_actual else "?", expected_item)
+            )
+
+        actual_lines: List[str] = []
+        for actual_item in actual_items:
+            is_actual_in_expected = actual_item in expected_items
+            actual_lines.append(
+                "{} {}".format("+" if is_actual_in_expected else " ", actual_item)
+            )
+
+        newline = "\n"
+        expected_string = f"Expected items to be in actual ('?' = missing expected items):\n [\n{newline.join(expected_lines)}\n ]"
+        actual_string = f"Actual ('+' = found expected items):\n [\n{newline.join(actual_lines)}\n ]"
+        first_message = "Items must"
+        diff_message = f"{first_message}\n{expected_string}\n{actual_string}"
+
+        self.fail(f"{diff_message}\n{message}")
+
+    def _assertTimelineEqual(
+        self,
+        *,
+        room_id: str,
+        actual_event_ids: StrSequence,
+        expected_event_ids: StrSequence,
+        message: Optional[str] = None,
+    ):
+        """
+        Like `self.assertListEqual(...)` for event IDs in a room but will give a nicer
+        output with context for what each event_id is (type, stream_ordering, content,
+        etc).
+        """
+        if actual_event_ids == expected_event_ids:
+            return
+
+        event_id_set = set(actual_event_ids + expected_event_ids)
+        events = self.get_success(self.store.get_events(event_id_set))
+
+        def event_id_to_string(event_id: str) -> str:
+            event = events.get(event_id)
+            if event:
+                state_key = event.get_state_key()
+                state_key_piece = f", {state_key}" if state_key is not None else ""
+                return (
+                    f"({event.internal_metadata.stream_ordering: >2}, {event.internal_metadata.instance_name}) "
+                    + f"{event.event_id} ({event.type}{state_key_piece}) {event.content.get('membership', '')}{event.content.get('body', '')}"
+                )
+
+            return f"{event_id} "
+
+        self._assertListEqual(
+            actual_items=[
+                event_id_to_string(event_id) for event_id in actual_event_ids
+            ],
+            expected_items=[
+                event_id_to_string(event_id) for event_id in expected_event_ids
+            ],
+            message=message,
+        )
+
     def test_rooms_limited_initial_sync(self) -> None:
         """
         Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`
@@ -85,18 +162,46 @@ def test_rooms_limited_initial_sync(self) -> None:
             response_body["rooms"][room_id1],
         )
         # Check to make sure the latest events are returned
-        self.assertEqual(
+        logger.info(
+            "list a %s",
             [
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
+        )
+        logger.info(
+            "list b %s",
             [
                 event_response4["event_id"],
                 event_response5["event_id"],
                 user1_join_response["event_id"],
             ],
-            response_body["rooms"][room_id1]["timeline"],
         )
+        self._assertTimelineEqual(
+            room_id=room_id1,
+            actual_event_ids=[
+                event["event_id"]
+                for event in response_body["rooms"][room_id1]["timeline"]
+            ],
+            expected_event_ids=[
+                event_response4["event_id"],
+                event_response5["event_id"],
+                user1_join_response["event_id"],
+            ],
+            message=str(response_body["rooms"][room_id1]["timeline"]),
+        )
+        # self.assertListEqual(
+        #     [
+        #         event["event_id"]
+        #         for event in response_body["rooms"][room_id1]["timeline"]
+        #     ],
+        #     [
+        #         event_response4["event_id"],
+        #         event_response5["event_id"],
+        #         user1_join_response["event_id"],
+        #     ],
+        #     response_body["rooms"][room_id1]["timeline"],
        # )
 
         # Check to make sure the `prev_batch` points at the right place
         prev_batch_token = self.get_success(

From c5966e5493bd6ef7f6794b1e230118a720c7cfa3 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 31 Jul 2024 15:37:31 -0500
Subject: [PATCH 02/17] Refine function

---
 synapse/handlers/sliding_sync.py          | 14 ---
 synapse/storage/databases/main/stream.py  | 92 +++++++++++--------
 .../sliding_sync/test_rooms_timeline.py   | 10 +-
 3 files changed, 61 insertions(+), 55 deletions(-)

diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 6d8ced0e1f9..7ea4cbd0996 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -1869,20 +1869,6 @@ async def get_room_sync_data(
                 limit=room_sync_config.timeline_limit + 1,
             )
         )
-
-        # timeline_events, new_room_key = await self.store.paginate_room_events(
-        #     room_id=room_id,
-        #     # The bounds are reversed so we can paginate backwards
-        #     # (from newer to older events) starting at to_bound.
-        #     # This ensures we fill the `limit` with the newest events first,
-        #     from_key=to_bound,
-        #     to_key=from_bound,
-        #     direction=Direction.BACKWARDS,
-        #     # We add one so we can determine if there are enough events to saturate
-        #     # the limit or not (see `limited`)
-        #     limit=room_sync_config.timeline_limit + 1,
-        #     event_filter=None,
-        # )
-        logger.info("timeline_events %s", timeline_events)
 
         # We want to return the events in ascending order (the last event is the
         # most recent).
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 7c276f1ab2b..7f8945f4773 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -280,7 +280,7 @@ def generate_pagination_bounds(
 
 
 def generate_next_token(
-    direction: Direction, last_topo_ordering: int, last_stream_ordering: int
+    direction: Direction, last_topo_ordering: Optional[int], last_stream_ordering: int
 ) -> RoomStreamToken:
     """
     Generate the next room stream token based on the currently returned data.
@@ -734,10 +734,9 @@ async def get_room_events_stream_for_room(
         limit: int = 0,
         direction: Direction = Direction.BACKWARDS,
     ) -> Tuple[List[EventBase], RoomStreamToken]:
-        """Get new room events in stream ordering since `from_key`.
-
-        When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
-        When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
+        """
+        Paginate events by `stream_ordering` in the room from the `from_key` in the
+        given `direction` to the `to_key` or `limit`.
 
         Args:
             room_id
@@ -748,15 +747,39 @@ async def get_room_events_stream_for_room(
                 from `from_key`.
 
         Returns:
-            TODO
+            The results as a list of events and a token that points to the end
+            of the result set. If no events are returned then the end of the
+            stream has been reached (i.e. there are no events between `from_key`
+            and `to_key`).
+
+        When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
+        When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
         """
         # We should only be working with `stream_ordering` tokens here
         assert from_key is None or from_key.topological is None
         assert to_key is None or to_key.topological is None
 
-        if from_key == to_key:
-            return [], from_key
+        # We can bail early if we're looking forwards, and our `to_key` is already
+        # before our `from_key`.
+        if (
+            direction == Direction.FORWARDS
+            and to_key is not None
+            and to_key.is_before_or_eq(from_key)
+        ):
+            # Token selection matches what we do below if there are no rows
+            return [], to_key if to_key else from_key
+        # Or vice-versa, if we're looking backwards and our `from_key` is already before
+        # our `to_key`.
+        elif (
+            direction == Direction.BACKWARDS
+            and to_key is not None
+            and from_key.is_before_or_eq(to_key)
+        ):
+            # Token selection matches what we do below if there are no rows
+            return [], to_key if to_key else from_key
 
+        # We can do a quick sanity check to see if any events have been sent in the room
+        # since the earlier token.
         has_changed = True
         if direction == Direction.FORWARDS:
             has_changed = self._events_stream_cache.has_entity_changed(
                 room_id, from_key.stream
             )
         elif direction == Direction.BACKWARDS:
             if to_key is not None:
                 has_changed = self._events_stream_cache.has_entity_changed(
                     room_id, to_key.stream
                 )
         else:
             assert_never(direction)
 
         if not has_changed:
-            return [], from_key
+            # Token selection matches what we do below if there are no rows
+            return [], to_key if to_key else from_key
 
         order, from_bound, to_bound = generate_pagination_bounds(
             direction, from_key, to_key
@@ -779,7 +803,9 @@ async def get_room_events_stream_for_room(
 
         bounds = generate_pagination_where_clause(
             direction=direction,
-            column_names=(None, "stream_ordering"),
+            # The empty string will shortcut downstream code to only use the
+            # `stream_ordering` column
+            column_names=("", "stream_ordering"),
             from_token=from_bound,
             to_token=to_bound,
             engine=self.database_engine,
 
         def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
@@ -797,8 +823,6 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
             """
             txn.execute(sql, (room_id, 2 * limit))
 
-            logger.info("asdf sql %s", sql)
-
             rows = [
                 _EventDictReturn(event_id, None, stream_ordering)
                 for event_id, instance_name, stream_ordering in txn
@@ -828,7 +852,8 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
                 last_stream_ordering=rows[-1].stream_ordering,
             )
         else:
-            # TODO (erikj): We should work out what to do here instead.
+            # TODO (erikj): We should work out what to do here instead. (same as
+            # `_paginate_room_events_txn(...)`)
             next_key = to_key if to_key else from_key
 
         return ret, next_key
@@ -1845,6 +1870,24 @@ def _paginate_room_events_txn(
             been reached (i.e. there are no events between `from_token` and
             `to_token`), or `limit` is zero.
         """
+        # We can bail early if we're looking forwards, and our `to_key` is already
+        # before our `from_token`.
+        if (
+            direction == Direction.FORWARDS
+            and to_token is not None
+            and to_token.is_before_or_eq(from_token)
+        ):
+            # Token selection matches what we do below if there are no rows
+            return [], to_token if to_token else from_token
+        # Or vice-versa, if we're looking backwards and our `from_token` is already before
+        # our `to_token`.
+        elif (
+            direction == Direction.BACKWARDS
+            and to_token is not None
+            and from_token.is_before_or_eq(to_token)
+        ):
+            # Token selection matches what we do below if there are no rows
+            return [], to_token if to_token else from_token
 
         args: List[Any] = [room_id]
 
@@ -1929,7 +1972,6 @@ def _paginate_room_events_txn(
             "bounds": bounds,
             "order": order,
         }
-        logger.info("asdf sql %s", sql)
         txn.execute(sql, args)
 
         # Filter the result set.
@@ -1990,28 +2032,6 @@ async def paginate_room_events(
             stream has been reached (i.e. there are no events between `from_key`
             and `to_key`).
         """
-
-        # We can bail early if we're looking forwards, and our `to_key` is already
-        # before our `from_key`.
-        if (
-            direction == Direction.FORWARDS
-            and to_key is not None
-            and to_key.is_before_or_eq(from_key)
-        ):
-            # Token selection matches what we do in `_paginate_room_events_txn` if there
-            # are no rows
-            return [], to_key if to_key else from_key
-        # Or vice-versa, if we're looking backwards and our `from_key` is already before
-        # our `to_key`.
-        elif (
-            direction == Direction.BACKWARDS
-            and to_key is not None
-            and from_key.is_before_or_eq(to_key)
-        ):
-            # Token selection matches what we do in `_paginate_room_events_txn` if there
-            # are no rows
-            return [], to_key if to_key else from_key
-
         rows, token = await self.db_pool.runInteraction(
             "paginate_room_events",
             self._paginate_room_events_txn,
diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py
index 3b193cc7c9a..5b09773995e 100644
--- a/tests/rest/client/sliding_sync/test_rooms_timeline.py
+++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py
@@ -12,8 +12,8 @@
 # <https://www.gnu.org/licenses/agpl-3.0.html>.
 #
 import logging
+from typing import List, Optional
 
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
@@ -48,7 +48,7 @@ def _assertListEqual(
         actual_items: StrSequence,
         expected_items: StrSequence,
         message: Optional[str] = None,
-    ):
+    ) -> None:
         """
         Like `self.assertListEqual(...)` but with an actually understandable diff message.
         """
@@ -82,10 +82,10 @@ def _assertTimelineEqual(
         self,
         *,
         room_id: str,
-        actual_event_ids: StrSequence,
-        expected_event_ids: StrSequence,
+        actual_event_ids: List[str],
+        expected_event_ids: List[str],
         message: Optional[str] = None,
-    ):
+    ) -> None:
         """
         Like `self.assertListEqual(...)` for event IDs in a room but will give a nicer
         output with context for what each event_id is (type, stream_ordering, content,

From 3423f83f39184fecf4269441a3610662e66b2936 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 31 Jul 2024 15:46:22 -0500
Subject: [PATCH 03/17] Adapt the other places

---
 synapse/handlers/room.py                  |  2 +-
 synapse/handlers/sync.py                  | 18 ++++++++++++++----
 synapse/storage/databases/main/stream.py  | 22 ++++++++++++----------
 3 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 262d9f40449..2c6e672ede8 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1750,7 +1750,7 @@ async def get_new_events(
             from_key=from_key,
             to_key=to_key,
             limit=limit or 10,
-            order="ASC",
+            direction=Direction.FORWARDS,
         )
 
         events = list(room_events)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ede014180c0..aa8040ea317 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -43,6 +43,7 @@
 
 from synapse.api.constants import (
     AccountDataTypes,
+    Direction,
     EventContentFields,
     EventTypes,
     JoinRules,
@@ -888,10 +889,15 @@ async def _load_filtered_recents(
                 events, end_key = await self.store.get_room_events_stream_for_room(
                     room_id,
                     limit=load_limit + 1,
-                    from_key=since_key,
-                    to_key=end_key,
+                    from_key=end_key,
+                    to_key=since_key,
+                    direction=Direction.BACKWARDS,
                 )
+                # We want to return the events in ascending order (the last event is the
+                # most recent).
+                events = events.reverse()
             else:
+                # TODO: This should return events in `stream_ordering` order
                 events, end_key = await self.store.get_recent_events_for_room(
                     room_id, limit=load_limit + 1, end_token=end_key
                 )
@@ -2641,9 +2647,10 @@ async def _get_room_changes_for_incremental_sync(
             # a "gap" in the timeline, as described by the spec for /sync.
             room_to_events = await self.store.get_room_events_stream_for_rooms(
                 room_ids=sync_result_builder.joined_room_ids,
-                from_key=since_token.room_key,
-                to_key=now_token.room_key,
+                from_key=now_token.room_key,
+                to_key=since_token.room_key,
                 limit=timeline_limit + 1,
+                direction=Direction.BACKWARDS,
             )
 
         # We loop through all room ids, even if there are no new events, in case
@@ -2654,6 +2661,9 @@ async def _get_room_changes_for_incremental_sync(
             newly_joined = room_id in newly_joined_rooms
             if room_entry:
                 events, start_key = room_entry
+                # We want to return the events in ascending order (the last event is the
+                # most recent).
+                events = events.reverse()
 
                 prev_batch_token = now_token.copy_and_replace(
                     StreamKeyType.ROOM, start_key
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 7f8945f4773..14f71af39e4 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -663,25 +663,26 @@ async def get_room_events_stream_for_rooms(
         from_key: RoomStreamToken,
         to_key: RoomStreamToken,
         limit: int = 0,
-        order: str = "DESC",
+        direction: Direction = Direction.BACKWARDS,
     ) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]:
         """Get new room events in stream ordering since `from_key`.
 
         Args:
             room_ids
-            from_key: Token from which no events are returned before
-            to_key: Token from which no events are returned after. (This
-                is typically the current stream token)
+            from_key: The token to stream from (starting point and heading in the given
+                direction)
+            to_key: The token representing the end stream position (end point)
             limit: Maximum number of events to return
-            order: Either "DESC" or "ASC". Determines which events are
-                returned when the result is limited. If "DESC" then the most
-                recent `limit` events are returned, otherwise returns the
-                oldest `limit` events.
+            direction: Indicates whether we are paginating forwards or backwards
+                from `from_key`.
 
         Returns:
             A map from room id to a tuple containing:
                 - list of recent events in the room
                 - stream ordering key for the start of the chunk of events returned.
+
+        When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
+        When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
         """
         room_ids = self._events_stream_cache.get_entities_changed(
             room_ids, from_key.stream
@@ -702,7 +703,7 @@ async def get_room_events_stream_for_rooms(
                     from_key,
                     to_key,
                     limit,
-                    order=order,
+                    direction=direction,
                 )
                 for room_id in rm_ids
             ],
@@ -740,7 +741,8 @@ async def get_room_events_stream_for_room(
 
         Args:
             room_id
-            from_key: The token to stream from (starting point and heading in the given direction)
+            from_key: The token to stream from (starting point and heading in the given
+                direction)
             to_key: The token representing the end stream position (end point)
             limit: Maximum number of events to return
             direction: Indicates whether we are paginating forwards or backwards

From f781e5b3bc8e9c021b439d49cc9329171b95c490 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 31 Jul 2024 15:49:02 -0500
Subject: [PATCH 04/17] Update comment to reflect what we're doing

---
 synapse/handlers/sync.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index aa8040ea317..37861e3f122 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -884,7 +884,7 @@ async def _load_filtered_recents(
             # that have happened since `since_key` up to `end_key`, so we
             # can just use `get_room_events_stream_for_room`.
             # Otherwise, we want to return the last N events in the room
-            # in topological ordering.
+            # in `stream_ordering`.
             if since_key:

From 4aee1ab2fd8babda0161daaaf6f8709ba6e6f030 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 31 Jul 2024 15:52:58 -0500
Subject: [PATCH 05/17] Use `_assertTimelineEqual(...)` on all of the timeline
 tests

---
 .../sliding_sync/test_rooms_timeline.py   | 63 ++++++-------------
 1 file changed, 20 insertions(+), 43 deletions(-)

diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py
index 5b09773995e..2e9586ca733 100644
--- a/tests/rest/client/sliding_sync/test_rooms_timeline.py
+++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py
@@ -162,21 +162,6 @@ def test_rooms_limited_initial_sync(self) -> None:
             response_body["rooms"][room_id1],
         )
         # Check to make sure the latest events are returned
-        logger.info(
-            "list a %s",
-            [
-                event["event_id"]
-                for event in response_body["rooms"][room_id1]["timeline"]
-            ],
-        )
-        logger.info(
-            "list b %s",
-            [
-                event_response4["event_id"],
-                event_response5["event_id"],
-                user1_join_response["event_id"],
-            ],
-        )
         self._assertTimelineEqual(
             room_id=room_id1,
             actual_event_ids=[
@@ -332,16 +305,17 @@ def test_rooms_incremental_sync(self) -> None:
             + str(response_body["rooms"][room_id1]),
         )
         # Check to make sure the latest events are returned
-        self.assertEqual(
-            [
+        self._assertTimelineEqual(
+            room_id=room_id1,
+            actual_event_ids=[
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            [
+            expected_event_ids=[
                 event_response2["event_id"],
                 event_response3["event_id"],
             ],
-            response_body["rooms"][room_id1]["timeline"],
+            message=str(response_body["rooms"][room_id1]["timeline"]),
         )
 
         # All events are "live"
@@ -408,18 +382,19 @@ def test_rooms_newly_joined_incremental_sync(self) -> None:
             + str(response_body["rooms"][room_id1]),
         )
         # Check to make sure that the "live" and historical events are returned
-        self.assertEqual(
-            [
+        self._assertTimelineEqual(
+            room_id=room_id1,
+            actual_event_ids=[
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            [
+            expected_event_ids=[
                 event_response2["event_id"],
                 user1_join_response["event_id"],
                 event_response3["event_id"],
                 event_response4["event_id"],
             ],
-            response_body["rooms"][room_id1]["timeline"],
+            message=str(response_body["rooms"][room_id1]["timeline"]),
         )
 
         # Only events after the `from_token` are "live" (join, event3, event4)
@@ -466,17 +441,18 @@ def test_rooms_ban_initial_sync(self) -> None:
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # We should see events before the ban but not after
-        self.assertEqual(
-            [
+        self._assertTimelineEqual(
+            room_id=room_id1,
+            actual_event_ids=[
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            [
+            expected_event_ids=[
                 event_response3["event_id"],
                 event_response4["event_id"],
                 user1_ban_response["event_id"],
             ],
-            response_body["rooms"][room_id1]["timeline"],
+            message=str(response_body["rooms"][room_id1]["timeline"]),
         )
         # No "live" events in an initial sync (no `from_token` to define the "live"
         # range)
@@ -533,17 +509,18 @@ def test_rooms_ban_incremental_sync1(self) -> None:
         response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # We should see events before the ban but not after
-        self.assertEqual(
-            [
+        self._assertTimelineEqual(
+            room_id=room_id1,
+            actual_event_ids=[
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            [
+            expected_event_ids=[
                 event_response3["event_id"],
                 event_response4["event_id"],
                 user1_ban_response["event_id"],
             ],
-            response_body["rooms"][room_id1]["timeline"],
+            message=str(response_body["rooms"][room_id1]["timeline"]),
         )
         # All live events in the incremental sync
         self.assertEqual(

From cba4664c620bbdf321a6dc55ad99be4d38c47ea2 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 31 Jul 2024 15:55:18 -0500
Subject: [PATCH 06/17] Add changelog

---
 changelog.d/17510.bugfix | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/17510.bugfix

diff --git a/changelog.d/17510.bugfix b/changelog.d/17510.bugfix
new file mode 100644
index 00000000000..3170c284bd5
--- /dev/null
+++ b/changelog.d/17510.bugfix
@@ -0,0 +1 @@
+Fix timeline ordering (using `stream_ordering` instead of topological ordering) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

From d6dd34f480596d72e2c4f424cf4f2b379ea9c516 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 31 Jul 2024 16:11:17 -0500
Subject: [PATCH 07/17] Fix lints

---
 synapse/handlers/sync.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 37861e3f122..f1012dbbb6e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -895,7 +895,7 @@ async def _load_filtered_recents(
                 )
                 # We want to return the events in ascending order (the last event is the
                 # most recent).
-                events = events.reverse()
+                events.reverse()
             else:
                 # TODO: This should return events in `stream_ordering` order
                 events, end_key = await self.store.get_recent_events_for_room(
@@ -2663,7 +2663,7 @@ async def _get_room_changes_for_incremental_sync(
                 events, start_key = room_entry
                 # We want to return the events in ascending order (the last event is the
                 # most recent).
-                events = events.reverse()
+                events.reverse()
 
                 prev_batch_token = now_token.copy_and_replace(
                     StreamKeyType.ROOM, start_key

From 3075a15a8e8d4ecff87ec8830fa9b33b2718bc98 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 31 Jul 2024 16:20:49 -0500
Subject: [PATCH 08/17] Revert test changes in favor of separate PR

See https://github.com/element-hq/synapse/pull/17511
---
 .../sliding_sync/test_rooms_timeline.py   | 124 +++---------------
 1 file changed, 21 insertions(+), 103 deletions(-)

diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py
index 2e9586ca733..84a1e0d223a 100644
--- a/tests/rest/client/sliding_sync/test_rooms_timeline.py
+++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py
@@ -12,8 +12,8 @@
 # <https://www.gnu.org/licenses/agpl-3.0.html>.
 #
 import logging
-from typing import List, Optional
 
 from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
 from synapse.rest.client import login, room, sync
 from synapse.server import HomeServer
-from synapse.types import StreamToken, StrSequence
+from synapse.types import StreamToken
 from synapse.util import Clock
 
 from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
@@ -43,82 +42,6 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
         self.storage_controllers = hs.get_storage_controllers()
 
-    def _assertListEqual(
-        self,
-        actual_items: StrSequence,
-        expected_items: StrSequence,
-        message: Optional[str] = None,
-    ) -> None:
-        """
-        Like `self.assertListEqual(...)` but with an actually understandable diff message.
-        """
-
-        if actual_items == expected_items:
-            return
-
-        expected_lines: List[str] = []
-        for expected_item in expected_items:
-            is_expected_in_actual = expected_item in actual_items
-            expected_lines.append(
-                "{} {}".format(" " if is_expected_in_actual else "?", expected_item)
-            )
-
-        actual_lines: List[str] = []
-        for actual_item in actual_items:
-            is_actual_in_expected = actual_item in expected_items
-            actual_lines.append(
-                "{} {}".format("+" if is_actual_in_expected else " ", actual_item)
-            )
-
-        newline = "\n"
-        expected_string = f"Expected items to be in actual ('?' = missing expected items):\n [\n{newline.join(expected_lines)}\n ]"
-        actual_string = f"Actual ('+' = found expected items):\n [\n{newline.join(actual_lines)}\n ]"
-        first_message = "Items must"
-        diff_message = f"{first_message}\n{expected_string}\n{actual_string}"
-
-        self.fail(f"{diff_message}\n{message}")
-
-    def _assertTimelineEqual(
-        self,
-        *,
-        room_id: str,
-        actual_event_ids: List[str],
-        expected_event_ids: List[str],
-        message: Optional[str] = None,
-    ) -> None:
-        """
-        Like `self.assertListEqual(...)` for event IDs in a room but will give a nicer
-        output with context for what each event_id is (type, stream_ordering, content,
-        etc).
-        """
-        if actual_event_ids == expected_event_ids:
-            return
-
-        event_id_set = set(actual_event_ids + expected_event_ids)
-        events = self.get_success(self.store.get_events(event_id_set))
-
-        def event_id_to_string(event_id: str) -> str:
-            event = events.get(event_id)
-            if event:
-                state_key = event.get_state_key()
-                state_key_piece = f", {state_key}" if state_key is not None else ""
-                return (
-                    f"({event.internal_metadata.stream_ordering: >2}, {event.internal_metadata.instance_name}) "
-                    + f"{event.event_id} ({event.type}{state_key_piece}) {event.content.get('membership', '')}{event.content.get('body', '')}"
-                )
-
-            return f"{event_id} "
-
-        self._assertListEqual(
-            actual_items=[
-                event_id_to_string(event_id) for event_id in actual_event_ids
-            ],
-            expected_items=[
-                event_id_to_string(event_id) for event_id in expected_event_ids
-            ],
-            message=message,
-        )
-
     def test_rooms_limited_initial_sync(self) -> None:
         """
         Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`
@@ -162,46 +85,18 @@ def test_rooms_limited_initial_sync(self) -> None:
             response_body["rooms"][room_id1],
         )
         # Check to make sure the latest events are returned
-        self._assertTimelineEqual(
-            room_id=room_id1,
-            actual_event_ids=[
+        self.assertEqual(
+            [
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            expected_event_ids=[
+            [
                 event_response4["event_id"],
                 event_response5["event_id"],
                 user1_join_response["event_id"],
             ],
-            message=str(response_body["rooms"][room_id1]["timeline"]),
+            response_body["rooms"][room_id1]["timeline"],
         )
 
         # Check to make sure the `prev_batch` points at the right place
         prev_batch_token = self.get_success(
@@ -332,17 +227,16 @@ def test_rooms_incremental_sync(self) -> None:
             + str(response_body["rooms"][room_id1]),
         )
         # Check to make sure the latest events are returned
-        self._assertTimelineEqual(
-            room_id=room_id1,
-            actual_event_ids=[
+        self.assertEqual(
+            [
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            expected_event_ids=[
+            [
                 event_response2["event_id"],
                 event_response3["event_id"],
             ],
-            message=str(response_body["rooms"][room_id1]["timeline"]),
+            response_body["rooms"][room_id1]["timeline"],
        )
 
         # All events are "live"
@@ -408,19 +303,18 @@ def test_rooms_newly_joined_incremental_sync(self) -> None:
             + str(response_body["rooms"][room_id1]),
         )
         # Check to make sure that the "live" and historical events are returned
-        self._assertTimelineEqual(
-            room_id=room_id1,
-            actual_event_ids=[
+        self.assertEqual(
+            [
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            expected_event_ids=[
+            [
                 event_response2["event_id"],
                 user1_join_response["event_id"],
                 event_response3["event_id"],
                 event_response4["event_id"],
             ],
-            message=str(response_body["rooms"][room_id1]["timeline"]),
+            response_body["rooms"][room_id1]["timeline"],
        )
 
         # Only events after the `from_token` are "live" (join, event3, event4)
@@ -466,18 +361,17 @@ def test_rooms_ban_initial_sync(self) -> None:
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # We should see events before the ban but not after
-        self._assertTimelineEqual(
-            room_id=room_id1,
-            actual_event_ids=[
+        self.assertEqual(
+            [
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            expected_event_ids=[
+            [
                 event_response3["event_id"],
                 event_response4["event_id"],
                 user1_ban_response["event_id"],
             ],
-            message=str(response_body["rooms"][room_id1]["timeline"]),
+            response_body["rooms"][room_id1]["timeline"],
         )
         # No "live" events in an initial sync (no `from_token` to define the "live"
         # range)
@@ -533,18 +428,17 @@ def test_rooms_ban_incremental_sync1(self) -> None:
         response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # We should see events before the ban but not after
-        self._assertTimelineEqual(
-            room_id=room_id1,
-            actual_event_ids=[
+        self.assertEqual(
+            [
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            expected_event_ids=[
+            [
                 event_response3["event_id"],
                 event_response4["event_id"],
                 user1_ban_response["event_id"],
             ],
-            message=str(response_body["rooms"][room_id1]["timeline"]),
+            response_body["rooms"][room_id1]["timeline"],
         )
         # All live events in the incremental sync
         self.assertEqual(

From d67c9b5e9617d4a7e8e1d5637d6b6c6e96a7897f Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 31 Jul 2024 16:45:56 -0500
Subject: [PATCH 09/17] Fix tests

---
 synapse/storage/databases/main/stream.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 14f71af39e4..34aab0bb740 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -684,9 +684,17 @@ async def get_room_events_stream_for_rooms(
         When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
         When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
         """
-        room_ids = self._events_stream_cache.get_entities_changed(
-            room_ids, from_key.stream
-        )
+        if direction == Direction.FORWARDS:
+            room_ids = self._events_stream_cache.get_entities_changed(
+                room_ids, from_key.stream
+            )
+        elif direction == Direction.BACKWARDS:
+            if to_key is not None:
+                room_ids = self._events_stream_cache.get_entities_changed(
+                    room_ids, to_key.stream
+                )
+        else:
+            assert_never(direction)
 
         if not room_ids:
             return {}

From efcc91503e682ebb3fbeef2d98ad3733c49f8b9f Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Mon, 5 Aug 2024 17:10:02 -0500
Subject: [PATCH 10/17] Update comment to describe when `stream_ordering` vs
 `topological_ordering`

See https://github.com/element-hq/synapse/pull/17510#discussion_r1699105569
---
 docs/development/room-dag-concepts.md     |  6 ++++--
 synapse/handlers/sync.py                  | 19 +++++++++++++------
 synapse/storage/databases/main/stream.py  |  2 +-
 3 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/docs/development/room-dag-concepts.md b/docs/development/room-dag-concepts.md
index 76709487f80..35b667831cb 100644
--- a/docs/development/room-dag-concepts.md
+++ b/docs/development/room-dag-concepts.md
@@ -21,8 +21,10 @@ incrementing integer, but backfilled events start with `stream_ordering=-1` and
 
 ---
 
- - `/sync` returns things in the order they arrive at the server (`stream_ordering`).
- - `/messages` (and `/backfill` in the federation API) return them in the order determined by the event graph `(topological_ordering, stream_ordering)`.
+ - Incremental `/sync?since=xxx` returns things in the order they arrive at the server
+   (`stream_ordering`).
+ - Initial `/sync`, `/messages` (and `/backfill` in the federation API) return them in
+   the order determined by the event graph `(topological_ordering, stream_ordering)`.
 
 The general idea is that, if you're following a room in real-time (i.e.
 `/sync`), you probably want to see the messages as they arrive at your server,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f1012dbbb6e..2262393f6cd 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -880,12 +880,19 @@ async def _load_filtered_recents(
             since_key = since_token.room_key
 
         while limited and len(recents) < timeline_limit and max_repeat:
-            # If we have a since_key then we are trying to get any events
-            # that have happened since `since_key` up to `end_key`, so we
-            # can just use `get_room_events_stream_for_room`.
-            # Otherwise, we want to return the last N events in the room
-            # in `stream_ordering`.
+            # For initial `/sync`, we want to view a historical section of the
+            # timeline; to fetch events by `topological_ordering` (best
+            # representation of the room DAG as others were seeing it at the time).
+            # This also aligns with the order that `/messages` returns events in.
+            #
+            # For incremental `/sync`, we want to get all updates for rooms since
+            # the last `/sync` (regardless if those updates arrived late or happened
+            # a while ago in the past); to fetch events by `stream_ordering` (in the
+            # order they were received by the server).
+            #
+            # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
             if since_key:
+                # Fetch events by `stream_ordering` for incremental sync
                 events, end_key = await self.store.get_room_events_stream_for_room(
                     room_id,
                     limit=load_limit + 1,
@@ -897,7 +904,7 @@ async def _load_filtered_recents(
                 # most recent).
                 events.reverse()
             else:
-                # TODO: This should return events in `stream_ordering` order
+                # Fetch events by `topological_ordering` for initial sync
                 events, end_key = await self.store.get_recent_events_for_room(
                     room_id, limit=load_limit + 1, end_token=end_key
                 )
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 34aab0bb740..2ccdc2ffde3 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -661,7 +661,7 @@ async def get_room_events_stream_for_rooms(
         self,
         room_ids: Collection[str],
         from_key: RoomStreamToken,
-        to_key: RoomStreamToken,
+        to_key: Optional[RoomStreamToken] = None,
         limit: int = 0,
         direction: Direction = Direction.BACKWARDS,
     ) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]:

From 0e12dde39cde51e3e902737e53020e7e4f4106c5 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Mon, 5 Aug 2024 17:57:05 -0500
Subject: [PATCH 11/17] Add `PaginateFunction` type

---
 synapse/handlers/admin.py                 |  6 ++-
 synapse/handlers/sliding_sync.py          | 55 ++++++++++++++++++------
 synapse/handlers/sync.py                  |  2 +-
 synapse/storage/databases/main/stream.py  | 32 ++++++++++----
 4 files changed, 71 insertions(+), 24 deletions(-)

diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index ec35784c5f5..59616a938bc 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -198,7 +198,11 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
         # efficient method perhaps but it does guarantee we get everything.
         while True:
             events, _ = await self._store.paginate_room_events(
-                room_id, from_key, to_key, limit=100, direction=Direction.FORWARDS
+                room_id=room_id,
+                from_key=from_key,
+                to_key=to_key,
+                limit=100,
+                direction=Direction.FORWARDS,
             )
             if not events:
                 break
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 7ea4cbd0996..d442c4aaefb 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -56,7 +56,10 @@
     ROOM_UNKNOWN_SENTINEL,
     Sentinel as StateSentinel,
 )
-from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.databases.main.stream import (
+    CurrentStateDeltaMembership,
+    PaginateFunction,
+)
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     DeviceListUpdates,
@@ -1791,10 +1794,13 @@ async def get_room_sync_data(
         # We should return historical messages (before token range) in the
         # following cases because we want clients to be able to show a basic
         # screen of information:
+        #
         # - Initial sync (because no `from_token` to limit us anyway)
         # - When users `newly_joined`
         # - For an incremental sync where we haven't sent it down this
         #   connection before
+        #
+        # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
         from_bound = None
         initial = True
         if from_token and not room_membership_for_user_at_to_token.newly_joined:
@@ -1855,19 +1861,40 @@ async def get_room_sync_data(
                 room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
             )
 
-            timeline_events, new_room_key = (
-                await self.store.get_room_events_stream_for_room(
-                    room_id=room_id,
-                    # The bounds are reversed so we can paginate backwards
-                    # (from newer to older events) starting at to_bound.
-                    # This ensures we fill the `limit` with the newest events first,
-                    from_key=to_bound,
-                    to_key=from_bound,
-                    direction=Direction.BACKWARDS,
-                    # We add one so we can determine if there are enough events to saturate
-                    # the limit or not (see `limited`)
-                    limit=room_sync_config.timeline_limit + 1,
-                )
+            # For initial `/sync` (and other historical scenarios mentioned above), we
+            # want to view a historical section of the timeline; to fetch events by
+            # `topological_ordering` (best representation of the room DAG as others were
+            # seeing it at the time). This also aligns with the order that `/messages`
+            # returns events in.
+            #
+            # For incremental `/sync`, we want to get all updates for rooms since
+            # the last `/sync` (regardless if those updates arrived late or happened
+            # a while ago in the past); to fetch events by `stream_ordering` (in the
+            # order they were received by the server).
+            #
+            # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+            paginate_room_events: PaginateFunction = self.store.paginate_room_events
+            get_room_events_stream_for_room: PaginateFunction = (
+                self.store.get_room_events_stream_for_room
+            )
+            pagination_method: PaginateFunction = (
+                # Use `topological_ordering` for historical events
+                paginate_room_events
+                if from_bound is None
+                # Use `stream_ordering` for updates
+                else get_room_events_stream_for_room
+            )
+            timeline_events, new_room_key = await pagination_method(
+                room_id=room_id,
+                # The bounds are reversed so we can paginate backwards
+                # (from newer to older events) starting at to_bound.
+                # This ensures we fill the `limit` with the newest events first,
+                from_key=to_bound,
+                to_key=from_bound,
+                direction=Direction.BACKWARDS,
+                # We add one so we can determine if there are enough events to saturate
+                # the limit or not (see `limited`)
+                limit=room_sync_config.timeline_limit + 1,
             )
 
             # We want to return the events in ascending order (the last event is the
             # most recent).
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2262393f6cd..861b976572d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -894,7 +894,7 @@ async def _load_filtered_recents(
             if since_key:
                 # Fetch events by `stream_ordering` for incremental sync
                 events, end_key = await self.store.get_room_events_stream_for_room(
-                    room_id,
+                    room_id=room_id,
                     limit=load_limit + 1,
                     from_key=end_key,
                     to_key=since_key,
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 2ccdc2ffde3..14b74d7d0d9 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -51,6 +51,7 @@
     Iterable,
     List,
     Optional,
+    Protocol,
     Set,
     Tuple,
     cast,
@@ -97,6 +98,18 @@
 _TOPOLOGICAL_TOKEN = "topological"
 
 
+class PaginateFunction(Protocol):
+    async def __call__(
+        self,
+        *,
+        room_id: str,
+        from_key: RoomStreamToken,
+        to_key: Optional[RoomStreamToken] = None,
+        direction: Direction = Direction.BACKWARDS,
+        limit: int = 0,
+    ) -> Tuple[List[EventBase], RoomStreamToken]: ...
+
+
 # Used as return values for pagination APIs
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _EventDictReturn:
@@ -659,11 +672,12 @@ def get_events_stream_id_generator(self) -> MultiWriterIdGenerator:
 
     async def get_room_events_stream_for_rooms(
         self,
+        *,
         room_ids: Collection[str],
         from_key: RoomStreamToken,
         to_key: Optional[RoomStreamToken] = None,
-        limit: int = 0,
         direction: Direction = Direction.BACKWARDS,
+        limit: int = 0,
     ) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]:
         """Get new room events in stream ordering since `from_key`.
 
@@ -707,11 +721,11 @@ async def get_room_events_stream_for_rooms(
             [
                 run_in_background(
                     self.get_room_events_stream_for_room,
-                    room_id,
-                    from_key,
-                    to_key,
-                    limit,
+                    room_id=room_id,
+                    from_key=from_key,
+                    to_key=to_key,
                     direction=direction,
+                    limit=limit,
                 )
                 for room_id in rm_ids
             ],
@@ -737,11 +751,12 @@ def get_rooms_that_changed(
 
     async def get_room_events_stream_for_room(
         self,
+        *,
         room_id: str,
         from_key: RoomStreamToken,
         to_key: Optional[RoomStreamToken] = None,
-        limit: int = 0,
         direction: Direction = Direction.BACKWARDS,
+        limit: int = 0,
     ) -> Tuple[List[EventBase], RoomStreamToken]:
         """
         Paginate events by `stream_ordering` in the room from the `from_key` in the
@@ -1858,7 +1873,7 @@ def _paginate_room_events_txn(
         from_token: RoomStreamToken,
         to_token: Optional[RoomStreamToken] = None,
         direction: Direction = Direction.BACKWARDS,
-        limit: int = -1,
+        limit: int = 0,
         event_filter: Optional[Filter] = None,
     ) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
         """Returns list of events before or after a given token.
@@ -2015,11 +2030,12 @@ def _paginate_room_events_txn(
     @trace
     async def paginate_room_events(
         self,
+        *,
         room_id: str,
         from_key: RoomStreamToken,
         to_key: Optional[RoomStreamToken] = None,
         direction: Direction = Direction.BACKWARDS,
-        limit: int = -1,
+        limit: int = 0,
         event_filter: Optional[Filter] = None,
     ) -> Tuple[List[EventBase], RoomStreamToken]:
         """Returns list of events before or after a given token.
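A note on the `PaginateFunction` protocol that patch 11 above introduces: what follows is not part of the patch series, just a hedged sketch of the typing technique it relies on. A `typing.Protocol` with an async `__call__` lets two differently-named store methods be assigned to one variable and invoked through a single, type-checked keyword-only signature. The `Token` and `Store` classes below are simplified stand-ins for illustration, not Synapse's real `RoomStreamToken` or datastore:

    from typing import List, Optional, Protocol, Tuple


    class Token:
        # Stand-in for Synapse's `RoomStreamToken` (assumption, simplified)
        def __init__(self, stream: int) -> None:
            self.stream = stream


    class PaginateFunction(Protocol):
        # Structural type: any callable with this keyword-only async
        # signature matches, including bound methods.
        async def __call__(
            self,
            *,
            room_id: str,
            from_key: Token,
            to_key: Optional[Token] = None,
            limit: int = 0,
        ) -> Tuple[List[str], Token]: ...


    class Store:
        # A hypothetical store method whose shape matches the protocol
        async def events_by_stream(
            self,
            *,
            room_id: str,
            from_key: Token,
            to_key: Optional[Token] = None,
            limit: int = 0,
        ) -> Tuple[List[str], Token]:
            return [], from_key


    # Either pagination flavour can now be picked at runtime and called
    # through one name, which is what `pagination_method` does above:
    pagination_method: PaginateFunction = Store().events_by_stream

This is also why the patch makes the store methods keyword-only (`*,`): a Protocol `__call__` matches on parameter names and kinds, so forcing keywords keeps both implementations interchangeable.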
From 6231fb08ed367c8a05fc5656241614cbba4d4e63 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Mon, 5 Aug 2024 18:03:43 -0500
Subject: [PATCH 12/17] Standardize way we fetch events for sync

---
 synapse/handlers/sliding_sync.py |  4 +++
 synapse/handlers/sync.py         | 48 +++++++++++++++++++++-----------
 2 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index d442c4aaefb..d57c433f040 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -1873,6 +1873,10 @@ async def get_room_sync_data(
             # order they were received by the server).
             #
             # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+            #
+            # FIXME: Using workaround for mypy,
+            # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+            # https://github.com/python/mypy/issues/17479
             paginate_room_events: PaginateFunction = self.store.paginate_room_events
             get_room_events_stream_for_room: PaginateFunction = (
                 self.store.get_room_events_stream_for_room
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 861b976572d..a049703ae4a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -65,6 +65,7 @@
 )
 from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.databases.main.stream import PaginateFunction
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     DeviceListUpdates,
@@ -891,23 +892,36 @@ async def _load_filtered_recents(
             # order they were received by the server).
             #
             # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
-            if since_key:
-                # Fetch events by `stream_ordering` for incremental sync
-                events, end_key = await self.store.get_room_events_stream_for_room(
-                    room_id=room_id,
-                    limit=load_limit + 1,
-                    from_key=end_key,
-                    to_key=since_key,
-                    direction=Direction.BACKWARDS,
-                )
-                # We want to return the events in ascending order (the last event is the
-                # most recent).
-                events.reverse()
-            else:
-                # Fetch events by `topological_ordering` for initial sync
-                events, end_key = await self.store.get_recent_events_for_room(
-                    room_id, limit=load_limit + 1, end_token=end_key
-                )
+            #
+            # FIXME: Using workaround for mypy,
+            # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+            # https://github.com/python/mypy/issues/17479
+            paginate_room_events: PaginateFunction = self.store.paginate_room_events
+            get_room_events_stream_for_room: PaginateFunction = (
+                self.store.get_room_events_stream_for_room
+            )
+            pagination_method: PaginateFunction = (
+                # Use `topological_ordering` for historical events
+                paginate_room_events
+                if since_key is None
+                # Use `stream_ordering` for updates
+                else get_room_events_stream_for_room
+            )
+            events, end_key = await pagination_method(
+                room_id=room_id,
+                # The bounds are reversed so we can paginate backwards
+                # (from newer to older events) starting at `end_key`.
+                # This ensures we fill the `limit` with the newest events first,
+                from_key=end_key,
+                to_key=since_key,
+                direction=Direction.BACKWARDS,
+                # We add one so we can determine if there are enough events to saturate
+                # the limit or not (see `limited`)
+                limit=load_limit + 1,
+            )
+            # We want to return the events in ascending order (the last event is the
+            # most recent).
+            events.reverse()
 
             log_kv({"loaded_recents": len(events)})

From c5d09983e17dc19498bcaa16eb97249f0f7e9c35 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Mon, 5 Aug 2024 18:05:12 -0500
Subject: [PATCH 13/17] Rename `paginate_room_events(...)` ->
 `paginate_room_events_by_topological_ordering(...)`

---
 synapse/handlers/admin.py                 | 14 ++++++-----
 synapse/handlers/pagination.py            | 32 +++++++++++++-----------
 synapse/handlers/sliding_sync.py          |  6 +++--
 synapse/handlers/sync.py                  |  6 +++--
 synapse/storage/databases/main/stream.py  | 16 ++++++------
 tests/storage/test_stream.py              |  2 +-
 6 files changed, 43 insertions(+), 33 deletions(-)

diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 59616a938bc..b44e862493d 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -197,12 +197,14 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
         # events that we have and then filtering, this isn't the most
         # efficient method perhaps but it does guarantee we get everything.
         while True:
-            events, _ = await self._store.paginate_room_events(
-                room_id=room_id,
-                from_key=from_key,
-                to_key=to_key,
-                limit=100,
-                direction=Direction.FORWARDS,
+            events, _ = (
+                await self._store.paginate_room_events_by_topological_ordering(
+                    room_id=room_id,
+                    from_key=from_key,
+                    to_key=to_key,
+                    limit=100,
+                    direction=Direction.FORWARDS,
+                )
             )
             if not events:
                 break
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 872c85fbadd..6fd7afa2808 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -507,13 +507,15 @@ async def get_messages(
 
         # Initially fetch the events from the database. With any luck, we can return
         # these without blocking on backfill (handled below).
-        events, next_key = await self.store.paginate_room_events(
-            room_id=room_id,
-            from_key=from_token.room_key,
-            to_key=to_room_key,
-            direction=pagin_config.direction,
-            limit=pagin_config.limit,
-            event_filter=event_filter,
+        events, next_key = (
+            await self.store.paginate_room_events_by_topological_ordering(
+                room_id=room_id,
+                from_key=from_token.room_key,
+                to_key=to_room_key,
+                direction=pagin_config.direction,
+                limit=pagin_config.limit,
+                event_filter=event_filter,
+            )
         )
 
         if pagin_config.direction == Direction.BACKWARDS:
@@ -582,13 +584,15 @@ async def get_messages(
             # If we did backfill something, refetch the events from the database to
             # catch anything new that might have been added since we last fetched.
if did_backfill: - events, next_key = await self.store.paginate_room_events( - room_id=room_id, - from_key=from_token.room_key, - to_key=to_room_key, - direction=pagin_config.direction, - limit=pagin_config.limit, - event_filter=event_filter, + events, next_key = ( + await self.store.paginate_room_events_by_topological_ordering( + room_id=room_id, + from_key=from_token.room_key, + to_key=to_room_key, + direction=pagin_config.direction, + limit=pagin_config.limit, + event_filter=event_filter, + ) ) else: # Otherwise, we can backfill in the background for eventual diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index d57c433f040..b40191b591d 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1877,13 +1877,15 @@ async def get_room_sync_data( # FIXME: Using workaround for mypy, # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and # https://github.com/python/mypy/issues/17479 - paginate_room_events: PaginateFunction = self.store.paginate_room_events + paginate_room_events_by_topological_ordering: PaginateFunction = ( + self.store.paginate_room_events_by_topological_ordering + ) get_room_events_stream_for_room: PaginateFunction = ( self.store.get_room_events_stream_for_room ) pagination_method: PaginateFunction = ( # Use `topographical_ordering` for historical events - paginate_room_events + paginate_room_events_by_topological_ordering if from_bound is None # Use `stream_ordering` for updates else get_room_events_stream_for_room diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a049703ae4a..27315ef6e4a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -896,13 +896,15 @@ async def _load_filtered_recents( # FIXME: Using workaround for mypy, # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and # https://github.com/python/mypy/issues/17479 - paginate_room_events: PaginateFunction = self.store.paginate_room_events + paginate_room_events_by_topological_ordering: PaginateFunction = ( + self.store.paginate_room_events_by_topological_ordering + ) get_room_events_stream_for_room: PaginateFunction = ( self.store.get_room_events_stream_for_room ) pagination_method: PaginateFunction = ( # Use `topographical_ordering` for historical events - paginate_room_events + paginate_room_events_by_topological_ordering if since_key is None # Use `stream_ordering` for updates else get_room_events_stream_for_room diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 14b74d7d0d9..00593966a20 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -878,7 +878,7 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]: ) else: # TODO (erikj): We should work out what to do here instead. 
(same as - # `_paginate_room_events_txn(...)`) + # `_paginate_room_events_by_topological_ordering_txn(...)`) next_key = to_key if to_key else from_key return ret, next_key @@ -1188,7 +1188,7 @@ async def get_recent_event_ids_for_room( rows, token = await self.db_pool.runInteraction( "get_recent_event_ids_for_room", - self._paginate_room_events_txn, + self._paginate_room_events_by_topological_ordering_txn, room_id, from_token=end_token, limit=limit, @@ -1693,7 +1693,7 @@ def _get_events_around_txn( topological=topological_ordering, stream=stream_ordering ) - rows, start_token = self._paginate_room_events_txn( + rows, start_token = self._paginate_room_events_by_topological_ordering_txn( txn, room_id, before_token, @@ -1703,7 +1703,7 @@ def _get_events_around_txn( ) events_before = [r.event_id for r in rows] - rows, end_token = self._paginate_room_events_txn( + rows, end_token = self._paginate_room_events_by_topological_ordering_txn( txn, room_id, after_token, @@ -1866,7 +1866,7 @@ def _reset_federation_positions_txn(self, txn: LoggingTransaction) -> None: def has_room_changed_since(self, room_id: str, stream_id: int) -> bool: return self._events_stream_cache.has_entity_changed(room_id, stream_id) - def _paginate_room_events_txn( + def _paginate_room_events_by_topological_ordering_txn( self, txn: LoggingTransaction, room_id: str, @@ -2028,7 +2028,7 @@ def _paginate_room_events_txn( return rows, next_token @trace - async def paginate_room_events( + async def paginate_room_events_by_topological_ordering( self, *, room_id: str, @@ -2059,8 +2059,8 @@ async def paginate_room_events( and `to_key`). """ rows, token = await self.db_pool.runInteraction( - "paginate_room_events", - self._paginate_room_events_txn, + "paginate_room_events_by_topological_ordering", + self._paginate_room_events_by_topological_ordering_txn, room_id, from_key, to_key, diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 9dea1af8ead..7b7590da76a 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -148,7 +148,7 @@ def _filter_messages(self, filter: JsonDict) -> List[str]: """Make a request to /messages with a filter, returns the chunk of events.""" events, next_key = self.get_success( - self.hs.get_datastores().main.paginate_room_events( + self.hs.get_datastores().main.paginate_room_events_by_topological_ordering( room_id=self.room_id, from_key=self.from_token.room_key, to_key=None, From 3540ac753bb2b2a2e420e6f8b07c05701390c50e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 5 Aug 2024 18:07:08 -0500 Subject: [PATCH 14/17] Standardize docstring --- synapse/storage/databases/main/stream.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 00593966a20..39b64e794af 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -767,9 +767,9 @@ async def get_room_events_stream_for_room( from_key: The token to stream from (starting point and heading in the given direction) to_key: The token representing the end stream position (end point) - limit: Maximum number of events to return direction: Indicates whether we are paginating forwards or backwards from `from_key`. 
+ limit: Maximum number of events to return Returns: The results as a list of events and a token that points to the end @@ -2038,18 +2038,19 @@ async def paginate_room_events_by_topological_ordering( limit: int = 0, event_filter: Optional[Filter] = None, ) -> Tuple[List[EventBase], RoomStreamToken]: - """Returns list of events before or after a given token. - - When Direction.FORWARDS: from_key < x <= to_key - When Direction.BACKWARDS: from_key >= x > to_key + """ + Paginate events by `topological_ordering` (tie-break with `stream_ordering`) in + the room from the `from_key` in the given `direction` to the `to_key` or + `limit`. Args: room_id - from_key: The token used to stream from - to_key: A token which if given limits the results to only those before + from_key: The token to stream from (starting point and heading in the given + direction) + to_key: The token representing the end stream position (end point) direction: Indicates whether we are paginating forwards or backwards from `from_key`. - limit: The maximum number of events to return. + limit: Maximum number of events to return event_filter: If provided filters the events to those that match the filter. Returns: @@ -2057,6 +2058,9 @@ async def paginate_room_events_by_topological_ordering( of the result set. If no events are returned then the end of the stream has been reached (i.e. there are no events between `from_key` and `to_key`). + + When Direction.FORWARDS: from_key < x <= to_key, (ascending order) + When Direction.BACKWARDS: from_key >= x > to_key, (descending order) """ rows, token = await self.db_pool.runInteraction( "paginate_room_events_by_topological_ordering", From f27e145310684727f30680d8786c63dfa7af6e86 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 5 Aug 2024 18:08:41 -0500 Subject: [PATCH 15/17] Rename `get_room_events_stream_for_room(...)` -> `paginate_room_events_by_stream_ordering(...)` --- synapse/handlers/sliding_sync.py | 6 +++--- synapse/handlers/sync.py | 6 +++--- synapse/storage/databases/main/stream.py | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index b40191b591d..a5e49a35e72 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1880,15 +1880,15 @@ async def get_room_sync_data( paginate_room_events_by_topological_ordering: PaginateFunction = ( self.store.paginate_room_events_by_topological_ordering ) - get_room_events_stream_for_room: PaginateFunction = ( - self.store.get_room_events_stream_for_room + paginate_room_events_by_stream_ordering: PaginateFunction = ( + self.store.paginate_room_events_by_stream_ordering ) pagination_method: PaginateFunction = ( # Use `topographical_ordering` for historical events paginate_room_events_by_topological_ordering if from_bound is None # Use `stream_ordering` for updates - else get_room_events_stream_for_room + else paginate_room_events_by_stream_ordering ) timeline_events, new_room_key = await pagination_method( room_id=room_id, diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 27315ef6e4a..6af2eeb75ff 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -899,15 +899,15 @@ async def _load_filtered_recents( paginate_room_events_by_topological_ordering: PaginateFunction = ( self.store.paginate_room_events_by_topological_ordering ) - get_room_events_stream_for_room: PaginateFunction = ( - self.store.get_room_events_stream_for_room + paginate_room_events_by_stream_ordering: PaginateFunction = 
( + self.store.paginate_room_events_by_stream_ordering ) pagination_method: PaginateFunction = ( # Use `topographical_ordering` for historical events paginate_room_events_by_topological_ordering if since_key is None # Use `stream_ordering` for updates - else get_room_events_stream_for_room + else paginate_room_events_by_stream_ordering ) events, end_key = await pagination_method( room_id=room_id, diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 39b64e794af..dc707b6b634 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -720,7 +720,7 @@ async def get_room_events_stream_for_rooms( defer.gatherResults( [ run_in_background( - self.get_room_events_stream_for_room, + self.paginate_room_events_by_stream_ordering, room_id=room_id, from_key=from_key, to_key=to_key, @@ -749,7 +749,7 @@ def get_rooms_that_changed( if self._events_stream_cache.has_entity_changed(room_id, from_id) } - async def get_room_events_stream_for_room( + async def paginate_room_events_by_stream_ordering( self, *, room_id: str, From 0874a2ee2340599392bcca913d54c9e05a795ca0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 6 Aug 2024 13:41:36 -0500 Subject: [PATCH 16/17] Add checks to outside See https://github.com/element-hq/synapse/pull/17510#discussion_r1705410724 --- synapse/storage/databases/main/stream.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index dc707b6b634..210ef93d049 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -2062,6 +2062,30 @@ async def paginate_room_events_by_topological_ordering( When Direction.FORWARDS: from_key < x <= to_key, (ascending order) When Direction.BACKWARDS: from_key >= x > to_key, (descending order) """ + # We have these checks outside of the transaction function (txn) to save getting + # a DB connection and switching threads if we don't need to. + # + # We can bail early if we're looking forwards, and our `to_key` is already + # before our `from_key`. + if ( + direction == Direction.FORWARDS + and to_key is not None + and to_key.is_before_or_eq(from_key) + ): + # Token selection matches what we do in `_paginate_room_events_txn` if there + # are no rows + return [], to_key if to_key else from_key + # Or vice-versa, if we're looking backwards and our `from_key` is already before + # our `to_key`. 
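        # For example, with bare integers standing in for stream positions:
        #   FORWARDS  with from_key=5, to_key=3: nothing satisfies 5 < x <= 3.
        #   BACKWARDS with from_key=3, to_key=5: nothing satisfies 3 >= x > 5.
        # Either way the requested range is empty, so skipping the DB hit is safe.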
+        elif (
+            direction == Direction.BACKWARDS
+            and to_key is not None
+            and from_key.is_before_or_eq(to_key)
+        ):
+            # Token selection matches what we do in `_paginate_room_events_txn` if there
+            # are no rows
+            return [], to_key if to_key else from_key
+
         rows, token = await self.db_pool.runInteraction(
             "paginate_room_events_by_topological_ordering",
             self._paginate_room_events_by_topological_ordering_txn,
             room_id,

From 4888d44bae6fe38d7e3cc3d129af24d1557b3a59 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Tue, 6 Aug 2024 14:05:21 -0500
Subject: [PATCH 17/17] Add future FIXMEs to enforce to_key on FORWARDS

See https://github.com/element-hq/synapse/pull/17510#discussion_r1705648668
---
 synapse/storage/databases/main/stream.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 47c19e85ac5..4989c960a64 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -780,6 +780,11 @@ async def paginate_room_events_by_stream_ordering(
         When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
         When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
         """
+
+        # FIXME: When going forwards, we should enforce that the `to_key` is not `None`
+        # because we always need an upper bound when querying the events stream (as
+        # otherwise we'll potentially pick up events that are not fully persisted).
+
         # We should only be working with `stream_ordering` tokens here
         assert from_key is None or from_key.topological is None
         assert to_key is None or to_key.topological is None
@@ -2065,6 +2070,11 @@ async def paginate_room_events_by_topological_ordering(
         When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
         When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
         """
+
+        # FIXME: When going forwards, we should enforce that the `to_key` is not `None`
+        # because we always need an upper bound when querying the events stream (as
+        # otherwise we'll potentially pick up events that are not fully persisted).
+
         # We have these checks outside of the transaction function (txn) to save getting
         # a DB connection and switching threads if we don't need to.
         #
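
To make the documented bounds concrete, here is a minimal self-contained sketch of the semantics the standardized docstrings describe — plain integers stand in for real stream tokens, and this `Direction` enum is only a stand-in for Synapse's:

    from enum import Enum, auto
    from typing import List


    class Direction(Enum):  # stand-in for synapse.api.constants.Direction
        FORWARDS = auto()
        BACKWARDS = auto()


    def in_range(x: int, from_key: int, to_key: int, direction: Direction) -> bool:
        """Mirror the documented bounds with plain integers."""
        if direction == Direction.FORWARDS:
            # from_key < x <= to_key (ascending order)
            return from_key < x <= to_key
        # from_key >= x > to_key (descending order)
        return from_key >= x > to_key


    positions: List[int] = [1, 2, 3, 4, 5]
    # Forwards from token 2 up to token 4: exclusive start, inclusive end.
    assert [x for x in positions if in_range(x, 2, 4, Direction.FORWARDS)] == [3, 4]
    # Backwards from token 4 down to token 2: inclusive start, exclusive end.
    assert [
        x for x in reversed(positions) if in_range(x, 4, 2, Direction.BACKWARDS)
    ] == [4, 3]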
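
Finally, a toy illustration of why the series splits pagination into `paginate_room_events_by_topological_ordering` (initial sync, /messages) and `paginate_room_events_by_stream_ordering` (incremental sync): an event that arrives late over federation can be old in the room DAG but new in this server's persistence order. The event IDs and numbers here are invented for the sketch:

    from typing import List, Tuple

    # (event_id, topological_ordering, stream_ordering): the third event points
    # at an old part of the DAG but was the last thing this server persisted.
    Event = Tuple[str, int, int]
    events: List[Event] = [
        ("$msg1", 10, 100),
        ("$msg2", 11, 101),
        ("$late_federated", 5, 102),
    ]

    # Historical pagination walks the DAG (tie-broken by stream ordering) ...
    by_topological = sorted(events, key=lambda e: (e[1], e[2]))
    # ... while incremental sync asks "what has this server seen since my token?".
    by_stream = sorted(events, key=lambda e: e[2])

    assert by_topological[0][0] == "$late_federated"  # earliest point in the DAG
    assert by_stream[-1][0] == "$late_federated"      # newest local arrival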