From da5339dc54933a3360b9fc8ad59377309b13d94b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Aug 2024 15:29:24 +0100 Subject: [PATCH 01/28] Migrate to per-connection state class --- synapse/handlers/sliding_sync.py | 102 +++++++++++++++++++------------ 1 file changed, 62 insertions(+), 40 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 99510254f31..2a104f83fdb 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -571,21 +571,19 @@ async def current_sync_for_user( # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() - if from_token: - # Check that we recognize the connection position, if not tell the - # clients that they need to start again. - # - # If we don't do this and the client asks for the full range of - # rooms, we end up sending down all rooms and their state from - # scratch (which can be very slow). By expiring the connection we - # allow the client a chance to do an initial request with a smaller - # range of rooms to get them some results sooner but will end up - # taking the same amount of time (more with round-trips and - # re-processing) in the end to get everything again. - if not await self.connection_store.is_valid_token( - sync_config, from_token.connection_position - ): - raise SlidingSyncUnknownPosition() + # Get the per-connection state (if any). + # + # Raises an exception if there is a `connection_position` that we don't + # recognize. If we don't do this and the client asks for the full range + # of rooms, we end up sending down all rooms and their state from + # scratch (which can be very slow). By expiring the connection we allow + # the client a chance to do an initial request with a smaller range of + # rooms to get them some results sooner but will end up taking the same + # amount of time (more with round-trips and re-processing) in the end to + # get everything again. 
+ per_connection_state = await self.connection_store.get_per_connection_state( + sync_config, from_token + ) await self.connection_store.mark_token_seen( sync_config=sync_config, @@ -781,11 +779,7 @@ async def current_sync_for_user( # we haven't sent the room down, or we have but there are missing # updates). for room_id in relevant_room_map: - status = await self.connection_store.have_sent_room( - sync_config, - from_token.connection_position, - room_id, - ) + status = per_connection_state.have_sent_room(room_id) if ( # The room was never sent down before so the client needs to know # about it regardless of any updates. @@ -821,6 +815,7 @@ async def current_sync_for_user( async def handle_room(room_id: str) -> None: room_sync_result = await self.get_room_sync_data( sync_config=sync_config, + per_connection_state=per_connection_state, room_id=room_id, room_sync_config=relevant_rooms_to_send_map[room_id], room_membership_for_user_at_to_token=room_membership_for_user_map[ @@ -885,6 +880,7 @@ async def handle_room(room_id: str) -> None: connection_position = await self.connection_store.record_rooms( sync_config=sync_config, from_token=from_token, + per_connection_state=per_connection_state, sent_room_ids=relevant_rooms_to_send_map.keys(), unsent_room_ids=unsent_room_ids, ) @@ -1939,6 +1935,7 @@ async def get_current_state_at( async def get_room_sync_data( self, sync_config: SlidingSyncConfig, + per_connection_state: "PerConnectionState", room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, @@ -1986,11 +1983,7 @@ async def get_room_sync_data( from_bound = None initial = True if from_token and not room_membership_for_user_at_to_token.newly_joined: - room_status = await self.connection_store.have_sent_room( - sync_config=sync_config, - connection_token=from_token.connection_position, - room_id=room_id, - ) + room_status = per_connection_state.have_sent_room(room_id) if room_status.status == HaveSentRoomFlag.LIVE: 
from_bound = from_token.stream_token.room_key initial = False @@ -3034,6 +3027,21 @@ def previously(last_token: RoomStreamToken) -> "HaveSentRoom": HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) +@attr.s(auto_attribs=True, slots=True, frozen=True) +class PerConnectionState: + """The per-connection state + + Attributes: + rooms: The state of rooms that have been sent to clients. + """ + + rooms: Mapping[str, HaveSentRoom] = {} + + def have_sent_room(self, room_id: str) -> HaveSentRoom: + """Return whether we have previously sent the room down""" + return self.rooms.get(room_id, HAVE_SENT_ROOM_NEVER) + + @attr.s(auto_attribs=True) class SlidingSyncConnectionStore: """In-memory store of per-connection state, including what rooms we have @@ -3063,9 +3071,9 @@ class SlidingSyncConnectionStore: to mapping of room ID to `HaveSentRoom`. """ - # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom` - _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = ( - attr.Factory(dict) + # `(user_id, conn_id)` -> `token` -> `PerConnectionState` + _connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory( + dict ) async def is_valid_token( @@ -3078,26 +3086,40 @@ async def is_valid_token( conn_key = self._get_connection_key(sync_config) return connection_token in self._connections.get(conn_key, {}) - async def have_sent_room( - self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str - ) -> HaveSentRoom: - """For the given user_id/conn_id/token, return whether we have - previously sent the room down + async def get_per_connection_state( + self, + sync_config: SlidingSyncConfig, + from_token: Optional[SlidingSyncStreamToken], + ) -> PerConnectionState: + """Fetch the per-connection state for the token. 
+ + Raises: + SlidingSyncUnknownPosition if the connection_token is unknown """ + if from_token is None: + return PerConnectionState() + + connection_position = from_token.connection_position + if connection_position == 0: + # The '0' values is a special value to indicate there is no + # per-connection state. + return PerConnectionState() conn_key = self._get_connection_key(sync_config) - sync_statuses = self._connections.setdefault(conn_key, {}) - room_status = sync_statuses.get(connection_token, {}).get( - room_id, HAVE_SENT_ROOM_NEVER - ) + sync_statuses = self._connections.get(conn_key, {}) + connection_state = sync_statuses.get(connection_position) + + if connection_state is None: + raise SlidingSyncUnknownPosition() - return room_status + return connection_state @trace async def record_rooms( self, sync_config: SlidingSyncConfig, from_token: Optional[SlidingSyncStreamToken], + per_connection_state: PerConnectionState, *, sent_room_ids: StrCollection, unsent_room_ids: StrCollection, @@ -3131,7 +3153,7 @@ async def record_rooms( sync_statuses.pop(new_store_token, None) # Copy over and update the room mappings. - new_room_statuses = dict(sync_statuses.get(prev_connection_token, {})) + new_room_statuses = dict(per_connection_state.rooms) # Whether we have updated the `new_room_statuses`, if we don't by the # end we can treat this as a noop. 
@@ -3165,7 +3187,7 @@ async def record_rooms( if not have_updated: return prev_connection_token - sync_statuses[new_store_token] = new_room_statuses + sync_statuses[new_store_token] = PerConnectionState(rooms=new_room_statuses) return new_store_token From baac6c550e457c5ce79ebb24fae5813b93eeacb9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Aug 2024 18:44:14 +0100 Subject: [PATCH 02/28] Record with new class --- synapse/handlers/sliding_sync.py | 140 ++++++++++++++++++------------- 1 file changed, 81 insertions(+), 59 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 2a104f83fdb..055418e8421 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -19,6 +19,8 @@ # import enum import logging +import typing +from collections import ChainMap from enum import Enum from itertools import chain from typing import ( @@ -848,6 +850,8 @@ async def handle_room(room_id: str) -> None: ) if has_lists or has_room_subscriptions: + new_connection_state = per_connection_state.get_mutable() + # We now calculate if any rooms outside the range have had updates, # which we are not sending down. 
# @@ -877,12 +881,16 @@ async def handle_room(room_id: str) -> None: ) unsent_room_ids = list(missing_event_map_by_room) - connection_position = await self.connection_store.record_rooms( + new_connection_state.record_unsent_rooms( + unsent_room_ids, from_token.stream_token + ) + + new_connection_state.record_sent_rooms(relevant_rooms_to_send_map.keys()) + + connection_position = await self.connection_store.record_new_state( sync_config=sync_config, from_token=from_token, - per_connection_state=per_connection_state, - sent_room_ids=relevant_rooms_to_send_map.keys(), - unsent_room_ids=unsent_room_ids, + per_connection_state=new_connection_state, ) elif from_token: connection_position = from_token.connection_position @@ -3041,6 +3049,69 @@ def have_sent_room(self, room_id: str) -> HaveSentRoom: """Return whether we have previously sent the room down""" return self.rooms.get(room_id, HAVE_SENT_ROOM_NEVER) + def get_mutable(self) -> "MutablePerConnectionState": + """Get a mutable copy of this state.""" + return MutablePerConnectionState( + rooms=dict(self.rooms), + ) + + +class MutablePerConnectionState(PerConnectionState): + """A mutable version of `PerConnectionState`""" + + rooms: typing.ChainMap[str, HaveSentRoom] + + def __init__( + self, + rooms: Dict[str, HaveSentRoom], + ) -> None: + super().__init__( + rooms=ChainMap({}, rooms), + ) + + def updated_rooms(self) -> Mapping[str, HaveSentRoom]: + """Return the room entries that have been updated""" + return self.rooms.maps[0] + + def has_updates(self) -> bool: + """Are there any updates""" + return bool(self.updated_rooms()) + + def record_sent_rooms(self, room_ids: StrCollection) -> None: + """Record that we have sent these rooms in the response""" + for room_id in room_ids: + current_status = self.rooms.get(room_id) + if ( + current_status is not None + and current_status.status == HaveSentRoomFlag.LIVE + ): + continue + + self.rooms[room_id] = HAVE_SENT_ROOM_LIVE + + def record_unsent_rooms( + self, room_ids: 
StrCollection, from_token: StreamToken + ) -> None: + """Record that we have not sent these rooms in the response, but there + have been updates. + """ + # Whether we add/update the entries for unsent rooms depends on the + # existing entry: + # - LIVE: We have previously sent down everything up to + # `last_room_token, so we update the entry to be `PREVIOUSLY` with + # `last_room_token`. + # - PREVIOUSLY: We have previously sent down everything up to *a* + # given token, so we don't need to update the entry. + # - NEVER: We have never previously sent down the room, and we haven't + # sent anything down this time either so we leave it as NEVER. + + for room_id in room_ids: + current_status = self.rooms.get(room_id) + if current_status is None or current_status.status != HaveSentRoomFlag.LIVE: + continue + + self.rooms[room_id] = HaveSentRoom.previously(from_token.room_key) + @attr.s(auto_attribs=True) class SlidingSyncConnectionStore: @@ -3115,33 +3186,17 @@ async def get_per_connection_state( return connection_state @trace - async def record_rooms( + async def record_new_state( self, sync_config: SlidingSyncConfig, from_token: Optional[SlidingSyncStreamToken], - per_connection_state: PerConnectionState, - *, - sent_room_ids: StrCollection, - unsent_room_ids: StrCollection, + per_connection_state: MutablePerConnectionState, ) -> int: - """Record which rooms we have/haven't sent down in a new response - - Attributes: - sync_config - from_token: The since token from the request, if any - sent_room_ids: The set of room IDs that we have sent down as - part of this request (only needs to be ones we didn't - previously sent down). - unsent_room_ids: The set of room IDs that have had updates - since the `from_token`, but which were not included in - this request - """ prev_connection_token = 0 if from_token is not None: prev_connection_token = from_token.connection_position - # If there are no changes then this is a noop. 
- if not sent_room_ids and not unsent_room_ids: + if not per_connection_state.has_updates(): return prev_connection_token conn_key = self._get_connection_key(sync_config) @@ -3152,42 +3207,9 @@ async def record_rooms( new_store_token = prev_connection_token + 1 sync_statuses.pop(new_store_token, None) - # Copy over and update the room mappings. - new_room_statuses = dict(per_connection_state.rooms) - - # Whether we have updated the `new_room_statuses`, if we don't by the - # end we can treat this as a noop. - have_updated = False - for room_id in sent_room_ids: - new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE - have_updated = True - - # Whether we add/update the entries for unsent rooms depends on the - # existing entry: - # - LIVE: We have previously sent down everything up to - # `last_room_token, so we update the entry to be `PREVIOUSLY` with - # `last_room_token`. - # - PREVIOUSLY: We have previously sent down everything up to *a* - # given token, so we don't need to update the entry. - # - NEVER: We have never previously sent down the room, and we haven't - # sent anything down this time either so we leave it as NEVER. - - # Work out the new state for unsent rooms that were `LIVE`. 
- if from_token: - new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) - else: - new_unsent_state = HAVE_SENT_ROOM_NEVER - - for room_id in unsent_room_ids: - prev_state = new_room_statuses.get(room_id) - if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE: - new_room_statuses[room_id] = new_unsent_state - have_updated = True - - if not have_updated: - return prev_connection_token - - sync_statuses[new_store_token] = PerConnectionState(rooms=new_room_statuses) + sync_statuses[new_store_token] = PerConnectionState( + rooms=dict(per_connection_state.rooms), + ) return new_store_token From 0561c86c5d29ec61deb0d200356a8ec0b695b06d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Aug 2024 09:25:23 +0100 Subject: [PATCH 03/28] Revamp --- synapse/handlers/sliding_sync.py | 98 +++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 32 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 055418e8421..00d7cb09ffe 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -32,11 +32,13 @@ List, Literal, Mapping, + MutableMapping, Optional, Sequence, Set, Tuple, Union, + cast, ) import attr @@ -781,7 +783,7 @@ async def current_sync_for_user( # we haven't sent the room down, or we have but there are missing # updates). for room_id in relevant_room_map: - status = per_connection_state.have_sent_room(room_id) + status = per_connection_state.rooms.have_sent_room(room_id) if ( # The room was never sent down before so the client needs to know # about it regardless of any updates. 
@@ -881,11 +883,13 @@ async def handle_room(room_id: str) -> None: ) unsent_room_ids = list(missing_event_map_by_room) - new_connection_state.record_unsent_rooms( + new_connection_state.rooms.record_unsent_rooms( unsent_room_ids, from_token.stream_token ) - new_connection_state.record_sent_rooms(relevant_rooms_to_send_map.keys()) + new_connection_state.rooms.record_sent_rooms( + relevant_rooms_to_send_map.keys() + ) connection_position = await self.connection_store.record_new_state( sync_config=sync_config, @@ -1991,7 +1995,7 @@ async def get_room_sync_data( from_bound = None initial = True if from_token and not room_membership_for_user_at_to_token.newly_joined: - room_status = per_connection_state.have_sent_room(room_id) + room_status = per_connection_state.rooms.have_sent_room(room_id) if room_status.status == HaveSentRoomFlag.LIVE: from_bound = from_token.stream_token.room_key initial = False @@ -3036,58 +3040,61 @@ def previously(last_token: RoomStreamToken) -> "HaveSentRoom": @attr.s(auto_attribs=True, slots=True, frozen=True) -class PerConnectionState: - """The per-connection state - - Attributes: - rooms: The state of rooms that have been sent to clients. - """ +class RoomStatusesForStream: + """For a given stream, e.g. 
events, records what we have or have not sent + down for that stream in a given room.""" - rooms: Mapping[str, HaveSentRoom] = {} + _statuses: Mapping[str, HaveSentRoom] = attr.Factory(dict) def have_sent_room(self, room_id: str) -> HaveSentRoom: """Return whether we have previously sent the room down""" - return self.rooms.get(room_id, HAVE_SENT_ROOM_NEVER) + return self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) - def get_mutable(self) -> "MutablePerConnectionState": + def get_mutable(self) -> "MutableRoomStatusesForStream": """Get a mutable copy of this state.""" - return MutablePerConnectionState( - rooms=dict(self.rooms), + return MutableRoomStatusesForStream( + statuses=self._statuses, ) + def copy(self) -> "RoomStatusesForStream": + """Make a copy of the class. Useful for converting from a mutable to + immutable version.""" -class MutablePerConnectionState(PerConnectionState): - """A mutable version of `PerConnectionState`""" + return RoomStatusesForStream(statuses=dict(self._statuses)) - rooms: typing.ChainMap[str, HaveSentRoom] + +class MutableRoomStatusesForStream(RoomStatusesForStream): + """A mutable version of `RoomStatusesForStream`""" + + _statuses: typing.ChainMap[str, HaveSentRoom] def __init__( self, - rooms: Dict[str, HaveSentRoom], + statuses: Mapping[str, HaveSentRoom], ) -> None: + # ChainMap requires a mutable mapping, but we're not actually going to + # mutate it. 
+ statuses = cast(MutableMapping, statuses) + super().__init__( - rooms=ChainMap({}, rooms), + statuses=ChainMap({}, statuses), ) - def updated_rooms(self) -> Mapping[str, HaveSentRoom]: - """Return the room entries that have been updated""" - return self.rooms.maps[0] - - def has_updates(self) -> bool: - """Are there any updates""" - return bool(self.updated_rooms()) + def get_updates(self) -> Mapping[str, HaveSentRoom]: + """Return only the changes that were made""" + return self._statuses.maps[0] def record_sent_rooms(self, room_ids: StrCollection) -> None: """Record that we have sent these rooms in the response""" for room_id in room_ids: - current_status = self.rooms.get(room_id) + current_status = self._statuses.get(room_id) if ( current_status is not None and current_status.status == HaveSentRoomFlag.LIVE ): continue - self.rooms[room_id] = HAVE_SENT_ROOM_LIVE + self._statuses[room_id] = HAVE_SENT_ROOM_LIVE def record_unsent_rooms( self, room_ids: StrCollection, from_token: StreamToken @@ -3106,11 +3113,38 @@ def record_unsent_rooms( # sent anything down this time either so we leave it as NEVER. for room_id in room_ids: - current_status = self.rooms.get(room_id) + current_status = self._statuses.get(room_id) if current_status is None or current_status.status != HaveSentRoomFlag.LIVE: continue - self.rooms[room_id] = HaveSentRoom.previously(from_token.room_key) + self._statuses[room_id] = HaveSentRoom.previously(from_token.room_key) + + +@attr.s(auto_attribs=True) +class PerConnectionState: + """The per-connection state + + Attributes: + rooms: The status of each room for the events stream. 
+ """ + + rooms: RoomStatusesForStream = attr.Factory(RoomStatusesForStream) + + def get_mutable(self) -> "MutablePerConnectionState": + """Get a mutable copy of this state.""" + return MutablePerConnectionState( + rooms=self.rooms.get_mutable(), + ) + + +@attr.s(auto_attribs=True) +class MutablePerConnectionState(PerConnectionState): + """A mutable version of `PerConnectionState`""" + + rooms: MutableRoomStatusesForStream + + def has_updates(self) -> bool: + return bool(self.rooms.get_updates()) @attr.s(auto_attribs=True) @@ -3208,7 +3242,7 @@ async def record_new_state( sync_statuses.pop(new_store_token, None) sync_statuses[new_store_token] = PerConnectionState( - rooms=dict(per_connection_state.rooms), + rooms=per_connection_state.rooms.copy(), ) return new_store_token From c15b8b39cd1a6c80883c83aeebc662d435de6338 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Aug 2024 09:01:09 +0100 Subject: [PATCH 04/28] WIP receipts reading --- synapse/handlers/sliding_sync.py | 184 +++++++++++++++++++++---------- 1 file changed, 123 insertions(+), 61 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 00d7cb09ffe..6dc3ab5ed2e 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -29,6 +29,7 @@ Callable, Dict, Final, + Generic, List, Literal, Mapping, @@ -37,6 +38,7 @@ Sequence, Set, Tuple, + TypeVar, Union, cast, ) @@ -55,6 +57,7 @@ from synapse.api.errors import SlidingSyncUnknownPosition from synapse.events import EventBase, StrippedStateEvent from synapse.events.utils import parse_stripped_state_event, strip_event +from synapse.handlers.receipts import ReceiptEventSource from synapse.handlers.relations import BundledAggregations from synapse.logging.opentracing import ( SynapseTags, @@ -840,6 +843,7 @@ async def handle_room(room_id: str) -> None: extensions = await self.get_extensions_response( sync_config=sync_config, actual_lists=lists, + 
per_connection_state=per_connection_state, # We're purposely using `relevant_room_map` instead of # `relevant_rooms_to_send_map` here. This needs to be all room_ids we could # send regardless of whether they have an event update or not. The @@ -884,7 +888,7 @@ async def handle_room(room_id: str) -> None: unsent_room_ids = list(missing_event_map_by_room) new_connection_state.rooms.record_unsent_rooms( - unsent_room_ids, from_token.stream_token + unsent_room_ids, from_token.stream_token.room_key ) new_connection_state.rooms.record_sent_rooms( @@ -2474,6 +2478,7 @@ async def get_room_sync_data( async def get_extensions_response( self, sync_config: SlidingSyncConfig, + per_connection_state: "PerConnectionState", actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], @@ -2528,6 +2533,7 @@ async def get_extensions_response( if sync_config.extensions.receipts is not None: receipts_response = await self.get_receipts_extension_response( sync_config=sync_config, + per_connection_state=per_connection_state, actual_lists=actual_lists, actual_room_ids=actual_room_ids, actual_room_response_map=actual_room_response_map, @@ -2847,6 +2853,7 @@ async def get_account_data_extension_response( async def get_receipts_extension_response( self, sync_config: SlidingSyncConfig, + per_connection_state: "PerConnectionState", actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], @@ -2880,48 +2887,90 @@ async def get_receipts_extension_response( room_id_to_receipt_map: Dict[str, JsonMapping] = {} if len(relevant_room_ids) > 0: - # TODO: Take connection tracking into account so that when a room comes back - # into range we can send the receipts that were missed. 
- receipt_source = self.event_sources.sources.receipt - receipts, _ = await receipt_source.get_new_events( - user=sync_config.user, - from_key=( - from_token.stream_token.receipt_key - if from_token - else MultiWriterStreamToken(stream=0) - ), - to_key=to_token.receipt_key, - # This is a dummy value and isn't used in the function - limit=0, - room_ids=relevant_room_ids, - is_guest=False, + live_rooms = set() + previously_rooms: Dict[str, MultiWriterStreamToken] = {} + initial_rooms = set() + for room_id in actual_room_ids: + if not from_token: + initial_rooms.add(room_id) + continue + + room_result = actual_room_response_map.get(room_id) + if room_result is not None and room_result.initial: + initial_rooms.add(room_id) + continue + + room_status = per_connection_state.receipts.have_sent_room(room_id) + if room_status.status == HaveSentRoomFlag.LIVE: + live_rooms.add(room_id) + elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: + assert room_status.last_token is not None + previously_rooms[room_id] = room_status.last_token + elif room_status.status == HaveSentRoomFlag.NEVER: + initial_rooms.add(room_id) + else: + assert_never(room_status.status) + + fetched_receipts = [] + if live_rooms: + assert from_token is not None + receipts = await self.store.get_linearized_receipts_for_rooms( + room_ids=live_rooms, + from_key=from_token.stream_token.receipt_key, + to_key=to_token.receipt_key, + ) + fetched_receipts.extend(receipts) + + if previously_rooms: + for room_id, receipt_token in previously_rooms.items(): + previously_receipts = ( + await self.store.get_linearized_receipts_for_room( + room_id=room_id, + from_key=receipt_token, + to_key=to_token.receipt_key, + ) + ) + fetched_receipts.extend(previously_receipts) + + for room_id in initial_rooms: + room_result = actual_room_response_map.get(room_id) + if room_result is None: + continue + + relevant_event_ids = [ + event.event_id for event in room_result.timeline_events + ] + + initial_receipts = await 
self.store.get_linearized_receipts_for_room( + room_id=room_id, + to_key=to_token.receipt_key, + ) + + for receipt in initial_receipts: + content = { + event_id: content_value + for event_id, content_value in receipt["content"].items() + if event_id in relevant_event_ids + } + if content: + fetched_receipts.append( + { + "type": receipt["type"], + "room_id": receipt["room_id"], + "content": content, + } + ) + + fetched_receipts = ReceiptEventSource.filter_out_private_receipts( + fetched_receipts, sync_config.user.to_string() ) - for receipt in receipts: + for receipt in fetched_receipts: # These fields should exist for every receipt room_id = receipt["room_id"] type = receipt["type"] content = receipt["content"] - # For `inital: True` rooms, we only want to include receipts for events - # in the timeline. - room_result = actual_room_response_map.get(room_id) - if room_result is not None: - if room_result.initial: - # TODO: In the future, it would be good to fetch less receipts - # out of the database in the first place but we would need to - # add a new `event_id` index to `receipts_linearized`. - relevant_event_ids = [ - event.event_id for event in room_result.timeline_events - ] - - assert isinstance(content, dict) - content = { - event_id: content_value - for event_id, content_value in content.items() - if event_id in relevant_event_ids - } - room_id_to_receipt_map[room_id] = {"type": type, "content": content} return SlidingSyncResult.Extensions.ReceiptsExtension( @@ -3014,9 +3063,15 @@ class HaveSentRoomFlag(Enum): LIVE = 3 +T = TypeVar("T") + + @attr.s(auto_attribs=True, slots=True, frozen=True) -class HaveSentRoom: - """Whether we have sent the room down a sliding sync connection. +class HaveSentRoom(Generic[T]): + """Whether we have sent the room data down a sliding sync connection. + + We are generic over the type of token used, e.g. `RoomStreamToken` or + `MultiWriterStreamToken`. 
Attributes: status: Flag of if we have or haven't sent down the room @@ -3027,50 +3082,54 @@ class HaveSentRoom: """ status: HaveSentRoomFlag - last_token: Optional[RoomStreamToken] + last_token: Optional[T] + + @staticmethod + def live() -> "HaveSentRoom[T]": + return HaveSentRoom(HaveSentRoomFlag.LIVE, None) @staticmethod - def previously(last_token: RoomStreamToken) -> "HaveSentRoom": + def previously(last_token: T) -> "HaveSentRoom[T]": """Constructor for `PREVIOUSLY` flag.""" return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token) - -HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None) -HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) + @staticmethod + def never() -> "HaveSentRoom[T]": + return HaveSentRoom(HaveSentRoomFlag.NEVER, None) @attr.s(auto_attribs=True, slots=True, frozen=True) -class RoomStatusesForStream: +class RoomStatusesForStream(Generic[T]): """For a given stream, e.g. events, records what we have or have not sent down for that stream in a given room.""" - _statuses: Mapping[str, HaveSentRoom] = attr.Factory(dict) + _statuses: Mapping[str, HaveSentRoom[T]] = attr.Factory(dict) - def have_sent_room(self, room_id: str) -> HaveSentRoom: + def have_sent_room(self, room_id: str) -> HaveSentRoom[T]: """Return whether we have previously sent the room down""" - return self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) + return self._statuses.get(room_id, HaveSentRoom.never()) - def get_mutable(self) -> "MutableRoomStatusesForStream": + def get_mutable(self) -> "MutableRoomStatusesForStream[T]": """Get a mutable copy of this state.""" return MutableRoomStatusesForStream( statuses=self._statuses, ) - def copy(self) -> "RoomStatusesForStream": + def copy(self) -> "RoomStatusesForStream[T]": """Make a copy of the class. 
Useful for converting from a mutable to immutable version.""" return RoomStatusesForStream(statuses=dict(self._statuses)) -class MutableRoomStatusesForStream(RoomStatusesForStream): +class MutableRoomStatusesForStream(RoomStatusesForStream[T]): """A mutable version of `RoomStatusesForStream`""" - _statuses: typing.ChainMap[str, HaveSentRoom] + _statuses: typing.ChainMap[str, HaveSentRoom[T]] def __init__( self, - statuses: Mapping[str, HaveSentRoom], + statuses: Mapping[str, HaveSentRoom[T]], ) -> None: # ChainMap requires a mutable mapping, but we're not actually going to # mutate it. @@ -3080,7 +3139,7 @@ def __init__( statuses=ChainMap({}, statuses), ) - def get_updates(self) -> Mapping[str, HaveSentRoom]: + def get_updates(self) -> Mapping[str, HaveSentRoom[T]]: """Return only the changes that were made""" return self._statuses.maps[0] @@ -3094,11 +3153,9 @@ def record_sent_rooms(self, room_ids: StrCollection) -> None: ): continue - self._statuses[room_id] = HAVE_SENT_ROOM_LIVE + self._statuses[room_id] = HaveSentRoom.live() - def record_unsent_rooms( - self, room_ids: StrCollection, from_token: StreamToken - ) -> None: + def record_unsent_rooms(self, room_ids: StrCollection, from_token: T) -> None: """Record that we have not sent these rooms in the response, but there have been updates. """ @@ -3117,7 +3174,7 @@ def record_unsent_rooms( if current_status is None or current_status.status != HaveSentRoomFlag.LIVE: continue - self._statuses[room_id] = HaveSentRoom.previously(from_token.room_key) + self._statuses[room_id] = HaveSentRoom.previously(from_token) @attr.s(auto_attribs=True) @@ -3128,12 +3185,16 @@ class PerConnectionState: rooms: The status of each room for the events stream. 
""" - rooms: RoomStatusesForStream = attr.Factory(RoomStatusesForStream) + rooms: RoomStatusesForStream = attr.Factory(RoomStatusesForStream[RoomStreamToken]) + receipts: RoomStatusesForStream = attr.Factory( + RoomStatusesForStream[MultiWriterStreamToken] + ) def get_mutable(self) -> "MutablePerConnectionState": """Get a mutable copy of this state.""" return MutablePerConnectionState( rooms=self.rooms.get_mutable(), + receipts=self.receipts.get_mutable(), ) @@ -3141,10 +3202,11 @@ def get_mutable(self) -> "MutablePerConnectionState": class MutablePerConnectionState(PerConnectionState): """A mutable version of `PerConnectionState`""" - rooms: MutableRoomStatusesForStream + rooms: MutableRoomStatusesForStream[RoomStreamToken] + receipts: MutableRoomStatusesForStream[MultiWriterStreamToken] def has_updates(self) -> bool: - return bool(self.rooms.get_updates()) + return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates()) @attr.s(auto_attribs=True) From a1b75f76f7f125922ac17a50d74732212b97fcde Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Aug 2024 13:46:10 +0100 Subject: [PATCH 05/28] WIP comments --- synapse/handlers/sliding_sync.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 6dc3ab5ed2e..c74370595ca 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2887,9 +2887,13 @@ async def get_receipts_extension_response( room_id_to_receipt_map: Dict[str, JsonMapping] = {} if len(relevant_room_ids) > 0: + # We need to handle the different cases depending on if we have sent + # down receipts previously or not, so we split the relevant rooms + # up into different collections based on status. 
live_rooms = set() previously_rooms: Dict[str, MultiWriterStreamToken] = {} initial_rooms = set() + for room_id in actual_room_ids: if not from_token: initial_rooms.add(room_id) @@ -2911,7 +2915,12 @@ async def get_receipts_extension_response( else: assert_never(room_status.status) + # The set of receipts that we fetched. Private receipts need to be + # filtered out before returning. fetched_receipts = [] + + # For live rooms we just fetch all receipts in those rooms since the + # `since` token. if live_rooms: assert from_token is not None receipts = await self.store.get_linearized_receipts_for_rooms( @@ -2921,6 +2930,8 @@ async def get_receipts_extension_response( ) fetched_receipts.extend(receipts) + # For rooms we've previously sent down, but aren't up to date, we + # need to use the from token from the room status. if previously_rooms: for room_id, receipt_token in previously_rooms.items(): previously_receipts = ( @@ -2932,6 +2943,8 @@ async def get_receipts_extension_response( ) fetched_receipts.extend(previously_receipts) + # For rooms we haven't previously sent down, we send all receipts + # from that room. 
for room_id in initial_rooms: room_result = actual_room_response_map.get(room_id) if room_result is None: From 6b9d24451f77bee6baaede7a90cd36afbca7e643 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Aug 2024 09:57:48 +0100 Subject: [PATCH 06/28] Record state --- synapse/handlers/sliding_sync.py | 26 ++++++++++++++++--- .../client/sliding_sync/test_extensions.py | 6 ++--- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c74370595ca..989ec75da60 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -840,10 +840,13 @@ async def handle_room(room_id: str) -> None: with start_active_span("sliding_sync.generate_room_entries"): await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10) + new_connection_state = per_connection_state.get_mutable() + extensions = await self.get_extensions_response( sync_config=sync_config, actual_lists=lists, per_connection_state=per_connection_state, + mutable_per_connection_state=new_connection_state, # We're purposely using `relevant_room_map` instead of # `relevant_rooms_to_send_map` here. This needs to be all room_ids we could # send regardless of whether they have an event update or not. The @@ -856,8 +859,6 @@ async def handle_room(room_id: str) -> None: ) if has_lists or has_room_subscriptions: - new_connection_state = per_connection_state.get_mutable() - # We now calculate if any rooms outside the range have had updates, # which we are not sending down. 
# @@ -2479,6 +2480,7 @@ async def get_extensions_response( self, sync_config: SlidingSyncConfig, per_connection_state: "PerConnectionState", + mutable_per_connection_state: "MutablePerConnectionState", actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], @@ -2534,6 +2536,7 @@ async def get_extensions_response( receipts_response = await self.get_receipts_extension_response( sync_config=sync_config, per_connection_state=per_connection_state, + mutable_per_connection_state=mutable_per_connection_state, actual_lists=actual_lists, actual_room_ids=actual_room_ids, actual_room_response_map=actual_room_response_map, @@ -2854,6 +2857,7 @@ async def get_receipts_extension_response( self, sync_config: SlidingSyncConfig, per_connection_state: "PerConnectionState", + mutable_per_connection_state: "MutablePerConnectionState", actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], @@ -2894,7 +2898,7 @@ async def get_receipts_extension_response( previously_rooms: Dict[str, MultiWriterStreamToken] = {} initial_rooms = set() - for room_id in actual_room_ids: + for room_id in relevant_room_ids: if not from_token: initial_rooms.add(room_id) continue @@ -2986,6 +2990,21 @@ async def get_receipts_extension_response( room_id_to_receipt_map[room_id] = {"type": type, "content": content} + mutable_per_connection_state.receipts.record_sent_rooms(relevant_room_ids) + + if from_token: + receipt_key = from_token.stream_token.receipt_key + rooms_no_receipts = ( + per_connection_state.receipts._statuses.keys() - relevant_room_ids + ) + changed_rooms = self.store._receipts_stream_cache.get_entities_changed( + rooms_no_receipts, + receipt_key.stream, + ) + mutable_per_connection_state.receipts.record_unsent_rooms( + changed_rooms, receipt_key + ) + return 
SlidingSyncResult.Extensions.ReceiptsExtension( room_id_to_receipt_map=room_id_to_receipt_map, ) @@ -3318,6 +3337,7 @@ async def record_new_state( sync_statuses[new_store_token] = PerConnectionState( rooms=per_connection_state.rooms.copy(), + receipts=per_connection_state.receipts.copy(), ) return new_store_token diff --git a/tests/rest/client/sliding_sync/test_extensions.py b/tests/rest/client/sliding_sync/test_extensions.py index 68f66613347..e4c4e113661 100644 --- a/tests/rest/client/sliding_sync/test_extensions.py +++ b/tests/rest/client/sliding_sync/test_extensions.py @@ -120,19 +120,19 @@ def test_extensions_lists_rooms_relevant_rooms( "foo-list": { "ranges": [[0, 1]], "required_state": [], - "timeline_limit": 0, + "timeline_limit": 1, }, # We expect this list range to include room5, room4, room3 "bar-list": { "ranges": [[0, 2]], "required_state": [], - "timeline_limit": 0, + "timeline_limit": 1, }, }, "room_subscriptions": { room_id1: { "required_state": [], - "timeline_limit": 0, + "timeline_limit": 1, } }, } From 55feaae9ea99494e06d54f1e46ea086346b481fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Aug 2024 12:12:34 +0100 Subject: [PATCH 07/28] Add tests --- .../sliding_sync/test_extension_receipts.py | 105 ++++++++++++++++++ tests/rest/client/utils.py | 14 ++- 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/tests/rest/client/sliding_sync/test_extension_receipts.py b/tests/rest/client/sliding_sync/test_extension_receipts.py index 65fbac260ef..39c51b367cd 100644 --- a/tests/rest/client/sliding_sync/test_extension_receipts.py +++ b/tests/rest/client/sliding_sync/test_extension_receipts.py @@ -677,3 +677,108 @@ def test_wait_for_new_data_timeout(self) -> None: set(), exact=True, ) + + def test_receipts_incremental_sync_out_of_range(self) -> None: + """Tests that we don't return read receipts for rooms that fall out of + range, but then do send all read receipts once they're back in range. 
+ """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id2, user1_id, tok=user1_tok) + + # Send a message and read receipt into room2 + event_response = self.helper.send(room_id2, body="new event", tok=user2_tok) + room2_event_id = event_response["event_id"] + + self.helper.send_read_receipt(room_id2, room2_event_id, tok=user1_tok) + + # Now send a message into room1 so that it is at the top of the list + self.helper.send(room_id1, body="new event", tok=user2_tok) + + # Make a SS request for only the top room. + sync_body = { + "lists": { + "main": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": 5, + } + }, + "extensions": { + "receipts": { + "enabled": True, + } + }, + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # The receipt is in room2, but only room1 is returned, so we don't + # expect to get the receipt. + self.assertIncludes( + response_body["extensions"]["receipts"].get("rooms").keys(), + set(), + exact=True, + ) + + # Move room2 into range. + self.helper.send(room_id2, body="new event", tok=user2_tok) + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + # We expect to see the read receipt of room2, as that has the most + # recent update. 
+ self.assertIncludes( + response_body["extensions"]["receipts"].get("rooms").keys(), + {room_id2}, + exact=True, + ) + receipt = response_body["extensions"]["receipts"]["rooms"][room_id2] + self.assertIncludes( + receipt["content"][room2_event_id][ReceiptTypes.READ].keys(), + {user1_id}, + exact=True, + ) + + # Send a message into room1 to bump it to the top, but also send a + # receipt in room2 + self.helper.send(room_id1, body="new event", tok=user2_tok) + self.helper.send_read_receipt(room_id2, room2_event_id, tok=user2_tok) + + # We don't expect to see the new read receipt. + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + self.assertIncludes( + response_body["extensions"]["receipts"].get("rooms").keys(), + set(), + exact=True, + ) + + # But if we send a new message into room2, we expect to get the missing receipts + self.helper.send(room_id2, body="new event", tok=user2_tok) + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + self.assertIncludes( + response_body["extensions"]["receipts"].get("rooms").keys(), + {room_id2}, + exact=True, + ) + + # We should only see the new receipt + receipt = response_body["extensions"]["receipts"]["rooms"][room_id2] + self.assertIncludes( + receipt["content"][room2_event_id][ReceiptTypes.READ].keys(), + {user2_id}, + exact=True, + ) diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index e43140720db..9614cdd66ac 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -45,7 +45,7 @@ from twisted.test.proto_helpers import MemoryReactorClock from twisted.web.server import Site -from synapse.api.constants import Membership +from synapse.api.constants import Membership, ReceiptTypes from synapse.api.errors import Codes from synapse.server import HomeServer from synapse.types import JsonDict @@ -944,3 +944,15 @@ def initiate_sso_ui_auth( assert len(p.links) == 1, "not exactly one link in confirmation 
page" oauth_uri = p.links[0] return oauth_uri + + def send_read_receipt(self, room_id: str, event_id: str, *, tok: str) -> None: + """Send a read receipt into the room at the given event""" + channel = make_request( + self.reactor, + self.site, + method="POST", + path=f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_id}", + content={}, + access_token=tok, + ) + assert channel.code == HTTPStatus.OK, channel.text_body From 614c0d73deb8f9c1c786a02b5798c39d99fc9e11 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Aug 2024 12:42:07 +0100 Subject: [PATCH 08/28] Newsfile --- changelog.d/17575.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17575.misc diff --git a/changelog.d/17575.misc b/changelog.d/17575.misc new file mode 100644 index 00000000000..1b4a53ee178 --- /dev/null +++ b/changelog.d/17575.misc @@ -0,0 +1 @@ +Correctly track read receipts that should be sent down in experimental sliding sync. From 100927dde1c92e48f6884faa57dddbc5eff4d98f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Aug 2024 13:15:49 +0100 Subject: [PATCH 09/28] Comments --- synapse/handlers/sliding_sync.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 989ec75da60..274aed3f5af 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2990,9 +2990,13 @@ async def get_receipts_extension_response( room_id_to_receipt_map[room_id] = {"type": type, "content": content} + # Now we update the per-connection state to track which receipts we have + # and haven't sent down. mutable_per_connection_state.receipts.record_sent_rooms(relevant_room_ids) if from_token: + # Use the receipt stream change cache to check which rooms might have + # had receipts that we haven't sent down. 
receipt_key = from_token.stream_token.receipt_key rooms_no_receipts = ( per_connection_state.receipts._statuses.keys() - relevant_room_ids From 70d32fba832dbcc15a0229725fc4df81ee10600f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Aug 2024 14:31:31 +0100 Subject: [PATCH 10/28] Add proper DB function for getting receipts between things --- synapse/handlers/sliding_sync.py | 12 +++---- synapse/storage/databases/main/receipts.py | 41 ++++++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 274aed3f5af..dcc82ee0063 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2995,18 +2995,18 @@ async def get_receipts_extension_response( mutable_per_connection_state.receipts.record_sent_rooms(relevant_room_ids) if from_token: - # Use the receipt stream change cache to check which rooms might have - # had receipts that we haven't sent down. - receipt_key = from_token.stream_token.receipt_key + # Now find the set of rooms that may have receipts that we're not + # sending down. 
rooms_no_receipts = ( per_connection_state.receipts._statuses.keys() - relevant_room_ids ) - changed_rooms = self.store._receipts_stream_cache.get_entities_changed( + changed_rooms = await self.store.get_rooms_with_receipts_between( rooms_no_receipts, - receipt_key.stream, + from_key=from_token.stream_token.receipt_key, + to_key=to_token.receipt_key, ) mutable_per_connection_state.receipts.record_unsent_rooms( - changed_rooms, receipt_key + changed_rooms, from_token.stream_token.receipt_key ) return SlidingSyncResult.Extensions.ReceiptsExtension( diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 3bde0ae0d4f..bc54444ba27 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -51,10 +51,12 @@ JsonMapping, MultiWriterStreamToken, PersistedPosition, + StrCollection, ) from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.iterutils import batch_iter if TYPE_CHECKING: from synapse.server import HomeServer @@ -550,6 +552,45 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]: return results + async def get_rooms_with_receipts_between( + self, + room_ids: StrCollection, + from_key: MultiWriterStreamToken, + to_key: MultiWriterStreamToken, + ) -> StrCollection: + """Get the set of rooms that (may) have receipts between the two tokens.""" + + room_ids = self._receipts_stream_cache.get_entities_changed( + room_ids, from_key.stream + ) + if not room_ids: + return [] + + def f(txn: LoggingTransaction, room_ids: StrCollection) -> StrCollection: + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) + + sql = f""" + SELECT DISTINCT room_id FROM receipts_linearized + WHERE {clause} AND ? < stream_id AND stream_id <= ? 
+ """ + args.append(from_key.stream) + args.append(to_key.get_max_stream_pos()) + + txn.execute(sql, args) + + return [room_id for room_id, in txn] + + results: List[str] = [] + for batch in batch_iter(room_ids, 1000): + batch_result = await self.db_pool.runInteraction( + "get_rooms_with_receipts_between", f, batch + ) + results.extend(batch_result) + + return results + async def get_users_sent_receipts_between( self, last_id: int, current_id: int ) -> List[str]: From ee6efa2c6695a20533b677e0e8360735f240af8c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 13:59:56 +0100 Subject: [PATCH 11/28] Track room configs in per-connection state --- synapse/handlers/sliding_sync.py | 33 +++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index dcc82ee0063..3f553c7c9fb 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3226,11 +3226,27 @@ class PerConnectionState: RoomStatusesForStream[MultiWriterStreamToken] ) + previous_room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict) + def get_mutable(self) -> "MutablePerConnectionState": """Get a mutable copy of this state.""" + # ChainMap requires a mutable mapping, but we're not actually going to + # mutate it. 
+ previous_room_configs = cast( + MutableMapping[str, RoomSyncConfig], self.previous_room_configs + ) + return MutablePerConnectionState( rooms=self.rooms.get_mutable(), receipts=self.receipts.get_mutable(), + previous_room_configs=ChainMap({}, previous_room_configs), + ) + + def copy(self) -> "PerConnectionState": + return PerConnectionState( + rooms=self.rooms.copy(), + receipts=self.receipts.copy(), + previous_room_configs=dict(self.previous_room_configs), ) @@ -3241,8 +3257,18 @@ class MutablePerConnectionState(PerConnectionState): rooms: MutableRoomStatusesForStream[RoomStreamToken] receipts: MutableRoomStatusesForStream[MultiWriterStreamToken] + previous_room_configs: typing.ChainMap[str, RoomSyncConfig] + def has_updates(self) -> bool: - return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates()) + return ( + bool(self.rooms.get_updates()) + or bool(self.receipts.get_updates()) + or bool(self.get_room_config_updates()) + ) + + def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]: + """Get updates to the room sync config""" + return self.previous_room_configs.maps[0] @attr.s(auto_attribs=True) @@ -3339,10 +3365,7 @@ async def record_new_state( new_store_token = prev_connection_token + 1 sync_statuses.pop(new_store_token, None) - sync_statuses[new_store_token] = PerConnectionState( - rooms=per_connection_state.rooms.copy(), - receipts=per_connection_state.receipts.copy(), - ) + sync_statuses[new_store_token] = per_connection_state.copy() return new_store_token From 009af0e5606136696ebd5a58a8c649268e319cf0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 14:00:07 +0100 Subject: [PATCH 12/28] Handle timeline_limit changes --- synapse/handlers/sliding_sync.py | 116 +++++++++++++++- .../sliding_sync/test_rooms_timeline.py | 124 ++++++++++++++++++ 2 files changed, 234 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 3f553c7c9fb..edf46604247 100644 --- 
a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -785,7 +785,19 @@ async def current_sync_for_user( # subscription and have updates we need to send (i.e. either because # we haven't sent the room down, or we have but there are missing # updates). - for room_id in relevant_room_map: + for room_id, room_config in relevant_room_map.items(): + prev_room_sync_config = ( + per_connection_state.previous_room_configs.get(room_id) + ) + if prev_room_sync_config is not None: + # Always include rooms whose timeline limit has increased. + if ( + prev_room_sync_config.timeline_limit + < room_config.timeline_limit + ): + rooms_should_send.add(room_id) + continue + status = per_connection_state.rooms.have_sent_room(room_id) if ( # The room was never sent down before so the client needs to know @@ -817,12 +829,17 @@ async def current_sync_for_user( if room_id in rooms_should_send } + new_connection_state = per_connection_state.get_mutable() + @trace @tag_args async def handle_room(room_id: str) -> None: + set_tag("room_id", room_id) + room_sync_result = await self.get_room_sync_data( sync_config=sync_config, per_connection_state=per_connection_state, + mutable_per_connection_state=new_connection_state, room_id=room_id, room_sync_config=relevant_rooms_to_send_map[room_id], room_membership_for_user_at_to_token=room_membership_for_user_map[ @@ -840,8 +857,6 @@ async def handle_room(room_id: str) -> None: with start_active_span("sliding_sync.generate_room_entries"): await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10) - new_connection_state = per_connection_state.get_mutable() - extensions = await self.get_extensions_response( sync_config=sync_config, actual_lists=lists, @@ -1953,6 +1968,7 @@ async def get_room_sync_data( self, sync_config: SlidingSyncConfig, per_connection_state: "PerConnectionState", + mutable_per_connection_state: "MutablePerConnectionState", room_id: str, room_sync_config: RoomSyncConfig, 
room_membership_for_user_at_to_token: _RoomMembershipForUser, @@ -1997,8 +2013,15 @@ async def get_room_sync_data( # connection before # # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 + # + # We also need to check if the timeline limit has increased, if so we ignore + # the from bound for the timeline to send down a larger chunk of + # history. + # + # TODO: Also handle changes to `required_state` from_bound = None initial = True + ignore_timeline_bound = False if from_token and not room_membership_for_user_at_to_token.newly_joined: room_status = per_connection_state.rooms.have_sent_room(room_id) if room_status.status == HaveSentRoomFlag.LIVE: @@ -2016,7 +2039,39 @@ async def get_room_sync_data( log_kv({"sliding_sync.room_status": room_status}) - log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial}) + prev_room_sync_config = per_connection_state.previous_room_configs.get( + room_id + ) + if prev_room_sync_config is not None: + # Check if the timeline limit has increased, if so ignore the + # timeline bound and record the change. + if ( + prev_room_sync_config.timeline_limit + < room_sync_config.timeline_limit + ): + ignore_timeline_bound = True + mutable_per_connection_state.previous_room_configs[room_id] = ( + room_sync_config + ) + + if ( + room_status.status != HaveSentRoomFlag.LIVE + and prev_room_sync_config.timeline_limit + > room_sync_config.timeline_limit + ): + mutable_per_connection_state.previous_room_configs[room_id] = ( + room_sync_config + ) + + # TODO: Record changes in required_state. 
+ + log_kv( + { + "sliding_sync.from_bound": from_bound, + "sliding_sync.initial": initial, + "sliding_sync.ignore_timeline_bound": ignore_timeline_bound, + } + ) # Assemble the list of timeline events # @@ -2053,6 +2108,10 @@ async def get_room_sync_data( room_membership_for_user_at_to_token.event_pos.to_room_stream_token() ) + timeline_from_bound = from_bound + if ignore_timeline_bound: + timeline_from_bound = None + # For initial `/sync` (and other historical scenarios mentioned above), we # want to view a historical section of the timeline; to fetch events by # `topological_ordering` (best representation of the room DAG as others were @@ -2078,7 +2137,7 @@ async def get_room_sync_data( pagination_method: PaginateFunction = ( # Use `topographical_ordering` for historical events paginate_room_events_by_topological_ordering - if from_bound is None + if timeline_from_bound is None # Use `stream_ordering` for updates else paginate_room_events_by_stream_ordering ) @@ -2088,7 +2147,7 @@ async def get_room_sync_data( # (from newer to older events) starting at to_bound. # This ensures we fill the `limit` with the newest events first, from_key=to_bound, - to_key=from_bound, + to_key=timeline_from_bound, direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate # the limit or not (see `limited`) @@ -2446,6 +2505,51 @@ async def get_room_sync_data( if new_bump_event_pos.stream > 0: bump_stamp = new_bump_event_pos.stream + prev_room_sync_config = per_connection_state.previous_room_configs.get(room_id) + if ignore_timeline_bound: + # FIXME: We signal the fact that we're sending down more events to + # the client by setting `initial=true` *without* sending down all + # the state/metadata again, which is what the proxy does. We should + # update the protocol to do something less silly. 
+            initial = True
+
+            mutable_per_connection_state.previous_room_configs[room_id] = (
+                RoomSyncConfig(
+                    timeline_limit=len(timeline_events),
+                    required_state_map=room_sync_config.required_state_map,
+                )
+            )
+        elif prev_room_sync_config is not None:
+            # If the result isn't limited then we don't need to record that the
+            # timeline_limit has been reduced, as the *effective* timeline limit
+            # (i.e. the amount of timeline we have previously sent) is at least
+            # the previous timeline limit.
+            #
+            # This is to handle the case where the timeline limit e.g. goes from
+            # 10 to 5 to 10 again (without any timeline gaps), where there's no
+            # point sending down extra events when the timeline limit is
+            # increased as the client already has the 10 previous events.
+            # However, if there is a gap (i.e. limited is True), then we *do*
+            # need to record the reduced timeline.
+            if (
+                limited
+                and prev_room_sync_config.timeline_limit
+                > room_sync_config.timeline_limit
+            ):
+                mutable_per_connection_state.previous_room_configs[room_id] = (
+                    RoomSyncConfig(
+                        timeline_limit=len(timeline_events),
+                        required_state_map=room_sync_config.required_state_map,
+                    )
+                )
+
+            # TODO: Record changes in required_state. 
+ + else: + mutable_per_connection_state.previous_room_configs[room_id] = ( + room_sync_config + ) + set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) return SlidingSyncResult.RoomResult( diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py index 2e9586ca733..7be3b8af912 100644 --- a/tests/rest/client/sliding_sync/test_rooms_timeline.py +++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py @@ -17,6 +17,7 @@ from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin +from synapse.api.constants import EventTypes from synapse.rest.client import login, room, sync from synapse.server import HomeServer from synapse.types import StreamToken, StrSequence @@ -573,3 +574,126 @@ def test_rooms_ban_incremental_sync2(self) -> None: # Nothing to see for this banned user in the room in the token range self.assertIsNone(response_body["rooms"].get(room_id1)) + + def test_increasing_timeline_range_sends_more_messages(self) -> None: + """ + Test that increasing the timeline limit via room subscriptions sends the + room down with more messages in a limited sync. 
+ """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [[EventTypes.Create, ""]], + "timeline_limit": 1, + } + } + } + + message_events = [] + for _ in range(10): + resp = self.helper.send(room_id1, "msg", tok=user1_tok) + message_events.append(resp["event_id"]) + + # Make the first Sliding Sync request + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + room_response = response_body["rooms"][room_id1] + + self.assertEqual(room_response["initial"], True) + self.assertEqual(room_response["limited"], True) + + # We only expect the last message at first + self.assertEqual( + [event["event_id"] for event in room_response["timeline"]], + message_events[-1:], + room_response["timeline"], + ) + + # We also expect to get the create event state. + self.assertEqual( + [event["type"] for event in room_response["required_state"]], + [EventTypes.Create], + ) + + # Now do another request with a room subscription with an increased timeline limit + sync_body["room_subscriptions"] = { + room_id1: { + "required_state": [], + "timeline_limit": 10, + } + } + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + room_response = response_body["rooms"][room_id1] + + self.assertEqual(room_response["initial"], True) + self.assertEqual(room_response["limited"], True) + + # Now we expect all the messages + self.assertEqual( + [event["event_id"] for event in room_response["timeline"]], + message_events, + room_response["timeline"], + ) + + # We don't expect to get the room create down, as nothing has changed. 
+ self.assertNotIn("required_state", room_response) + + # Decreasing the timeline limit shouldn't resend any events + sync_body["room_subscriptions"] = { + room_id1: { + "required_state": [], + "timeline_limit": 5, + } + } + + event_response = self.helper.send(room_id1, "msg", tok=user1_tok) + latest_event_id = event_response["event_id"] + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + room_response = response_body["rooms"][room_id1] + + self.assertNotIn("initial", room_response) + self.assertEqual(room_response["limited"], False) + + self.assertEqual( + [event["event_id"] for event in room_response["timeline"]], + [latest_event_id], + room_response["timeline"], + ) + + # Increasing the limit to what it was before also should not resend any + # events + sync_body["room_subscriptions"] = { + room_id1: { + "required_state": [], + "timeline_limit": 10, + } + } + + event_response = self.helper.send(room_id1, "msg", tok=user1_tok) + latest_event_id = event_response["event_id"] + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + room_response = response_body["rooms"][room_id1] + + self.assertNotIn("initial", room_response) + self.assertEqual(room_response["limited"], False) + + self.assertEqual( + [event["event_id"] for event in room_response["timeline"]], + [latest_event_id], + room_response["timeline"], + ) From b23231e9e4121001363eb1982408f9a84824043d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 14:05:33 +0100 Subject: [PATCH 13/28] Newsfile --- changelog.d/17579.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17579.misc diff --git a/changelog.d/17579.misc b/changelog.d/17579.misc new file mode 100644 index 00000000000..5eb3d5c7b43 --- /dev/null +++ b/changelog.d/17579.misc @@ -0,0 +1 @@ +Handle changes in `timeline_limit` in experimental sliding sync. 
From 33ec15b62ee43f2a4274b879652ae3474956abb2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Aug 2024 21:31:32 +0100 Subject: [PATCH 14/28] Restore comments --- synapse/handlers/sliding_sync.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index bf85eb6485c..f303a38a203 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3469,8 +3469,8 @@ async def get_per_connection_state( connection_position = from_token.connection_position if connection_position == 0: - # The '0' values is a special value to indicate there is no - # per-connection state. + # Initial sync (request without a `from_token`) starts at `0` so + # there is no existing per-connection state return PerConnectionState() conn_key = self._get_connection_key(sync_config) @@ -3489,6 +3489,11 @@ async def record_new_state( from_token: Optional[SlidingSyncStreamToken], new_connection_state: MutablePerConnectionState, ) -> int: + """Record updated per-connection state, returning the connection + position associated with the new state. + If there are no changes to the state this may return the same token as + the existing per-connection state. + """ prev_connection_token = 0 if from_token is not None: prev_connection_token = from_token.connection_position @@ -3504,6 +3509,8 @@ async def record_new_state( new_store_token = prev_connection_token + 1 sync_statuses.pop(new_store_token, None) + # We copy the `MutablePerConnectionState` so that the inner `ChainMap`s + # don't grow forever. 
sync_statuses[new_store_token] = new_connection_state.copy() return new_store_token From 768d150b049b20f27403e8246c5c4834ab8afa63 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Aug 2024 21:31:36 +0100 Subject: [PATCH 15/28] Add docstring --- synapse/handlers/sliding_sync.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index f303a38a203..34aee6dbfac 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3362,6 +3362,8 @@ class PerConnectionState: Attributes: rooms: The status of each room for the events stream. receipts: The status of each room for the receipts stream. + previous_room_configs: Map from room_id to the `RoomSyncConfig` of all + rooms that we have previously sent down. """ rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap) From a63261d83a0699ce60f06a587cc55fe413d71e69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Aug 2024 21:31:56 +0100 Subject: [PATCH 16/28] Restore comments --- synapse/handlers/sliding_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 34aee6dbfac..f1ecf298ab9 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3441,7 +3441,7 @@ class SlidingSyncConnectionStore: to mapping of room ID to `HaveSentRoom`. 
""" - # `(user_id, conn_id)` -> `token` -> `PerConnectionState` + # `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState` _connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory( dict ) From 891ce47ab063b96dd12f993d12e61784f9859e9b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Aug 2024 23:17:18 +0100 Subject: [PATCH 17/28] Rename previous_room_configs --- synapse/handlers/sliding_sync.py | 42 ++++++++++++-------------------- 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index f1ecf298ab9..5f0936fa908 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -788,8 +788,8 @@ async def current_sync_for_user( # we haven't sent the room down, or we have but there are missing # updates). for room_id, room_config in relevant_room_map.items(): - prev_room_sync_config = ( - previous_connection_state.previous_room_configs.get(room_id) + prev_room_sync_config = previous_connection_state.room_configs.get( + room_id ) if prev_room_sync_config is not None: # Always include rooms whose timeline limit has increased. @@ -2041,9 +2041,7 @@ async def get_room_sync_data( log_kv({"sliding_sync.room_status": room_status}) - prev_room_sync_config = previous_connection_state.previous_room_configs.get( - room_id - ) + prev_room_sync_config = previous_connection_state.room_configs.get(room_id) if prev_room_sync_config is not None: # Check if the timeline limit has increased, if so ignore the # timeline bound and record the change. 
@@ -2052,18 +2050,14 @@ async def get_room_sync_data( < room_sync_config.timeline_limit ): ignore_timeline_bound = True - new_connection_state.previous_room_configs[room_id] = ( - room_sync_config - ) + new_connection_state.room_configs[room_id] = room_sync_config if ( room_status.status != HaveSentRoomFlag.LIVE and prev_room_sync_config.timeline_limit > room_sync_config.timeline_limit ): - new_connection_state.previous_room_configs[room_id] = ( - room_sync_config - ) + new_connection_state.room_configs[room_id] = room_sync_config # TODO: Record changes in required_state. @@ -2507,9 +2501,7 @@ async def get_room_sync_data( if new_bump_event_pos.stream > 0: bump_stamp = new_bump_event_pos.stream - prev_room_sync_config = previous_connection_state.previous_room_configs.get( - room_id - ) + prev_room_sync_config = previous_connection_state.room_configs.get(room_id) if ignore_timeline_bound: # FIXME: We signal the fact that we're sending down more events to # the client by setting `initial=true` *without* sending down all @@ -2517,7 +2509,7 @@ async def get_room_sync_data( # update the protocol to do something less silly. initial = True - new_connection_state.previous_room_configs[room_id] = RoomSyncConfig( + new_connection_state.room_configs[room_id] = RoomSyncConfig( timeline_limit=len(timeline_events), required_state_map=room_sync_config.required_state_map, ) @@ -2538,7 +2530,7 @@ async def get_room_sync_data( and prev_room_sync_config.timeline_limit > room_sync_config.timeline_limit ): - new_connection_state.previous_room_configs[room_id] = RoomSyncConfig( + new_connection_state.room_configs[room_id] = RoomSyncConfig( timeline_limit=len(timeline_events), required_state_map=room_sync_config.required_state_map, ) @@ -2546,7 +2538,7 @@ async def get_room_sync_data( # TODO: Record changes in required_state. 
else: - new_connection_state.previous_room_configs[room_id] = room_sync_config + new_connection_state.room_configs[room_id] = room_sync_config set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) @@ -3362,32 +3354,30 @@ class PerConnectionState: Attributes: rooms: The status of each room for the events stream. receipts: The status of each room for the receipts stream. - previous_room_configs: Map from room_id to the `RoomSyncConfig` of all + room_configs: Map from room_id to the `RoomSyncConfig` of all rooms that we have previously sent down. """ rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap) receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap) - previous_room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict) + room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict) def get_mutable(self) -> "MutablePerConnectionState": """Get a mutable copy of this state.""" - previous_room_configs = cast( - MutableMapping[str, RoomSyncConfig], self.previous_room_configs - ) + room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs) return MutablePerConnectionState( rooms=self.rooms.get_mutable(), receipts=self.receipts.get_mutable(), - previous_room_configs=ChainMap({}, previous_room_configs), + room_configs=ChainMap({}, room_configs), ) def copy(self) -> "PerConnectionState": return PerConnectionState( rooms=self.rooms.copy(), receipts=self.receipts.copy(), - previous_room_configs=dict(self.previous_room_configs), + room_configs=dict(self.room_configs), ) @@ -3398,7 +3388,7 @@ class MutablePerConnectionState(PerConnectionState): rooms: MutableRoomStatusMap[RoomStreamToken] receipts: MutableRoomStatusMap[MultiWriterStreamToken] - previous_room_configs: typing.ChainMap[str, RoomSyncConfig] + room_configs: typing.ChainMap[str, RoomSyncConfig] def has_updates(self) -> bool: return ( @@ -3409,7 +3399,7 @@ def has_updates(self) -> bool: def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]: 
"""Get updates to the room sync config""" - return self.previous_room_configs.maps[0] + return self.room_configs.maps[0] @attr.s(auto_attribs=True) From a4ad443bbfdde36322934ca7fbeed7b2659caa6c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Aug 2024 23:22:14 +0100 Subject: [PATCH 18/28] Use test helpers --- .../sliding_sync/test_rooms_timeline.py | 46 +++++++++++-------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py index 7be3b8af912..01f7203fecd 100644 --- a/tests/rest/client/sliding_sync/test_rooms_timeline.py +++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py @@ -609,16 +609,21 @@ def test_increasing_timeline_range_sends_more_messages(self) -> None: self.assertEqual(room_response["limited"], True) # We only expect the last message at first - self.assertEqual( - [event["event_id"] for event in room_response["timeline"]], - message_events[-1:], - room_response["timeline"], + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[event["event_id"] for event in room_response["timeline"]], + expected_event_ids=message_events[-1:], + message=str(room_response["timeline"]), ) # We also expect to get the create event state. 
- self.assertEqual( - [event["type"] for event in room_response["required_state"]], - [EventTypes.Create], + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + room_response["required_state"], + {state_map[(EventTypes.Create, "")]}, + exact=True, ) # Now do another request with a room subscription with an increased timeline limit @@ -638,10 +643,11 @@ def test_increasing_timeline_range_sends_more_messages(self) -> None: self.assertEqual(room_response["limited"], True) # Now we expect all the messages - self.assertEqual( - [event["event_id"] for event in room_response["timeline"]], - message_events, - room_response["timeline"], + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[event["event_id"] for event in room_response["timeline"]], + expected_event_ids=message_events, + message=str(room_response["timeline"]), ) # We don't expect to get the room create down, as nothing has changed. @@ -666,10 +672,11 @@ def test_increasing_timeline_range_sends_more_messages(self) -> None: self.assertNotIn("initial", room_response) self.assertEqual(room_response["limited"], False) - self.assertEqual( - [event["event_id"] for event in room_response["timeline"]], - [latest_event_id], - room_response["timeline"], + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[event["event_id"] for event in room_response["timeline"]], + expected_event_ids=[latest_event_id], + message=str(room_response["timeline"]), ) # Increasing the limit to what it was before also should not resend any @@ -692,8 +699,9 @@ def test_increasing_timeline_range_sends_more_messages(self) -> None: self.assertNotIn("initial", room_response) self.assertEqual(room_response["limited"], False) - self.assertEqual( - [event["event_id"] for event in room_response["timeline"]], - [latest_event_id], - room_response["timeline"], + self._assertTimelineEqual( + room_id=room_id1, + 
actual_event_ids=[event["event_id"] for event in room_response["timeline"]], + expected_event_ids=[latest_event_id], + message=str(room_response["timeline"]), ) From 0e8feedc8df5ade443253bbb54c6c7f34dbfb23c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Aug 2024 23:22:37 +0100 Subject: [PATCH 19/28] Remove spurious set_tag --- synapse/handlers/sliding_sync.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 5f0936fa908..1b24be07cd7 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -836,8 +836,6 @@ async def current_sync_for_user( @trace @tag_args async def handle_room(room_id: str) -> None: - set_tag("room_id", room_id) - room_sync_result = await self.get_room_sync_data( sync_config=sync_config, previous_connection_state=previous_connection_state, From 49c4645ab67a68617ef483dc998a349060f92885 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Aug 2024 23:29:26 +0100 Subject: [PATCH 20/28] Remove double insertion --- synapse/handlers/sliding_sync.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 1b24be07cd7..7fb183ba3f3 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2048,16 +2048,8 @@ async def get_room_sync_data( < room_sync_config.timeline_limit ): ignore_timeline_bound = True - new_connection_state.room_configs[room_id] = room_sync_config - if ( - room_status.status != HaveSentRoomFlag.LIVE - and prev_room_sync_config.timeline_limit - > room_sync_config.timeline_limit - ): - new_connection_state.room_configs[room_id] = room_sync_config - - # TODO: Record changes in required_state. 
+            # TODO: Check for changes in `required_state`

         log_kv(
             {

From 299ab1b945f2ad6b42ad42d5f00b87d278b792fc Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 19 Aug 2024 23:30:00 +0100
Subject: [PATCH 21/28] Use timeline_limit not len(timeline)

---
 synapse/handlers/sliding_sync.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 7fb183ba3f3..63f8407225b 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -2500,7 +2500,7 @@ async def get_room_sync_data(
             initial = True

             new_connection_state.room_configs[room_id] = RoomSyncConfig(
-                timeline_limit=len(timeline_events),
+                timeline_limit=room_sync_config.timeline_limit,
                 required_state_map=room_sync_config.required_state_map,
             )
         elif prev_room_sync_config is not None:
@@ -2521,7 +2521,7 @@ async def get_room_sync_data(
                 > room_sync_config.timeline_limit
             ):
                 new_connection_state.room_configs[room_id] = RoomSyncConfig(
-                    timeline_limit=len(timeline_events),
+                    timeline_limit=room_sync_config.timeline_limit,
                     required_state_map=room_sync_config.required_state_map,
                 )

From ba4e63b948eb99e1b987c8575b9432e3b8f5c25a Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 19 Aug 2024 23:34:43 +0100
Subject: [PATCH 22/28] Add comment explaining the odd behaviour

---
 synapse/handlers/sliding_sync.py | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 63f8407225b..2e4cf16ea87 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -2012,11 +2012,15 @@ async def get_room_sync_data(
         # - For an incremental sync where we haven't sent it down this
         #   connection before
         #
-        # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+        # Relevant spec issue:
+        # https://github.com/matrix-org/matrix-spec/issues/1917
         #
-        # We also need to check if 
the timeline limit has increased, if so we ignore - # the from bound for the timeline to send down a larger chunk of - # history. + # XXX: Odd behavior - We also check if the `timeline_limit` has + # increased, if so we ignore the from bound for the timeline to send + # down a larger chunk of history with `initial: True`. This is matches + # the behavior of the Sliding Sync proxy and is what e.g. ElementX + # currently expects. In future this behavior is almost certianly going + # to change # # TODO: Also handle changes to `required_state` from_bound = None @@ -2042,7 +2046,8 @@ async def get_room_sync_data( prev_room_sync_config = previous_connection_state.room_configs.get(room_id) if prev_room_sync_config is not None: # Check if the timeline limit has increased, if so ignore the - # timeline bound and record the change. + # timeline bound and record the change (see "XXX: Odd behavior" + # above). if ( prev_room_sync_config.timeline_limit < room_sync_config.timeline_limit @@ -2496,7 +2501,8 @@ async def get_room_sync_data( # FIXME: We signal the fact that we're sending down more events to # the client by setting `initial=true` *without* sending down all # the state/metadata again, which is what the proxy does. We should - # update the protocol to do something less silly. + # update the protocol to do something less silly (see "XXX: Odd + # behavior" above). 
initial = True new_connection_state.room_configs[room_id] = RoomSyncConfig( From 2bba63ef2157d53bf8ccf42808b696f62106d72d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2024 00:15:44 +0100 Subject: [PATCH 23/28] Replace initial=true with unstable_expanded_timeline=true --- synapse/handlers/sliding_sync.py | 19 ++++++++++--------- synapse/rest/client/sync.py | 5 +++++ synapse/types/handlers/__init__.py | 4 ++++ .../sliding_sync/test_rooms_timeline.py | 6 +++++- 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 2e4cf16ea87..e832c25f65f 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2017,10 +2017,11 @@ async def get_room_sync_data( # # XXX: Odd behavior - We also check if the `timeline_limit` has # increased, if so we ignore the from bound for the timeline to send - # down a larger chunk of history with `initial: True`. This is matches - # the behavior of the Sliding Sync proxy and is what e.g. ElementX - # currently expects. In future this behavior is almost certianly going - # to change + # down a larger chunk of history and set `unstable_expanded_timeline` to + # true. This is a bit different to the behavior of the Sliding Sync + # proxy (which sets initial=true, but then doesn't send down the full + # state again), but existing apps, e.g. ElementX, just need `limited` + # set. In future this behavior is almost certainly going to change. 
# # TODO: Also handle changes to `required_state` from_bound = None @@ -2496,14 +2497,13 @@ async def get_room_sync_data( if new_bump_event_pos.stream > 0: bump_stamp = new_bump_event_pos.stream + unstable_expanded_timeline = False prev_room_sync_config = previous_connection_state.room_configs.get(room_id) if ignore_timeline_bound: # FIXME: We signal the fact that we're sending down more events to - # the client by setting `initial=true` *without* sending down all - # the state/metadata again, which is what the proxy does. We should - # update the protocol to do something less silly (see "XXX: Odd - # behavior" above). - initial = True + # the client by setting `unstable_expanded_timeline` to true (see + # "XXX: Odd behavior" above). + unstable_expanded_timeline = True new_connection_state.room_configs[room_id] = RoomSyncConfig( timeline_limit=room_sync_config.timeline_limit, @@ -2550,6 +2550,7 @@ async def get_room_sync_data( stripped_state=stripped_state, prev_batch=prev_batch_token, limited=limited, + unstable_expanded_timeline=unstable_expanded_timeline, num_live=num_live, bump_stamp=bump_stamp, joined_count=room_membership_summary.get( diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 8c5db2a513f..21b90b06746 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -1044,6 +1044,11 @@ async def encode_rooms( if room_result.initial: serialized_rooms[room_id]["initial"] = room_result.initial + if room_result.unstable_expanded_timeline: + serialized_rooms[room_id][ + "unstable_expanded_timeline" + ] = room_result.unstable_expanded_timeline + # This will be omitted for invite/knock rooms with `stripped_state` if ( room_result.required_state is not None diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 363f060bef9..580342d98ad 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -171,6 +171,9 @@ class RoomResult: their local state. 
When there is an update, servers MUST omit this flag entirely and NOT send
            "initial":false as this is wasteful on bandwidth. The absence of this flag means
            'false'.
+        unstable_expanded_timeline: Flag which is set if we're returning more historic
+            events due to the timeline limit having increased. See "XXX: Odd behavior"
+            comment in `synapse.handlers.sliding_sync`.
         required_state: The current state of the room
         timeline: Latest events in the room. The last event is the most recent.
         bundled_aggregations: A mapping of event ID to the bundled aggregations for
@@ -219,6 +222,7 @@ class StrippedHero:
     heroes: Optional[List[StrippedHero]]
     is_dm: bool
     initial: bool
+    unstable_expanded_timeline: bool
     # Should be empty for invite/knock rooms with `stripped_state`
     required_state: List[EventBase]
     # Should be empty for invite/knock rooms with `stripped_state`
diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py
index 01f7203fecd..eeac0d6aa99 100644
--- a/tests/rest/client/sliding_sync/test_rooms_timeline.py
+++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py
@@ -606,6 +606,7 @@ def test_increasing_timeline_range_sends_more_messages(self) -> None:
         room_response = response_body["rooms"][room_id1]

         self.assertEqual(room_response["initial"], True)
+        self.assertNotIn("unstable_expanded_timeline", room_response)
         self.assertEqual(room_response["limited"], True)

         # We only expect the last message at first
@@ -639,7 +640,8 @@ def test_increasing_timeline_range_sends_more_messages(self) -> None:
         )

         room_response = response_body["rooms"][room_id1]
-        self.assertEqual(room_response["initial"], True)
+        self.assertNotIn("initial", room_response)
+        self.assertEqual(room_response["unstable_expanded_timeline"], True)
         self.assertEqual(room_response["limited"], True)

         # Now we expect all the messages
@@ -670,6 +672,7 @@ def test_increasing_timeline_range_sends_more_messages(self) -> None:
         room_response = 
response_body["rooms"][room_id1] self.assertNotIn("initial", room_response) + self.assertNotIn("unstable_expanded_timeline", room_response) self.assertEqual(room_response["limited"], False) self._assertTimelineEqual( @@ -697,6 +700,7 @@ def test_increasing_timeline_range_sends_more_messages(self) -> None: room_response = response_body["rooms"][room_id1] self.assertNotIn("initial", room_response) + self.assertNotIn("unstable_expanded_timeline", room_response) self.assertEqual(room_response["limited"], False) self._assertTimelineEqual( From 52f4253c50d24de3e321305622a7b8ff7dce2177 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2024 00:40:08 +0100 Subject: [PATCH 24/28] Improve comment --- synapse/handlers/sliding_sync.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index e832c25f65f..f00bb6d171d 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2510,10 +2510,14 @@ async def get_room_sync_data( required_state_map=room_sync_config.required_state_map, ) elif prev_room_sync_config is not None: - # If the result isn't limited then we don't need to record that the - # timeline_limit has been reduced, as the *effective* timeline limit - # (i.e. the amount of timeline we have previously sent) is at least - # the previous timeline limit. + # If the result is limited then we need to record that the timeline + # limit has been reduced, as if the client later requests more + # timeline then we have more data to send. + # + # Otherwise we don't need to record that the timeline_limit has been + # reduced, as the *effective* timeline limit (i.e. the amount of + # timeline we have previously sent) is at least the previous + # timeline limit. # # This is to handle the case where the timeline limit e.g. 
goes from # 10 to 5 to 10 again (without any timeline gaps), where there's no From 09538c2afed71c87a3026d7dcccd66714476594a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2024 10:05:09 +0100 Subject: [PATCH 25/28] Update synapse/handlers/sliding_sync.py Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index f00bb6d171d..2002077f1fa 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -793,6 +793,7 @@ async def current_sync_for_user( ) if prev_room_sync_config is not None: # Always include rooms whose timeline limit has increased. + # (see the "XXX: Odd behavior" described below) if ( prev_room_sync_config.timeline_limit < room_config.timeline_limit From 733555baf15681ebd2199702d46af28823dce14a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2024 10:05:55 +0100 Subject: [PATCH 26/28] Update synapse/handlers/sliding_sync.py Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 2002077f1fa..139e6d45454 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2500,6 +2500,8 @@ async def get_room_sync_data( unstable_expanded_timeline = False prev_room_sync_config = previous_connection_state.room_configs.get(room_id) + # Record the `room_sync_config` if we're `ignore_timeline_bound` (which means + # that the `timeline_limit` has increased) if ignore_timeline_bound: # FIXME: We signal the fact that we're sending down more events to # the client by setting `unstable_expanded_timeline` to true (see From 76f882a954a5dd54d1eb03290e38e756236a2f60 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2024 10:06:22 +0100 Subject: [PATCH 27/28] Update synapse/handlers/sliding_sync.py Co-authored-by: Eric 
Eastwood --- synapse/handlers/sliding_sync.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 139e6d45454..d8ca76b5d2d 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2513,21 +2513,25 @@ async def get_room_sync_data( required_state_map=room_sync_config.required_state_map, ) elif prev_room_sync_config is not None: - # If the result is limited then we need to record that the timeline - # limit has been reduced, as if the client later requests more - # timeline then we have more data to send. + # If the result is `limited` then we need to record that the + # `timeline_limit` has been reduced, as when/if the client later requests + # more timeline then we have more data to send. # - # Otherwise we don't need to record that the timeline_limit has been - # reduced, as the *effective* timeline limit (i.e. the amount of - # timeline we have previously sent) is at least the previous - # timeline limit. + # Otherwise (when not `limited`) we don't need to record that the + # `timeline_limit` has been reduced, as the *effective* `timeline_limit` + # (i.e. the amount of timeline we have previously sent to the client) is at + # least the previous `timeline_limit`. # - # This is to handle the case where the timeline limit e.g. goes from - # 10 to 5 to 10 again (without any timeline gaps), where there's no - # point sending down extra events when the timeline limit is - # increased as the client already has the 10 previous events. - # However, if is a gap (i.e. limited is True), then we *do* need to - # record the reduced timeline. + # This is to handle the case where the `timeline_limit` e.g. 
goes from 10 to + # 5 to 10 again (without any timeline gaps), where there's no point sending + # down the initial historical chunk events when the `timeline_limit` is + # increased as the client already has the 10 previous events. However, if + # client has a gap in the timeline (i.e. `limited` is True), then we *do* + # need to record the reduced timeline. + # + # TODO: Handle timeline gaps (`get_timeline_gaps()`) - This is separate from + # the gaps we might see on the client because a response was `limited` we're + # talking about above. if ( limited and prev_room_sync_config.timeline_limit From bcaf4e6a18b07012a47583ff6c6aac408406375d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2024 10:06:53 +0100 Subject: [PATCH 28/28] Update synapse/handlers/sliding_sync.py Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index d8ca76b5d2d..1d4a16f853f 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2016,13 +2016,19 @@ async def get_room_sync_data( # Relevant spec issue: # https://github.com/matrix-org/matrix-spec/issues/1917 # - # XXX: Odd behavior - We also check if the `timeline_limit` has - # increased, if so we ignore the from bound for the timeline to send - # down a larger chunk of history and set `unstable_expanded_timeline` to - # true. This is a bit different to the behavior of the Sliding Sync - # proxy (which sets initial=true, but then doesn't send down the full - # state again), but existing apps, e.g. ElementX, just need `limited` - # set. In future this behavior is almost certainly going to change. + # XXX: Odd behavior - We also check if the `timeline_limit` has increased, if so + # we ignore the from bound for the timeline to send down a larger chunk of + # history and set `unstable_expanded_timeline` to true. 
This is only being added + # to match the behavior of the Sliding Sync proxy as we expect the ElementX + # client to feel a certain way and be able to trickle in a full page of timeline + # messages to fill up the screen. This is a bit different to the behavior of the + # Sliding Sync proxy (which sets initial=true, but then doesn't send down the + # full state again), but existing apps, e.g. ElementX, just need `limited` set. + # We don't explicitly set `limited` but this will be the case for any room that + # has more history than we're trying to pull out. Using + # `unstable_expanded_timeline` allows us to avoid contaminating what `initial` + # or `limited` mean for clients that interpret them correctly. In future this + # behavior is almost certainly going to change. # # TODO: Also handle changes to `required_state` from_bound = None