diff --git a/changelog.d/17342.feature b/changelog.d/17342.feature new file mode 100644 index 00000000000..b2671ea14a3 --- /dev/null +++ b/changelog.d/17342.feature @@ -0,0 +1 @@ +Return "required state" in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 0cebeea5922..a1ddac903ea 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -18,7 +18,7 @@ # # import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Set, Tuple import attr from immutabledict import immutabledict @@ -39,6 +39,7 @@ PersistedEventPosition, Requester, RoomStreamToken, + StateMap, StreamKeyType, StreamToken, UserID, @@ -90,14 +91,186 @@ class RoomSyncConfig: Attributes: timeline_limit: The maximum number of events to return in the timeline. - required_state: The set of state events requested for the room. The - values are close to `StateKey` but actually use a syntax where you can - provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part - of the tuple (type, state_key). + + required_state_map: Map from state event type to state_keys requested for the + room. The values are close to `StateKey` but actually use a syntax where you + can provide `*` wildcard and `$LAZY` for lazy-loading room members. """ timeline_limit: int - required_state: Set[Tuple[str, str]] + required_state_map: Dict[str, Set[str]] + + @classmethod + def from_room_config( + cls, + room_params: SlidingSyncConfig.CommonRoomParameters, + ) -> "RoomSyncConfig": + """ + Create a `RoomSyncConfig` from a `SlidingSyncList`/`RoomSubscription` config. + + Args: + room_params: `SlidingSyncConfig.SlidingSyncList` or `SlidingSyncConfig.RoomSubscription` + """ + required_state_map: Dict[str, Set[str]] = {} + for ( + state_type, + state_key, + ) in room_params.required_state: + # If we already have a wildcard for this specific `state_key`, we don't need + # to add it since the wildcard already covers it. + if state_key in required_state_map.get(StateValues.WILDCARD, set()): + continue + + # If we already have a wildcard `state_key` for this `state_type`, we don't need + # to add anything else + if StateValues.WILDCARD in required_state_map.get(state_type, set()): + continue + + # If we're getting wildcards for the `state_type` and `state_key`, that's + # all that matters so get rid of any other entries + if state_type == StateValues.WILDCARD and state_key == StateValues.WILDCARD: + required_state_map = {StateValues.WILDCARD: {StateValues.WILDCARD}} + # We can break, since we don't need to add anything else + break + + # If we're getting a wildcard for the `state_type`, get rid of any other + # entries with the same `state_key`, since the wildcard will cover it already. + elif state_type == StateValues.WILDCARD: + # Get rid of any entries that match the `state_key` + # + # Make a copy so we don't run into an error: `dictionary changed size + # during iteration`, when we remove items + for ( + existing_state_type, + existing_state_key_set, + ) in list(required_state_map.items()): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for existing_state_key in existing_state_key_set.copy(): + if existing_state_key == state_key: + existing_state_key_set.remove(state_key) + + # If we've the left the `set()` empty, remove it from the map + if existing_state_key_set == set(): + required_state_map.pop(existing_state_type, None) + + # If we're getting a wildcard `state_key`, get rid of any other state_keys + # for this `state_type` since the wildcard will cover it already. + if state_key == StateValues.WILDCARD: + required_state_map[state_type] = {state_key} + # Otherwise, just add it to the set + else: + if required_state_map.get(state_type) is None: + required_state_map[state_type] = {state_key} + else: + required_state_map[state_type].add(state_key) + + return cls( + timeline_limit=room_params.timeline_limit, + required_state_map=required_state_map, + ) + + def deep_copy(self) -> "RoomSyncConfig": + required_state_map: Dict[str, Set[str]] = { + state_type: state_key_set.copy() + for state_type, state_key_set in self.required_state_map.items() + } + + return RoomSyncConfig( + timeline_limit=self.timeline_limit, + required_state_map=required_state_map, + ) + + def combine_room_sync_config( + self, other_room_sync_config: "RoomSyncConfig" + ) -> None: + """ + Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the + superset union of the two. + """ + # Take the highest timeline limit + if self.timeline_limit < other_room_sync_config.timeline_limit: + self.timeline_limit = other_room_sync_config.timeline_limit + + # Union the required state + for ( + state_type, + state_key_set, + ) in other_room_sync_config.required_state_map.items(): + # If we already have a wildcard for everything, we don't need to add + # anything else + if StateValues.WILDCARD in self.required_state_map.get( + StateValues.WILDCARD, set() + ): + break + + # If we already have a wildcard `state_key` for this `state_type`, we don't need + # to add anything else + if StateValues.WILDCARD in self.required_state_map.get(state_type, set()): + continue + + # If we're getting wildcards for the `state_type` and `state_key`, that's + # all that matters so get rid of any other entries + if ( + state_type == StateValues.WILDCARD + and StateValues.WILDCARD in state_key_set + ): + self.required_state_map = {state_type: {StateValues.WILDCARD}} + # We can break, since we don't need to add anything else + break + + for state_key in state_key_set: + # If we already have a wildcard for this specific `state_key`, we don't need + # to add it since the wildcard already covers it. + if state_key in self.required_state_map.get( + StateValues.WILDCARD, set() + ): + continue + + # If we're getting a wildcard for the `state_type`, get rid of any other + # entries with the same `state_key`, since the wildcard will cover it already. + if state_type == StateValues.WILDCARD: + # Get rid of any entries that match the `state_key` + # + # Make a copy so we don't run into an error: `dictionary changed size + # during iteration`, when we remove items + for existing_state_type, existing_state_key_set in list( + self.required_state_map.items() + ): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for existing_state_key in existing_state_key_set.copy(): + if existing_state_key == state_key: + existing_state_key_set.remove(state_key) + + # If we've the left the `set()` empty, remove it from the map + if existing_state_key_set == set(): + self.required_state_map.pop(existing_state_type, None) + + # If we're getting a wildcard `state_key`, get rid of any other state_keys + # for this `state_type` since the wildcard will cover it already. + if state_key == StateValues.WILDCARD: + self.required_state_map[state_type] = {state_key} + break + # Otherwise, just add it to the set + else: + if self.required_state_map.get(state_type) is None: + self.required_state_map[state_type] = {state_key} + else: + self.required_state_map[state_type].add(state_key) + + +class StateValues: + """ + Understood values of the (type, state_key) tuple in `required_state`. + """ + + # Include all state events of the given type + WILDCARD: Final = "*" + # Lazy-load room membership events (include room membership events for any event + # `sender` in the timeline). We only give special meaning to this value when it's a + # `state_key`. + LAZY: Final = "$LAZY" @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -242,6 +415,8 @@ async def current_sync_for_user( # Assemble sliding window lists lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} + # Keep track of the rooms that we're going to display and need to fetch more + # info about relevant_room_map: Dict[str, RoomSyncConfig] = {} if sync_config.lists: # Get all of the room IDs that the user should be able to see in the sync @@ -260,49 +435,76 @@ async def current_sync_for_user( sync_config.user, sync_room_map, list_config.filters, to_token ) + # Sort the list sorted_room_info = await self.sort_rooms( filtered_sync_room_map, to_token ) + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_room_map = await self.store.is_partial_state_room_batched( + filtered_sync_room_map.keys() + ) + + # Since creating the `RoomSyncConfig` takes some work, let's just do it + # once and make a copy whenever we need it. + room_sync_config = RoomSyncConfig.from_room_config(list_config) + membership_state_keys = room_sync_config.required_state_map.get( + EventTypes.Member + ) + lazy_loading = ( + membership_state_keys is not None + and len(membership_state_keys) == 1 + and StateValues.LAZY in membership_state_keys + ) + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: for range in list_config.ranges: - sliced_room_ids = [ - room_id - # Both sides of range are inclusive - for room_id, _ in sorted_room_info[range[0] : range[1] + 1] - ] + room_ids_in_list: List[str] = [] + + # We're going to loop through the sorted list of rooms starting + # at the range start index and keep adding rooms until we fill + # up the range or run out of rooms. + # + # Both sides of range are inclusive so we `+ 1` + max_num_rooms = range[1] - range[0] + 1 + for room_id, _ in sorted_room_info[range[0] :]: + if len(room_ids_in_list) >= max_num_rooms: + break + + # Exclude partially-stated rooms unless the `required_state` + # only has `["m.room.member", "$LAZY"]` for membership + # (lazy-loading room members). + if partial_state_room_map.get(room_id) and not lazy_loading: + continue + + # Take the superset of the `RoomSyncConfig` for each room. + # + # Update our `relevant_room_map` with the room we're going + # to display and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get(room_id) + if existing_room_sync_config is not None: + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + else: + # Make a copy so if we modify it later, it doesn't + # affect all references. + relevant_room_map[room_id] = ( + room_sync_config.deep_copy() + ) + + room_ids_in_list.append(room_id) ops.append( SlidingSyncResult.SlidingWindowList.Operation( op=OperationType.SYNC, range=range, - room_ids=sliced_room_ids, + room_ids=room_ids_in_list, ) ) - # Take the superset of the `RoomSyncConfig` for each room - for room_id in sliced_room_ids: - if relevant_room_map.get(room_id) is not None: - # Take the highest timeline limit - if ( - relevant_room_map[room_id].timeline_limit - < list_config.timeline_limit - ): - relevant_room_map[room_id].timeline_limit = ( - list_config.timeline_limit - ) - - # Union the required state - relevant_room_map[room_id].required_state.update( - list_config.required_state - ) - else: - relevant_room_map[room_id] = RoomSyncConfig( - timeline_limit=list_config.timeline_limit, - required_state=set(list_config.required_state), - ) - lists[list_key] = SlidingSyncResult.SlidingWindowList( count=len(sorted_room_info), ops=ops, @@ -651,9 +853,6 @@ async def filter_rooms( user_id = user.to_string() # TODO: Apply filters - # - # TODO: Exclude partially stated rooms unless the `required_state` has - # `["m.room.member", "$LAZY"]` filtered_room_id_set = set(sync_room_map.keys()) @@ -694,16 +893,18 @@ async def filter_rooms( if filters.is_encrypted is not None: # Make a copy so we don't run into an error: `Set changed size during # iteration`, when we filter out and remove items - for room_id in list(filtered_room_id_set): + for room_id in filtered_room_id_set.copy(): state_at_to_token = await self.storage_controllers.state.get_state_at( room_id, to_token, state_filter=StateFilter.from_types( [(EventTypes.RoomEncryption, "")] ), - # Partially stated rooms should have all state events except for the - # membership events so we don't need to wait. Plus we don't want to - # block the whole sync waiting for this one room. + # Partially-stated rooms should have all state events except for the + # membership events so we don't need to wait because we only care + # about retrieving the `EventTypes.RoomEncryption` state event here. + # Plus we don't want to block the whole sync waiting for this one + # room. await_full_state=False, ) is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, "")) @@ -719,7 +920,7 @@ async def filter_rooms( if filters.is_invite is not None: # Make a copy so we don't run into an error: `Set changed size during # iteration`, when we filter out and remove items - for room_id in list(filtered_room_id_set): + for room_id in filtered_room_id_set.copy(): room_for_user = sync_room_map[room_id] # If we're looking for invite rooms, filter out rooms that the user is # not invited to and vice versa @@ -737,7 +938,7 @@ async def filter_rooms( if filters.room_types is not None or filters.not_room_types is not None: # Make a copy so we don't run into an error: `Set changed size during # iteration`, when we filter out and remove items - for room_id in list(filtered_room_id_set): + for room_id in filtered_room_id_set.copy(): create_event = await self.store.get_create_event_for_room(room_id) room_type = create_event.content.get(EventContentFields.ROOM_TYPE) if ( @@ -843,7 +1044,7 @@ async def get_room_sync_data( # Assemble the list of timeline events # - # It would be nice to make the `rooms` response more uniform regardless of + # FIXME: It would be nice to make the `rooms` response more uniform regardless of # membership. Currently, we have to make all of these optional because # `invite`/`knock` rooms only have `stripped_state`. See # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 @@ -1010,6 +1211,136 @@ async def get_room_sync_data( # state reset happened. Perhaps we should indicate this by setting `initial: # True` and empty `required_state`. + # TODO: Since we can't determine whether we've already sent a room down this + # Sliding Sync connection before (we plan to add this optimization in the + # future), we're always returning the requested room state instead of + # updates. + initial = True + + # Fetch the required state for the room + # + # No `required_state` for invite/knock rooms (just `stripped_state`) + # + # FIXME: It would be nice to make the `rooms` response more uniform regardless + # of membership. Currently, we have to make this optional because + # `invite`/`knock` rooms only have `stripped_state`. See + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 + room_state: Optional[StateMap[EventBase]] = None + if rooms_membership_for_user_at_to_token.membership not in ( + Membership.INVITE, + Membership.KNOCK, + ): + # Calculate the `StateFilter` based on the `required_state` for the room + state_filter: Optional[StateFilter] = StateFilter.none() + # If we have a double wildcard ("*", "*") in the `required_state`, we need + # to fetch all state for the room + # + # Note: MSC3575 describes different behavior to how we're handling things + # here but since it's not wrong to return more state than requested + # (`required_state` is just the minimum requested), it doesn't matter if we + # include more than client wanted. This complexity is also under scrutiny, + # see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1185109050 + # + # > One unique exception is when you request all state events via ["*", "*"]. When used, + # > all state events are returned by default, and additional entries FILTER OUT the returned set + # > of state events. These additional entries cannot use '*' themselves. + # > For example, ["*", "*"], ["m.room.member", "@alice:example.com"] will _exclude_ every m.room.member + # > event _except_ for @alice:example.com, and include every other state event. + # > In addition, ["*", "*"], ["m.space.child", "*"] is an error, the m.space.child filter is not + # > required as it would have been returned anyway. + # > + # > -- MSC3575 (https://github.com/matrix-org/matrix-spec-proposals/pull/3575) + if StateValues.WILDCARD in room_sync_config.required_state_map.get( + StateValues.WILDCARD, set() + ): + state_filter = StateFilter.all() + # TODO: `StateFilter` currently doesn't support wildcard event types. We're + # currently working around this by returning all state to the client but it + # would be nice to fetch less from the database and return just what the + # client wanted. + elif ( + room_sync_config.required_state_map.get(StateValues.WILDCARD) + is not None + ): + state_filter = StateFilter.all() + else: + required_state_types: List[Tuple[str, Optional[str]]] = [] + for ( + state_type, + state_key_set, + ) in room_sync_config.required_state_map.items(): + for state_key in state_key_set: + if state_key == StateValues.WILDCARD: + # `None` is a wildcard in the `StateFilter` + required_state_types.append((state_type, None)) + # We need to fetch all relevant people when we're lazy-loading membership + elif ( + state_type == EventTypes.Member + and state_key == StateValues.LAZY + ): + # Everyone in the timeline is relevant + timeline_membership: Set[str] = set() + if timeline_events is not None: + for timeline_event in timeline_events: + timeline_membership.add(timeline_event.sender) + + for user_id in timeline_membership: + required_state_types.append( + (EventTypes.Member, user_id) + ) + + # FIXME: We probably also care about invite, ban, kick, targets, etc + # but the spec only mentions "senders". + else: + required_state_types.append((state_type, state_key)) + + state_filter = StateFilter.from_types(required_state_types) + + # We can skip fetching state if we don't need any + if state_filter != StateFilter.none(): + # We can return all of the state that was requested if we're doing an + # initial sync + if initial: + # People shouldn't see past their leave/ban event + if rooms_membership_for_user_at_to_token.membership in ( + Membership.LEAVE, + Membership.BAN, + ): + room_state = await self.storage_controllers.state.get_state_at( + room_id, + stream_position=to_token.copy_and_replace( + StreamKeyType.ROOM, + rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token(), + ), + state_filter=state_filter, + # Partially-stated rooms should have all state events except for + # the membership events and since we've already excluded + # partially-stated rooms unless `required_state` only has + # `["m.room.member", "$LAZY"]` for membership, we should be able + # to retrieve everything requested. Plus we don't want to block + # the whole sync waiting for this one room. + await_full_state=False, + ) + # Otherwise, we can get the latest current state in the room + else: + room_state = await self.storage_controllers.state.get_current_state( + room_id, + state_filter, + # Partially-stated rooms should have all state events except for + # the membership events and since we've already excluded + # partially-stated rooms unless `required_state` only has + # `["m.room.member", "$LAZY"]` for membership, we should be able + # to retrieve everything requested. Plus we don't want to block + # the whole sync waiting for this one room. + await_full_state=False, + ) + # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token` + else: + # TODO: Once we can figure out if we've sent a room down this connection before, + # we can return updates instead of the full required state. + raise NotImplementedError() + return SlidingSyncResult.RoomResult( # TODO: Dummy value name=None, @@ -1017,20 +1348,16 @@ async def get_room_sync_data( avatar=None, # TODO: Dummy value heroes=None, - # TODO: Since we can't determine whether we've already sent a room down this - # Sliding Sync connection before (we plan to add this optimization in the - # future), we're always returning the requested room state instead of - # updates. - initial=True, # TODO: Dummy value - required_state=[], + is_dm=False, + initial=initial, + required_state=list(room_state.values()) if room_state else None, timeline_events=timeline_events, bundled_aggregations=bundled_aggregations, - # TODO: Dummy value - is_dm=False, stripped_state=stripped_state, prev_batch=prev_batch_token, limited=limited, + num_live=num_live, # TODO: Dummy values joined_count=0, invited_count=0, @@ -1039,5 +1366,4 @@ async def get_room_sync_data( # (encrypted rooms). notification_count=0, highlight_count=0, - num_live=num_live, ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e2563428d2e..de227faec3f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1352,7 +1352,7 @@ async def _compute_state_delta_for_full_sync( await_full_state = True lazy_load_members = False - state_at_timeline_end = await self._state_storage_controller.get_state_at( + state_at_timeline_end = await self._state_storage_controller.get_state_ids_at( room_id, stream_position=end_token, state_filter=state_filter, @@ -1480,11 +1480,13 @@ async def _compute_state_delta_for_incremental_sync( else: # We can get here if the user has ignored the senders of all # the recent events. - state_at_timeline_start = await self._state_storage_controller.get_state_at( - room_id, - stream_position=end_token, - state_filter=state_filter, - await_full_state=await_full_state, + state_at_timeline_start = ( + await self._state_storage_controller.get_state_ids_at( + room_id, + stream_position=end_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) ) if batch.limited: @@ -1502,14 +1504,14 @@ async def _compute_state_delta_for_incremental_sync( # about them). state_filter = StateFilter.all() - state_at_previous_sync = await self._state_storage_controller.get_state_at( + state_at_previous_sync = await self._state_storage_controller.get_state_ids_at( room_id, stream_position=since_token, state_filter=state_filter, await_full_state=await_full_state, ) - state_at_timeline_end = await self._state_storage_controller.get_state_at( + state_at_timeline_end = await self._state_storage_controller.get_state_ids_at( room_id, stream_position=end_token, state_filter=state_filter, @@ -2508,7 +2510,7 @@ async def _get_room_changes_for_incremental_sync( continue if room_id in sync_result_builder.joined_room_ids or has_join: - old_state_ids = await self._state_storage_controller.get_state_at( + old_state_ids = await self._state_storage_controller.get_state_ids_at( room_id, since_token, state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]), @@ -2539,7 +2541,7 @@ async def _get_room_changes_for_incremental_sync( else: if not old_state_ids: old_state_ids = ( - await self._state_storage_controller.get_state_at( + await self._state_storage_controller.get_state_ids_at( room_id, since_token, state_filter=StateFilter.from_types( diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 1d955a2e893..e52e771538b 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -996,7 +996,7 @@ async def encode_rooms( if room_result.initial: serialized_rooms[room_id]["initial"] = room_result.initial - # This will omitted for invite/knock rooms with `stripped_state` + # This will be omitted for invite/knock rooms with `stripped_state` if room_result.required_state is not None: serialized_required_state = ( await self.event_serializer.serialize_events( @@ -1007,7 +1007,7 @@ async def encode_rooms( ) serialized_rooms[room_id]["required_state"] = serialized_required_state - # This will omitted for invite/knock rooms with `stripped_state` + # This will be omitted for invite/knock rooms with `stripped_state` if room_result.timeline_events is not None: serialized_timeline = await self.event_serializer.serialize_events( room_result.timeline_events, @@ -1017,17 +1017,17 @@ async def encode_rooms( ) serialized_rooms[room_id]["timeline"] = serialized_timeline - # This will omitted for invite/knock rooms with `stripped_state` + # This will be omitted for invite/knock rooms with `stripped_state` if room_result.limited is not None: serialized_rooms[room_id]["limited"] = room_result.limited - # This will omitted for invite/knock rooms with `stripped_state` + # This will be omitted for invite/knock rooms with `stripped_state` if room_result.prev_batch is not None: serialized_rooms[room_id]["prev_batch"] = ( await room_result.prev_batch.to_string(self.store) ) - # This will omitted for invite/knock rooms with `stripped_state` + # This will be omitted for invite/knock rooms with `stripped_state` if room_result.num_live is not None: serialized_rooms[room_id]["num_live"] = room_result.num_live diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index f3630fbbf1a..b50eb8868ec 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -409,7 +409,7 @@ async def get_state_after_event( return state_ids - async def get_state_at( + async def get_state_ids_at( self, room_id: str, stream_position: StreamToken, @@ -460,6 +460,30 @@ async def get_state_at( ) return state + @trace + @tag_args + async def get_state_at( + self, + room_id: str, + stream_position: StreamToken, + state_filter: Optional[StateFilter] = None, + await_full_state: bool = True, + ) -> StateMap[EventBase]: + """Same as `get_state_ids_at` but also fetches the events""" + state_map_ids = await self.get_state_ids_at( + room_id, stream_position, state_filter, await_full_state + ) + + event_map = await self.stores.main.get_events(list(state_map_ids.values())) + + state_map = {} + for key, event_id in state_map_ids.items(): + event = event_map.get(event_id) + if event: + state_map[key] = event + + return state_map + @trace @tag_args async def get_state_for_groups( diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 3cd3c8fb0fa..3bd3268e595 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -156,6 +156,8 @@ class RoomResult: avatar: Room avatar heroes: List of stripped membership events (containing `user_id` and optionally `avatar_url` and `displayname`) for the users used to calculate the room name. + is_dm: Flag to specify whether the room is a direct-message room (most likely + between two people). initial: Flag which is set when this is the first time the server is sending this data on this connection. Clients can use this flag to replace or update their local state. When there is an update, servers MUST omit this flag @@ -167,8 +169,6 @@ class RoomResult: the timeline events above. This allows clients to show accurate reaction counts (or edits, threads), even if some of the reaction events were skipped over in a gappy sync. - is_dm: Flag to specify whether the room is a direct-message room (most likely - between two people). stripped_state: Stripped state events (for rooms where the usre is invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2, absent on joined/left rooms @@ -176,6 +176,13 @@ class RoomResult: `/rooms//messages` API to retrieve earlier messages. limited: True if their are more events than fit between the given position and now. Sync again to get more. + num_live: The number of timeline events which have just occurred and are not historical. + The last N events are 'live' and should be treated as such. This is mostly + useful to determine whether a given @mention event should make a noise or not. + Clients cannot rely solely on the absence of `initial: true` to determine live + events because if a room not in the sliding window bumps into the window because + of an @mention it will have `initial: true` yet contain a single live event + (with potentially other old events in the timeline). joined_count: The number of users with membership of join, including the client's own user ID. (same as sync `v2 m.joined_member_count`) invited_count: The number of users with membership of invite. (same as sync v2 @@ -184,37 +191,30 @@ class RoomResult: as sync v2) highlight_count: The number of unread notifications for this room with the highlight flag set. (same as sync v2) - num_live: The number of timeline events which have just occurred and are not historical. - The last N events are 'live' and should be treated as such. This is mostly - useful to determine whether a given @mention event should make a noise or not. - Clients cannot rely solely on the absence of `initial: true` to determine live - events because if a room not in the sliding window bumps into the window because - of an @mention it will have `initial: true` yet contain a single live event - (with potentially other old events in the timeline). """ name: Optional[str] avatar: Optional[str] heroes: Optional[List[EventBase]] + is_dm: bool initial: bool # Only optional because it won't be included for invite/knock rooms with `stripped_state` required_state: Optional[List[EventBase]] # Only optional because it won't be included for invite/knock rooms with `stripped_state` timeline_events: Optional[List[EventBase]] bundled_aggregations: Optional[Dict[str, "BundledAggregations"]] - is_dm: bool # Optional because it's only relevant to invite/knock rooms stripped_state: Optional[List[JsonDict]] # Only optional because it won't be included for invite/knock rooms with `stripped_state` prev_batch: Optional[StreamToken] # Only optional because it won't be included for invite/knock rooms with `stripped_state` limited: Optional[bool] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + num_live: Optional[int] joined_count: int invited_count: int notification_count: int highlight_count: int - # Only optional because it won't be included for invite/knock rooms with `stripped_state` - num_live: Optional[int] @attr.s(slots=True, frozen=True, auto_attribs=True) class SlidingWindowList: diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 713a7987033..5f83b637c59 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -18,6 +18,8 @@ # # import logging +from copy import deepcopy +from typing import Optional from unittest.mock import patch from parameterized import parameterized @@ -33,20 +35,550 @@ RoomTypes, ) from synapse.api.room_versions import RoomVersions -from synapse.handlers.sliding_sync import SlidingSyncConfig +from synapse.handlers.sliding_sync import RoomSyncConfig, StateValues from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import JsonDict, UserID +from synapse.types.handlers import SlidingSyncConfig from synapse.util import Clock from tests.replication._base import BaseMultiWorkerStreamTestCase -from tests.unittest import HomeserverTestCase +from tests.unittest import HomeserverTestCase, TestCase logger = logging.getLogger(__name__) +class RoomSyncConfigTestCase(TestCase): + def _assert_room_config_equal( + self, + actual: RoomSyncConfig, + expected: RoomSyncConfig, + message_prefix: Optional[str] = None, + ) -> None: + self.assertEqual(actual.timeline_limit, expected.timeline_limit, message_prefix) + + # `self.assertEqual(...)` works fine to catch differences but the output is + # almost impossible to read because of the way it truncates the output and the + # order doesn't actually matter. + self.assertCountEqual( + actual.required_state_map, expected.required_state_map, message_prefix + ) + for event_type, expected_state_keys in expected.required_state_map.items(): + self.assertCountEqual( + actual.required_state_map[event_type], + expected_state_keys, + f"{message_prefix}: Mismatch for {event_type}", + ) + + @parameterized.expand( + [ + ( + "from_list_config", + """ + Test that we can convert a `SlidingSyncConfig.SlidingSyncList` to a + `RoomSyncConfig`. + """, + # Input + SlidingSyncConfig.SlidingSyncList( + timeline_limit=10, + required_state=[ + (EventTypes.Name, ""), + (EventTypes.Member, "@foo"), + (EventTypes.Member, "@bar"), + (EventTypes.Member, "@baz"), + (EventTypes.CanonicalAlias, ""), + ], + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Name: {""}, + EventTypes.Member: { + "@foo", + "@bar", + "@baz", + }, + EventTypes.CanonicalAlias: {""}, + }, + ), + ), + ( + "from_room_subscription", + """ + Test that we can convert a `SlidingSyncConfig.RoomSubscription` to a + `RoomSyncConfig`. + """, + # Input + SlidingSyncConfig.RoomSubscription( + timeline_limit=10, + required_state=[ + (EventTypes.Name, ""), + (EventTypes.Member, "@foo"), + (EventTypes.Member, "@bar"), + (EventTypes.Member, "@baz"), + (EventTypes.CanonicalAlias, ""), + ], + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Name: {""}, + EventTypes.Member: { + "@foo", + "@bar", + "@baz", + }, + EventTypes.CanonicalAlias: {""}, + }, + ), + ), + ( + "wildcard", + """ + Test that a wildcard (*) for both the `event_type` and `state_key` will override + all other values. + + Note: MSC3575 describes different behavior to how we're handling things here but + since it's not wrong to return more state than requested (`required_state` is + just the minimum requested), it doesn't matter if we include things that the + client wanted excluded. This complexity is also under scrutiny, see + https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1185109050 + + > One unique exception is when you request all state events via ["*", "*"]. When used, + > all state events are returned by default, and additional entries FILTER OUT the returned set + > of state events. These additional entries cannot use '*' themselves. + > For example, ["*", "*"], ["m.room.member", "@alice:example.com"] will _exclude_ every m.room.member + > event _except_ for @alice:example.com, and include every other state event. + > In addition, ["*", "*"], ["m.space.child", "*"] is an error, the m.space.child filter is not + > required as it would have been returned anyway. + > + > -- MSC3575 (https://github.com/matrix-org/matrix-spec-proposals/pull/3575) + """, + # Input + SlidingSyncConfig.SlidingSyncList( + timeline_limit=10, + required_state=[ + (EventTypes.Name, ""), + (StateValues.WILDCARD, StateValues.WILDCARD), + (EventTypes.Member, "@foo"), + (EventTypes.CanonicalAlias, ""), + ], + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + StateValues.WILDCARD: {StateValues.WILDCARD}, + }, + ), + ), + ( + "wildcard_type", + """ + Test that a wildcard (*) as a `event_type` will override all other values for the + same `state_key`. + """, + # Input + SlidingSyncConfig.SlidingSyncList( + timeline_limit=10, + required_state=[ + (EventTypes.Name, ""), + (StateValues.WILDCARD, ""), + (EventTypes.Member, "@foo"), + (EventTypes.CanonicalAlias, ""), + ], + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + StateValues.WILDCARD: {""}, + EventTypes.Member: {"@foo"}, + }, + ), + ), + ( + "multiple_wildcard_type", + """ + Test that multiple wildcard (*) as a `event_type` will override all other values + for the same `state_key`. + """, + # Input + SlidingSyncConfig.SlidingSyncList( + timeline_limit=10, + required_state=[ + (EventTypes.Name, ""), + (StateValues.WILDCARD, ""), + (EventTypes.Member, "@foo"), + (StateValues.WILDCARD, "@foo"), + ("org.matrix.personal_count", "@foo"), + (EventTypes.Member, "@bar"), + (EventTypes.CanonicalAlias, ""), + ], + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + StateValues.WILDCARD: { + "", + "@foo", + }, + EventTypes.Member: {"@bar"}, + }, + ), + ), + ( + "wildcard_state_key", + """ + Test that a wildcard (*) as a `state_key` will override all other values for the + same `event_type`. + """, + # Input + SlidingSyncConfig.SlidingSyncList( + timeline_limit=10, + required_state=[ + (EventTypes.Name, ""), + (EventTypes.Member, "@foo"), + (EventTypes.Member, StateValues.WILDCARD), + (EventTypes.Member, "@bar"), + (EventTypes.Member, StateValues.LAZY), + (EventTypes.Member, "@baz"), + (EventTypes.CanonicalAlias, ""), + ], + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Name: {""}, + EventTypes.Member: { + StateValues.WILDCARD, + }, + EventTypes.CanonicalAlias: {""}, + }, + ), + ), + ( + "wildcard_merge", + """ + Test that a wildcard (*) entries for the `event_type` and another one for + `state_key` will play together. + """, + # Input + SlidingSyncConfig.SlidingSyncList( + timeline_limit=10, + required_state=[ + (EventTypes.Name, ""), + (StateValues.WILDCARD, ""), + (EventTypes.Member, "@foo"), + (EventTypes.Member, StateValues.WILDCARD), + (EventTypes.Member, "@bar"), + (EventTypes.CanonicalAlias, ""), + ], + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + StateValues.WILDCARD: {""}, + EventTypes.Member: {StateValues.WILDCARD}, + }, + ), + ), + ( + "wildcard_merge2", + """ + Test that an all wildcard ("*", "*") entry will override any other + values (including other wildcards). + """, + # Input + SlidingSyncConfig.SlidingSyncList( + timeline_limit=10, + required_state=[ + (EventTypes.Name, ""), + (StateValues.WILDCARD, ""), + (EventTypes.Member, StateValues.WILDCARD), + (EventTypes.Member, "@foo"), + # One of these should take precedence over everything else + (StateValues.WILDCARD, StateValues.WILDCARD), + (StateValues.WILDCARD, StateValues.WILDCARD), + (EventTypes.CanonicalAlias, ""), + ], + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + StateValues.WILDCARD: {StateValues.WILDCARD}, + }, + ), + ), + ( + "lazy_members", + """ + `$LAZY` room members should just be another additional key next to other + explicit keys. We will unroll the special `$LAZY` meaning later. + """, + # Input + SlidingSyncConfig.SlidingSyncList( + timeline_limit=10, + required_state=[ + (EventTypes.Name, ""), + (EventTypes.Member, "@foo"), + (EventTypes.Member, "@bar"), + (EventTypes.Member, StateValues.LAZY), + (EventTypes.Member, "@baz"), + (EventTypes.CanonicalAlias, ""), + ], + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Name: {""}, + EventTypes.Member: { + "@foo", + "@bar", + StateValues.LAZY, + "@baz", + }, + EventTypes.CanonicalAlias: {""}, + }, + ), + ), + ] + ) + def test_from_room_config( + self, + _test_label: str, + _test_description: str, + room_params: SlidingSyncConfig.CommonRoomParameters, + expected_room_sync_config: RoomSyncConfig, + ) -> None: + """ + Test `RoomSyncConfig.from_room_config(room_params)` will result in the `expected_room_sync_config`. + """ + room_sync_config = RoomSyncConfig.from_room_config(room_params) + + self._assert_room_config_equal( + room_sync_config, + expected_room_sync_config, + ) + + @parameterized.expand( + [ + ( + "no_direct_overlap", + # A + RoomSyncConfig( + timeline_limit=9, + required_state_map={ + EventTypes.Name: {""}, + EventTypes.Member: { + "@foo", + "@bar", + }, + }, + ), + # B + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Member: { + StateValues.LAZY, + "@baz", + }, + EventTypes.CanonicalAlias: {""}, + }, + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Name: {""}, + EventTypes.Member: { + "@foo", + "@bar", + StateValues.LAZY, + "@baz", + }, + EventTypes.CanonicalAlias: {""}, + }, + ), + ), + ( + "wildcard_overlap", + # A + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + StateValues.WILDCARD: {StateValues.WILDCARD}, + }, + ), + # B + RoomSyncConfig( + timeline_limit=9, + required_state_map={ + EventTypes.Dummy: {StateValues.WILDCARD}, + StateValues.WILDCARD: {"@bar"}, + EventTypes.Member: {"@foo"}, + }, + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + StateValues.WILDCARD: {StateValues.WILDCARD}, + }, + ), + ), + ( + "state_type_wildcard_overlap", + # A + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Dummy: {"dummy"}, + StateValues.WILDCARD: { + "", + "@foo", + }, + EventTypes.Member: {"@bar"}, + }, + ), + # B + RoomSyncConfig( + timeline_limit=9, + required_state_map={ + EventTypes.Dummy: {"dummy2"}, + StateValues.WILDCARD: { + "", + "@bar", + }, + EventTypes.Member: {"@foo"}, + }, + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Dummy: { + "dummy", + "dummy2", + }, + StateValues.WILDCARD: { + "", + "@foo", + "@bar", + }, + }, + ), + ), + ( + "state_key_wildcard_overlap", + # A + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Dummy: {"dummy"}, + EventTypes.Member: {StateValues.WILDCARD}, + "org.matrix.flowers": {StateValues.WILDCARD}, + }, + ), + # B + RoomSyncConfig( + timeline_limit=9, + required_state_map={ + EventTypes.Dummy: {StateValues.WILDCARD}, + EventTypes.Member: {StateValues.WILDCARD}, + "org.matrix.flowers": {"tulips"}, + }, + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Dummy: {StateValues.WILDCARD}, + EventTypes.Member: {StateValues.WILDCARD}, + "org.matrix.flowers": {StateValues.WILDCARD}, + }, + ), + ), + ( + "state_type_and_state_key_wildcard_merge", + # A + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Dummy: {"dummy"}, + StateValues.WILDCARD: { + "", + "@foo", + }, + EventTypes.Member: {"@bar"}, + }, + ), + # B + RoomSyncConfig( + timeline_limit=9, + required_state_map={ + EventTypes.Dummy: {"dummy2"}, + StateValues.WILDCARD: {""}, + EventTypes.Member: {StateValues.WILDCARD}, + }, + ), + # Expected + RoomSyncConfig( + timeline_limit=10, + required_state_map={ + EventTypes.Dummy: { + "dummy", + "dummy2", + }, + StateValues.WILDCARD: { + "", + "@foo", + }, + EventTypes.Member: {StateValues.WILDCARD}, + }, + ), + ), + ] + ) + def test_combine_room_sync_config( + self, + _test_label: str, + a: RoomSyncConfig, + b: RoomSyncConfig, + expected: RoomSyncConfig, + ) -> None: + """ + Combine A into B and B into A to make sure we get the same result. + """ + # Since we're mutating these in place, make a copy for each of our trials + room_sync_config_a = deepcopy(a) + room_sync_config_b = deepcopy(b) + + # Combine B into A + room_sync_config_a.combine_room_sync_config(room_sync_config_b) + + self._assert_room_config_equal(room_sync_config_a, expected, "B into A") + + # Since we're mutating these in place, make a copy for each of our trials + room_sync_config_a = deepcopy(a) + room_sync_config_b = deepcopy(b) + + # Combine A into B + room_sync_config_b.combine_room_sync_config(room_sync_config_a) + + self._assert_room_config_equal(room_sync_config_b, expected, "A into B") + + class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): """ Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it returns diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 966c622e145..cb2888409e6 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -20,7 +20,7 @@ # import json import logging -from typing import Dict, List +from typing import AbstractSet, Any, Dict, Iterable, List, Optional from parameterized import parameterized, parameterized_class @@ -32,9 +32,12 @@ EventContentFields, EventTypes, HistoryVisibility, + Membership, ReceiptTypes, RelationTypes, ) +from synapse.events import EventBase +from synapse.handlers.sliding_sync import StateValues from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from synapse.server import HomeServer from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID @@ -45,6 +48,7 @@ KnockingStrippedStateEventHelperMixin, ) from tests.server import TimedOutException +from tests.test_utils.event_injection import mark_event_as_partial_state logger = logging.getLogger(__name__) @@ -1237,6 +1241,94 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: ) self.store = hs.get_datastores().main self.event_sources = hs.get_event_sources() + self.storage_controllers = hs.get_storage_controllers() + + def _assertRequiredStateIncludes( + self, + actual_required_state: Any, + expected_state_events: Iterable[EventBase], + exact: bool = False, + ) -> None: + """ + Wrapper around `_assertIncludes` to give slightly better looking diff error + messages that include some context "$event_id (type, state_key)". + + Args: + actual_required_state: The "required_state" of a room from a Sliding Sync + request response. + expected_state_events: The expected state events to be included in the + `actual_required_state`. + exact: Whether the actual state should be exactly equal to the expected + state (no extras). + """ + + assert isinstance(actual_required_state, list) + for event in actual_required_state: + assert isinstance(event, dict) + + self._assertIncludes( + { + f'{event["event_id"]} ("{event["type"]}", "{event["state_key"]}")' + for event in actual_required_state + }, + { + f'{event.event_id} ("{event.type}", "{event.state_key}")' + for event in expected_state_events + }, + exact=exact, + # Message to help understand the diff in context + message=str(actual_required_state), + ) + + def _assertIncludes( + self, + actual_items: AbstractSet[str], + expected_items: AbstractSet[str], + exact: bool = False, + message: Optional[str] = None, + ) -> None: + """ + Assert that all of the `expected_items` are included in the `actual_items`. + + This assert could also be called `assertContains`, `assertItemsInSet` + + Args: + actual_items: The container + expected_items: The items to check for in the container + exact: Whether the actual state should be exactly equal to the expected + state (no extras). + message: Optional message to include in the failure message. + """ + # Check that each set has the same items + if exact and actual_items == expected_items: + return + # Check for a superset + elif not exact and actual_items >= expected_items: + return + + expected_lines: List[str] = [] + for expected_item in expected_items: + is_expected_in_actual = expected_item in actual_items + expected_lines.append( + "{} {}".format(" " if is_expected_in_actual else "?", expected_item) + ) + + actual_lines: List[str] = [] + for actual_item in actual_items: + is_actual_in_expected = actual_item in expected_items + actual_lines.append( + "{} {}".format("+" if is_actual_in_expected else " ", actual_item) + ) + + newline = "\n" + expected_string = f"Expected items to be in actual ('?' = missing expected items):\n {{\n{newline.join(expected_lines)}\n }}" + actual_string = f"Actual ('+' = found expected items):\n {{\n{newline.join(actual_lines)}\n }}" + first_message = ( + "Items must match exactly" if exact else "Some expected items are missing." + ) + diff_message = f"{first_message}\n{expected_string}\n{actual_string}" + + self.fail(f"{diff_message}\n{message}") def _add_new_dm_to_global_account_data( self, source_user_id: str, target_user_id: str, target_room_id: str @@ -2091,6 +2183,11 @@ def test_rooms_invite_shared_history_initial_sync(self) -> None: channel.json_body["rooms"][room_id1].get("prev_batch"), channel.json_body["rooms"][room_id1], ) + # `required_state` is omitted for `invite` rooms with `stripped_state` + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("required_state"), + channel.json_body["rooms"][room_id1], + ) # We should have some `stripped_state` so the potential joiner can identify the # room (we don't care about the order). self.assertCountEqual( @@ -2200,6 +2297,11 @@ def test_rooms_invite_shared_history_incremental_sync(self) -> None: channel.json_body["rooms"][room_id1].get("prev_batch"), channel.json_body["rooms"][room_id1], ) + # `required_state` is omitted for `invite` rooms with `stripped_state` + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("required_state"), + channel.json_body["rooms"][room_id1], + ) # We should have some `stripped_state` so the potential joiner can identify the # room (we don't care about the order). self.assertCountEqual( @@ -2321,6 +2423,11 @@ def test_rooms_invite_world_readable_history_initial_sync(self) -> None: channel.json_body["rooms"][room_id1].get("prev_batch"), channel.json_body["rooms"][room_id1], ) + # `required_state` is omitted for `invite` rooms with `stripped_state` + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("required_state"), + channel.json_body["rooms"][room_id1], + ) # We should have some `stripped_state` so the potential joiner can identify the # room (we don't care about the order). self.assertCountEqual( @@ -2448,6 +2555,11 @@ def test_rooms_invite_world_readable_history_incremental_sync(self) -> None: channel.json_body["rooms"][room_id1].get("prev_batch"), channel.json_body["rooms"][room_id1], ) + # `required_state` is omitted for `invite` rooms with `stripped_state` + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("required_state"), + channel.json_body["rooms"][room_id1], + ) # We should have some `stripped_state` so the potential joiner can identify the # room (we don't care about the order). self.assertCountEqual( @@ -2681,3 +2793,602 @@ def test_rooms_ban_incremental_sync2(self) -> None: False, channel.json_body["rooms"][room_id1], ) + + def test_rooms_no_required_state(self) -> None: + """ + Empty `rooms.required_state` should not return any state events in the room + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + # Empty `required_state` + "required_state": [], + "timeline_limit": 0, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # No `required_state` in response + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("required_state"), + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_required_state_initial_sync(self) -> None: + """ + Test `rooms.required_state` returns requested state events in the room during an + initial sync. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Tombstone, ""], + ], + "timeline_limit": 0, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self._assertRequiredStateIncludes( + channel.json_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.RoomHistoryVisibility, "")], + }, + exact=True, + ) + + def test_rooms_required_state_incremental_sync(self) -> None: + """ + Test `rooms.required_state` returns requested state events in the room during an + incremental sync. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + after_room_token = self.event_sources.get_current_token() + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success(after_room_token.to_string(self.store))}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Tombstone, ""], + ], + "timeline_limit": 0, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + # The returned state doesn't change from initial to incremental sync. In the + # future, we will only return updates but only if we've sent the room down the + # connection before. + self._assertRequiredStateIncludes( + channel.json_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.RoomHistoryVisibility, "")], + }, + exact=True, + ) + + def test_rooms_required_state_wildcard(self) -> None: + """ + Test `rooms.required_state` returns all state events when using wildcard `["*", "*"]`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + self.helper.send_state( + room_id1, + event_type="org.matrix.foo_state", + state_key="", + body={"foo": "bar"}, + tok=user2_tok, + ) + self.helper.send_state( + room_id1, + event_type="org.matrix.foo_state", + state_key="namespaced", + body={"foo": "bar"}, + tok=user2_tok, + ) + + # Make the Sliding Sync request with wildcards for the `event_type` and `state_key` + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [StateValues.WILDCARD, StateValues.WILDCARD], + ], + "timeline_limit": 0, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self._assertRequiredStateIncludes( + channel.json_body["rooms"][room_id1]["required_state"], + # We should see all the state events in the room + state_map.values(), + exact=True, + ) + + def test_rooms_required_state_wildcard_event_type(self) -> None: + """ + Test `rooms.required_state` returns relevant state events when using wildcard in + the event_type `["*", "foobarbaz"]`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + self.helper.send_state( + room_id1, + event_type="org.matrix.foo_state", + state_key="", + body={"foo": "bar"}, + tok=user2_tok, + ) + self.helper.send_state( + room_id1, + event_type="org.matrix.foo_state", + state_key=user2_id, + body={"foo": "bar"}, + tok=user2_tok, + ) + + # Make the Sliding Sync request with wildcards for the `event_type` + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [StateValues.WILDCARD, user2_id], + ], + "timeline_limit": 0, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + # We expect at-least any state event with the `user2_id` as the `state_key` + self._assertRequiredStateIncludes( + channel.json_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user2_id)], + state_map[("org.matrix.foo_state", user2_id)], + }, + # Ideally, this would be exact but we're currently returning all state + # events when the `event_type` is a wildcard. + exact=False, + ) + + def test_rooms_required_state_wildcard_state_key(self) -> None: + """ + Test `rooms.required_state` returns relevant state events when using wildcard in + the state_key `["foobarbaz","*"]`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request with wildcards for the `state_key` + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.WILDCARD], + ], + "timeline_limit": 0, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self._assertRequiredStateIncludes( + channel.json_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + state_map[(EventTypes.Member, user2_id)], + }, + exact=True, + ) + + def test_rooms_required_state_lazy_loading_room_members(self) -> None: + """ + Test `rooms.required_state` returns people relevant to the timeline when + lazy-loading room members, `["m.room.member","$LAZY"]`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user3_id = self.register_user("user3", "pass") + user3_tok = self.login(user3_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user3_id, tok=user3_tok) + + self.helper.send(room_id1, "1", tok=user2_tok) + self.helper.send(room_id1, "2", tok=user3_tok) + self.helper.send(room_id1, "3", tok=user2_tok) + + # Make the Sliding Sync request with lazy loading for the room members + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + # Only user2 and user3 sent events in the 3 events we see in the `timeline` + self._assertRequiredStateIncludes( + channel.json_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.Member, user2_id)], + state_map[(EventTypes.Member, user3_id)], + }, + exact=True, + ) + + @parameterized.expand([(Membership.LEAVE,), (Membership.BAN,)]) + def test_rooms_required_state_leave_ban(self, stop_membership: str) -> None: + """ + Test `rooms.required_state` should not return state past a leave/ban event. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user3_id = self.register_user("user3", "pass") + user3_tok = self.login(user3_id, "pass") + + from_token = self.event_sources.get_current_token() + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user3_id, tok=user3_tok) + + self.helper.send_state( + room_id1, + event_type="org.matrix.foo_state", + state_key="", + body={"foo": "bar"}, + tok=user2_tok, + ) + + if stop_membership == Membership.LEAVE: + # User 1 leaves + self.helper.leave(room_id1, user1_id, tok=user1_tok) + elif stop_membership == Membership.BAN: + # User 1 is banned + self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + # Change the state after user 1 leaves + self.helper.send_state( + room_id1, + event_type="org.matrix.foo_state", + state_key="", + body={"foo": "qux"}, + tok=user2_tok, + ) + self.helper.leave(room_id1, user3_id, tok=user3_tok) + + # Make the Sliding Sync request with lazy loading for the room members + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success(from_token.to_string(self.store))}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.Member, "*"], + ["org.matrix.foo_state", ""], + ], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Only user2 and user3 sent events in the 3 events we see in the `timeline` + self._assertRequiredStateIncludes( + channel.json_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.Member, user1_id)], + state_map[(EventTypes.Member, user2_id)], + state_map[(EventTypes.Member, user3_id)], + state_map[("org.matrix.foo_state", "")], + }, + exact=True, + ) + + def test_rooms_required_state_combine_superset(self) -> None: + """ + Test `rooms.required_state` is combined across lists and room subscriptions. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + self.helper.send_state( + room_id1, + event_type="org.matrix.foo_state", + state_key="", + body={"foo": "bar"}, + tok=user2_tok, + ) + + # Make the Sliding Sync request with wildcards for the `state_key` + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.Member, user1_id], + ], + "timeline_limit": 0, + }, + "bar-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.WILDCARD], + ["org.matrix.foo_state", ""], + ], + "timeline_limit": 0, + }, + } + # TODO: Room subscription should also combine with the `required_state` + # "room_subscriptions": { + # room_id1: { + # "required_state": [ + # ["org.matrix.bar_state", ""] + # ], + # "timeline_limit": 0, + # } + # } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self._assertRequiredStateIncludes( + channel.json_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.Member, user1_id)], + state_map[(EventTypes.Member, user2_id)], + state_map[("org.matrix.foo_state", "")], + }, + exact=True, + ) + + def test_rooms_required_state_partial_state(self) -> None: + """ + Test partially-stated room are excluded unless `rooms.required_state` is + lazy-loading room members. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) + _join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok) + + # Mark room2 as partial state + self.get_success( + mark_event_as_partial_state(self.hs, join_response2["event_id"], room_id2) + ) + + # Make the Sliding Sync request (NOT lazy-loading room members) + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + ], + "timeline_limit": 0, + }, + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Make sure the list includes room1 but room2 is excluded because it's still + # partially-stated + self.assertListEqual( + list(channel.json_body["lists"]["foo-list"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 1], + "room_ids": [room_id1], + } + ], + channel.json_body["lists"]["foo-list"], + ) + + # Make the Sliding Sync request (with lazy-loading room members) + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + # Lazy-load room members + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 0, + }, + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # The list should include both rooms now because we're lazy-loading room members + self.assertListEqual( + list(channel.json_body["lists"]["foo-list"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 1], + "room_ids": [room_id2, room_id1], + } + ], + channel.json_body["lists"]["foo-list"], + ) diff --git a/tests/test_utils/event_injection.py b/tests/test_utils/event_injection.py index fd03c23b890..35b3245708e 100644 --- a/tests/test_utils/event_injection.py +++ b/tests/test_utils/event_injection.py @@ -125,13 +125,15 @@ async def mark_event_as_partial_state( in this table). """ store = hs.get_datastores().main - await store.db_pool.simple_upsert( - table="partial_state_rooms", - keyvalues={"room_id": room_id}, - values={}, - insertion_values={"room_id": room_id}, + # Use the store helper to insert into the database so the caches are busted + await store.store_partial_state_room( + room_id=room_id, + servers={hs.hostname}, + device_lists_stream_id=0, + joined_via=hs.hostname, ) + # FIXME: Bust the cache await store.db_pool.simple_insert( table="partial_state_events", values={