From e2ade852500ff2c2872927440f7f8f0f4c20cb4a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Aug 2024 11:10:51 +0100 Subject: [PATCH 01/23] Move sliding sync types --- synapse/handlers/sliding_sync/__init__.py | 18 +- synapse/handlers/sliding_sync/extensions.py | 14 +- synapse/handlers/sliding_sync/store.py | 8 +- synapse/types/handlers/__init__.py | 358 +----------------- .../handlers/sliding_sync.py} | 355 ++++++++++++++++- tests/handlers/test_sliding_sync.py | 2 +- 6 files changed, 377 insertions(+), 378 deletions(-) rename synapse/{handlers/sliding_sync/types.py => types/handlers/sliding_sync.py} (56%) diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 1fcf2d149b7..d10b5997db5 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -45,13 +45,6 @@ from synapse.handlers.relations import BundledAggregations from synapse.handlers.sliding_sync.extensions import SlidingSyncExtensionHandler from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore -from synapse.handlers.sliding_sync.types import ( - HaveSentRoomFlag, - MutablePerConnectionState, - PerConnectionState, - RoomSyncConfig, - StateValues, -) from synapse.logging.opentracing import ( SynapseTags, log_kv, @@ -83,7 +76,16 @@ StreamToken, UserID, ) -from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult +from synapse.types.handlers.sliding_sync import ( + HaveSentRoomFlag, + MutablePerConnectionState, + OperationType, + PerConnectionState, + RoomSyncConfig, + SlidingSyncConfig, + SlidingSyncResult, + StateValues, +) from synapse.types.state import StateFilter from synapse.util.async_helpers import concurrently_execute from synapse.visibility import filter_events_for_client diff --git a/synapse/handlers/sliding_sync/extensions.py b/synapse/handlers/sliding_sync/extensions.py index 599c74429e8..42f6034e39a 100644 --- 
a/synapse/handlers/sliding_sync/extensions.py +++ b/synapse/handlers/sliding_sync/extensions.py @@ -19,11 +19,6 @@ from synapse.api.constants import AccountDataTypes from synapse.handlers.receipts import ReceiptEventSource -from synapse.handlers.sliding_sync.types import ( - HaveSentRoomFlag, - MutablePerConnectionState, - PerConnectionState, -) from synapse.logging.opentracing import trace from synapse.types import ( DeviceListUpdates, @@ -32,7 +27,14 @@ SlidingSyncStreamToken, StreamToken, ) -from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult +from synapse.types.handlers.sliding_sync import ( + HaveSentRoomFlag, + MutablePerConnectionState, + OperationType, + PerConnectionState, + SlidingSyncConfig, + SlidingSyncResult, +) if TYPE_CHECKING: from synapse.server import HomeServer diff --git a/synapse/handlers/sliding_sync/store.py b/synapse/handlers/sliding_sync/store.py index 3b727432fb1..e38fe3556ff 100644 --- a/synapse/handlers/sliding_sync/store.py +++ b/synapse/handlers/sliding_sync/store.py @@ -18,13 +18,13 @@ import attr from synapse.api.errors import SlidingSyncUnknownPosition -from synapse.handlers.sliding_sync.types import ( +from synapse.logging.opentracing import trace +from synapse.types import SlidingSyncStreamToken +from synapse.types.handlers.sliding_sync import ( MutablePerConnectionState, PerConnectionState, + SlidingSyncConfig, ) -from synapse.logging.opentracing import trace -from synapse.types import SlidingSyncStreamToken -from synapse.types.handlers import SlidingSyncConfig if TYPE_CHECKING: pass diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 580342d98ad..463de1a814f 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -17,33 +17,9 @@ # [This file includes modifications made by New Vector Limited] # # -from enum import Enum -from typing import TYPE_CHECKING, Dict, Final, List, Mapping, Optional, Sequence, Tuple -import attr 
-from typing_extensions import TypedDict -from synapse._pydantic_compat import HAS_PYDANTIC_V2 - -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import Extra -else: - from pydantic import Extra - -from synapse.events import EventBase -from synapse.types import ( - DeviceListUpdates, - JsonDict, - JsonMapping, - Requester, - SlidingSyncStreamToken, - StreamToken, - UserID, -) -from synapse.types.rest.client import SlidingSyncBody - -if TYPE_CHECKING: - from synapse.handlers.relations import BundledAggregations +from typing import List, Optional, TypedDict class ShutdownRoomParams(TypedDict): @@ -101,335 +77,3 @@ class ShutdownRoomResponse(TypedDict): failed_to_kick_users: List[str] local_aliases: List[str] new_room_id: Optional[str] - - -class SlidingSyncConfig(SlidingSyncBody): - """ - Inherit from `SlidingSyncBody` since we need all of the same fields and add a few - extra fields that we need in the handler - """ - - user: UserID - requester: Requester - - # Pydantic config - class Config: - # By default, ignore fields that we don't recognise. - extra = Extra.ignore - # By default, don't allow fields to be reassigned after parsing. - allow_mutation = False - # Allow custom types like `UserID` to be used in the model - arbitrary_types_allowed = True - - -class OperationType(Enum): - """ - Represents the operation types in a Sliding Sync window. - - Attributes: - SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about - entries in this range. - INSERT: Sets a single entry. If the position is not empty then clients MUST move - entries to the left or the right depending on where the closest empty space is. - DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move - places. - INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for - offline support, but they should be treated as empty when additional operations - which concern indexes in the range arrive from the server. 
- """ - - SYNC: Final = "SYNC" - INSERT: Final = "INSERT" - DELETE: Final = "DELETE" - INVALIDATE: Final = "INVALIDATE" - - -@attr.s(slots=True, frozen=True, auto_attribs=True) -class SlidingSyncResult: - """ - The Sliding Sync result to be serialized to JSON for a response. - - Attributes: - next_pos: The next position token in the sliding window to request (next_batch). - lists: Sliding window API. A map of list key to list results. - rooms: Room subscription API. A map of room ID to room results. - extensions: Extensions API. A map of extension key to extension results. - """ - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class RoomResult: - """ - Attributes: - name: Room name or calculated room name. - avatar: Room avatar - heroes: List of stripped membership events (containing `user_id` and optionally - `avatar_url` and `displayname`) for the users used to calculate the room name. - is_dm: Flag to specify whether the room is a direct-message room (most likely - between two people). - initial: Flag which is set when this is the first time the server is sending this - data on this connection. Clients can use this flag to replace or update - their local state. When there is an update, servers MUST omit this flag - entirely and NOT send "initial":false as this is wasteful on bandwidth. The - absence of this flag means 'false'. - unstable_expanded_timeline: Flag which is set if we're returning more historic - events due to the timeline limit having increased. See "XXX: Odd behavior" - comment ing `synapse.handlers.sliding_sync`. - required_state: The current state of the room - timeline: Latest events in the room. The last event is the most recent. - bundled_aggregations: A mapping of event ID to the bundled aggregations for - the timeline events above. This allows clients to show accurate reaction - counts (or edits, threads), even if some of the reaction events were skipped - over in a gappy sync. 
- stripped_state: Stripped state events (for rooms where the usre is - invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2, - absent on joined/left rooms - prev_batch: A token that can be passed as a start parameter to the - `/rooms//messages` API to retrieve earlier messages. - limited: True if there are more events than `timeline_limit` looking - backwards from the `response.pos` to the `request.pos`. - num_live: The number of timeline events which have just occurred and are not historical. - The last N events are 'live' and should be treated as such. This is mostly - useful to determine whether a given @mention event should make a noise or not. - Clients cannot rely solely on the absence of `initial: true` to determine live - events because if a room not in the sliding window bumps into the window because - of an @mention it will have `initial: true` yet contain a single live event - (with potentially other old events in the timeline). - bump_stamp: The `stream_ordering` of the last event according to the - `bump_event_types`. This helps clients sort more readily without them - needing to pull in a bunch of the timeline to determine the last activity. - `bump_event_types` is a thing because for example, we don't want display - name changes to mark the room as unread and bump it to the top. For - encrypted rooms, we just have to consider any activity as a bump because we - can't see the content and the client has to figure it out for themselves. - joined_count: The number of users with membership of join, including the client's - own user ID. (same as sync `v2 m.joined_member_count`) - invited_count: The number of users with membership of invite. (same as sync v2 - `m.invited_member_count`) - notification_count: The total number of unread notifications for this room. (same - as sync v2) - highlight_count: The number of unread notifications for this room with the highlight - flag set. 
(same as sync v2) - """ - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class StrippedHero: - user_id: str - display_name: Optional[str] - avatar_url: Optional[str] - - name: Optional[str] - avatar: Optional[str] - heroes: Optional[List[StrippedHero]] - is_dm: bool - initial: bool - unstable_expanded_timeline: bool - # Should be empty for invite/knock rooms with `stripped_state` - required_state: List[EventBase] - # Should be empty for invite/knock rooms with `stripped_state` - timeline_events: List[EventBase] - bundled_aggregations: Optional[Dict[str, "BundledAggregations"]] - # Optional because it's only relevant to invite/knock rooms - stripped_state: List[JsonDict] - # Only optional because it won't be included for invite/knock rooms with `stripped_state` - prev_batch: Optional[StreamToken] - # Only optional because it won't be included for invite/knock rooms with `stripped_state` - limited: Optional[bool] - # Only optional because it won't be included for invite/knock rooms with `stripped_state` - num_live: Optional[int] - bump_stamp: int - joined_count: int - invited_count: int - notification_count: int - highlight_count: int - - def __bool__(self) -> bool: - return ( - # If this is the first time the client is seeing the room, we should not filter it out - # under any circumstance. - self.initial - # We need to let the client know if there are any new events - or bool(self.required_state) - or bool(self.timeline_events) - or bool(self.stripped_state) - ) - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class SlidingWindowList: - """ - Attributes: - count: The total number of entries in the list. Always present if this list - is. - ops: The sliding list operations to perform. - """ - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class Operation: - """ - Attributes: - op: The operation type to perform. - range: Which index positions are affected by this operation. These are - both inclusive. 
- room_ids: Which room IDs are affected by this operation. These IDs match - up to the positions in the `range`, so the last room ID in this list - matches the 9th index. The room data is held in a separate object. - """ - - op: OperationType - range: Tuple[int, int] - room_ids: List[str] - - count: int - ops: List[Operation] - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class Extensions: - """Responses for extensions - - Attributes: - to_device: The to-device extension (MSC3885) - e2ee: The E2EE device extension (MSC3884) - """ - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class ToDeviceExtension: - """The to-device extension (MSC3885) - - Attributes: - next_batch: The to-device stream token the client should use - to get more results - events: A list of to-device messages for the client - """ - - next_batch: str - events: Sequence[JsonMapping] - - def __bool__(self) -> bool: - return bool(self.events) - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class E2eeExtension: - """The E2EE device extension (MSC3884) - - Attributes: - device_list_updates: List of user_ids whose devices have changed or left (only - present on incremental syncs). - device_one_time_keys_count: Map from key algorithm to the number of - unclaimed one-time keys currently held on the server for this device. If - an algorithm is unlisted, the count for that algorithm is assumed to be - zero. If this entire parameter is missing, the count for all algorithms - is assumed to be zero. - device_unused_fallback_key_types: List of unused fallback key algorithms - for this device. - """ - - # Only present on incremental syncs - device_list_updates: Optional[DeviceListUpdates] - device_one_time_keys_count: Mapping[str, int] - device_unused_fallback_key_types: Sequence[str] - - def __bool__(self) -> bool: - # Note that "signed_curve25519" is always returned in key count responses - # regardless of whether we uploaded any keys for it. 
This is necessary until - # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed. - # - # Also related: - # https://github.com/element-hq/element-android/issues/3725 and - # https://github.com/matrix-org/synapse/issues/10456 - default_otk = self.device_one_time_keys_count.get("signed_curve25519") - more_than_default_otk = len(self.device_one_time_keys_count) > 1 or ( - default_otk is not None and default_otk > 0 - ) - - return bool( - more_than_default_otk - or self.device_list_updates - or self.device_unused_fallback_key_types - ) - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class AccountDataExtension: - """The Account Data extension (MSC3959) - - Attributes: - global_account_data_map: Mapping from `type` to `content` of global account - data events. - account_data_by_room_map: Mapping from room_id to mapping of `type` to - `content` of room account data events. - """ - - global_account_data_map: Mapping[str, JsonMapping] - account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] - - def __bool__(self) -> bool: - return bool( - self.global_account_data_map or self.account_data_by_room_map - ) - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class ReceiptsExtension: - """The Receipts extension (MSC3960) - - Attributes: - room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral - event (type, content) - """ - - room_id_to_receipt_map: Mapping[str, JsonMapping] - - def __bool__(self) -> bool: - return bool(self.room_id_to_receipt_map) - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class TypingExtension: - """The Typing Notification extension (MSC3961) - - Attributes: - room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral - event (type, content) - """ - - room_id_to_typing_map: Mapping[str, JsonMapping] - - def __bool__(self) -> bool: - return bool(self.room_id_to_typing_map) - - to_device: Optional[ToDeviceExtension] = None - e2ee: Optional[E2eeExtension] = None - account_data: 
Optional[AccountDataExtension] = None - receipts: Optional[ReceiptsExtension] = None - typing: Optional[TypingExtension] = None - - def __bool__(self) -> bool: - return bool( - self.to_device - or self.e2ee - or self.account_data - or self.receipts - or self.typing - ) - - next_pos: SlidingSyncStreamToken - lists: Dict[str, SlidingWindowList] - rooms: Dict[str, RoomResult] - extensions: Extensions - - def __bool__(self) -> bool: - """Make the result appear empty if there are no updates. This is used - to tell if the notifier needs to wait for more events when polling for - events. - """ - # We don't include `self.lists` here, as a) `lists` is always non-empty even if - # there are no changes, and b) since we're sorting rooms by `stream_ordering` of - # the latest activity, anything that would cause the order to change would end - # up in `self.rooms` and cause us to send down the change. - return bool(self.rooms or self.extensions) - - @staticmethod - def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult": - "Return a new empty result" - return SlidingSyncResult( - next_pos=next_pos, - lists={}, - rooms={}, - extensions=SlidingSyncResult.Extensions(), - ) diff --git a/synapse/handlers/sliding_sync/types.py b/synapse/types/handlers/sliding_sync.py similarity index 56% rename from synapse/handlers/sliding_sync/types.py rename to synapse/types/handlers/sliding_sync.py index 003419d40a5..250f363bebe 100644 --- a/synapse/handlers/sliding_sync/types.py +++ b/synapse/types/handlers/sliding_sync.py @@ -22,26 +22,377 @@ Dict, Final, Generic, + List, Mapping, MutableMapping, Optional, + Sequence, Set, + Tuple, TypeVar, cast, ) import attr +from synapse._pydantic_compat import HAS_PYDANTIC_V2 from synapse.api.constants import EventTypes from synapse.types import MultiWriterStreamToken, RoomStreamToken, StrCollection, UserID -from synapse.types.handlers import SlidingSyncConfig + +if TYPE_CHECKING or HAS_PYDANTIC_V2: + from pydantic.v1 import Extra +else: + from 
pydantic import Extra + +from synapse.events import EventBase +from synapse.types import ( + DeviceListUpdates, + JsonDict, + JsonMapping, + Requester, + SlidingSyncStreamToken, + StreamToken, +) +from synapse.types.rest.client import SlidingSyncBody if TYPE_CHECKING: - pass + from synapse.handlers.relations import BundledAggregations logger = logging.getLogger(__name__) +class SlidingSyncConfig(SlidingSyncBody): + """ + Inherit from `SlidingSyncBody` since we need all of the same fields and add a few + extra fields that we need in the handler + """ + + user: UserID + requester: Requester + + # Pydantic config + class Config: + # By default, ignore fields that we don't recognise. + extra = Extra.ignore + # By default, don't allow fields to be reassigned after parsing. + allow_mutation = False + # Allow custom types like `UserID` to be used in the model + arbitrary_types_allowed = True + + +class OperationType(Enum): + """ + Represents the operation types in a Sliding Sync window. + + Attributes: + SYNC: Sets a range of entries. Clients SHOULD discard what they previously knew about + entries in this range. + INSERT: Sets a single entry. If the position is not empty then clients MUST move + entries to the left or the right depending on where the closest empty space is. + DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move + places. + INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for + offline support, but they should be treated as empty when additional operations + which concern indexes in the range arrive from the server. + """ + + SYNC: Final = "SYNC" + INSERT: Final = "INSERT" + DELETE: Final = "DELETE" + INVALIDATE: Final = "INVALIDATE" + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class SlidingSyncResult: + """ + The Sliding Sync result to be serialized to JSON for a response. + + Attributes: + next_pos: The next position token in the sliding window to request (next_batch).
+ lists: Sliding window API. A map of list key to list results. + rooms: Room subscription API. A map of room ID to room results. + extensions: Extensions API. A map of extension key to extension results. + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class RoomResult: + """ + Attributes: + name: Room name or calculated room name. + avatar: Room avatar + heroes: List of stripped membership events (containing `user_id` and optionally + `avatar_url` and `displayname`) for the users used to calculate the room name. + is_dm: Flag to specify whether the room is a direct-message room (most likely + between two people). + initial: Flag which is set when this is the first time the server is sending this + data on this connection. Clients can use this flag to replace or update + their local state. When there is an update, servers MUST omit this flag + entirely and NOT send "initial":false as this is wasteful on bandwidth. The + absence of this flag means 'false'. + unstable_expanded_timeline: Flag which is set if we're returning more historic + events due to the timeline limit having increased. See "XXX: Odd behavior" + comment in `synapse.handlers.sliding_sync`. + required_state: The current state of the room + timeline: Latest events in the room. The last event is the most recent. + bundled_aggregations: A mapping of event ID to the bundled aggregations for + the timeline events above. This allows clients to show accurate reaction + counts (or edits, threads), even if some of the reaction events were skipped + over in a gappy sync. + stripped_state: Stripped state events (for rooms where the user is + invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2, + absent on joined/left rooms + prev_batch: A token that can be passed as a start parameter to the + `/rooms/<room_id>/messages` API to retrieve earlier messages. + limited: True if there are more events than `timeline_limit` looking + backwards from the `response.pos` to the `request.pos`.
+ num_live: The number of timeline events which have just occurred and are not historical. + The last N events are 'live' and should be treated as such. This is mostly + useful to determine whether a given @mention event should make a noise or not. + Clients cannot rely solely on the absence of `initial: true` to determine live + events because if a room not in the sliding window bumps into the window because + of an @mention it will have `initial: true` yet contain a single live event + (with potentially other old events in the timeline). + bump_stamp: The `stream_ordering` of the last event according to the + `bump_event_types`. This helps clients sort more readily without them + needing to pull in a bunch of the timeline to determine the last activity. + `bump_event_types` is a thing because for example, we don't want display + name changes to mark the room as unread and bump it to the top. For + encrypted rooms, we just have to consider any activity as a bump because we + can't see the content and the client has to figure it out for themselves. + joined_count: The number of users with membership of join, including the client's + own user ID. (same as sync v2 `m.joined_member_count`) + invited_count: The number of users with membership of invite. (same as sync v2 + `m.invited_member_count`) + notification_count: The total number of unread notifications for this room. (same + as sync v2) + highlight_count: The number of unread notifications for this room with the highlight + flag set.
(same as sync v2) + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class StrippedHero: + user_id: str + display_name: Optional[str] + avatar_url: Optional[str] + + name: Optional[str] + avatar: Optional[str] + heroes: Optional[List[StrippedHero]] + is_dm: bool + initial: bool + unstable_expanded_timeline: bool + # Should be empty for invite/knock rooms with `stripped_state` + required_state: List[EventBase] + # Should be empty for invite/knock rooms with `stripped_state` + timeline_events: List[EventBase] + bundled_aggregations: Optional[Dict[str, "BundledAggregations"]] + # Optional because it's only relevant to invite/knock rooms + stripped_state: List[JsonDict] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + prev_batch: Optional[StreamToken] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + limited: Optional[bool] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + num_live: Optional[int] + bump_stamp: int + joined_count: int + invited_count: int + notification_count: int + highlight_count: int + + def __bool__(self) -> bool: + return ( + # If this is the first time the client is seeing the room, we should not filter it out + # under any circumstance. + self.initial + # We need to let the client know if there are any new events + or bool(self.required_state) + or bool(self.timeline_events) + or bool(self.stripped_state) + ) + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class SlidingWindowList: + """ + Attributes: + count: The total number of entries in the list. Always present if this list + is. + ops: The sliding list operations to perform. + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class Operation: + """ + Attributes: + op: The operation type to perform. + range: Which index positions are affected by this operation. These are + both inclusive. 
+ room_ids: Which room IDs are affected by this operation. These IDs match + up to the positions in the `range`, so the last room ID in this list + matches the 9th index. The room data is held in a separate object. + """ + + op: OperationType + range: Tuple[int, int] + room_ids: List[str] + + count: int + ops: List[Operation] + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class Extensions: + """Responses for extensions + + Attributes: + to_device: The to-device extension (MSC3885) + e2ee: The E2EE device extension (MSC3884) + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class ToDeviceExtension: + """The to-device extension (MSC3885) + + Attributes: + next_batch: The to-device stream token the client should use + to get more results + events: A list of to-device messages for the client + """ + + next_batch: str + events: Sequence[JsonMapping] + + def __bool__(self) -> bool: + return bool(self.events) + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class E2eeExtension: + """The E2EE device extension (MSC3884) + + Attributes: + device_list_updates: List of user_ids whose devices have changed or left (only + present on incremental syncs). + device_one_time_keys_count: Map from key algorithm to the number of + unclaimed one-time keys currently held on the server for this device. If + an algorithm is unlisted, the count for that algorithm is assumed to be + zero. If this entire parameter is missing, the count for all algorithms + is assumed to be zero. + device_unused_fallback_key_types: List of unused fallback key algorithms + for this device. + """ + + # Only present on incremental syncs + device_list_updates: Optional[DeviceListUpdates] + device_one_time_keys_count: Mapping[str, int] + device_unused_fallback_key_types: Sequence[str] + + def __bool__(self) -> bool: + # Note that "signed_curve25519" is always returned in key count responses + # regardless of whether we uploaded any keys for it. 
This is necessary until + # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed. + # + # Also related: + # https://github.com/element-hq/element-android/issues/3725 and + # https://github.com/matrix-org/synapse/issues/10456 + default_otk = self.device_one_time_keys_count.get("signed_curve25519") + more_than_default_otk = len(self.device_one_time_keys_count) > 1 or ( + default_otk is not None and default_otk > 0 + ) + + return bool( + more_than_default_otk + or self.device_list_updates + or self.device_unused_fallback_key_types + ) + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class AccountDataExtension: + """The Account Data extension (MSC3959) + + Attributes: + global_account_data_map: Mapping from `type` to `content` of global account + data events. + account_data_by_room_map: Mapping from room_id to mapping of `type` to + `content` of room account data events. + """ + + global_account_data_map: Mapping[str, JsonMapping] + account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] + + def __bool__(self) -> bool: + return bool( + self.global_account_data_map or self.account_data_by_room_map + ) + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class ReceiptsExtension: + """The Receipts extension (MSC3960) + + Attributes: + room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral + event (type, content) + """ + + room_id_to_receipt_map: Mapping[str, JsonMapping] + + def __bool__(self) -> bool: + return bool(self.room_id_to_receipt_map) + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class TypingExtension: + """The Typing Notification extension (MSC3961) + + Attributes: + room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral + event (type, content) + """ + + room_id_to_typing_map: Mapping[str, JsonMapping] + + def __bool__(self) -> bool: + return bool(self.room_id_to_typing_map) + + to_device: Optional[ToDeviceExtension] = None + e2ee: Optional[E2eeExtension] = None + account_data: 
Optional[AccountDataExtension] = None + receipts: Optional[ReceiptsExtension] = None + typing: Optional[TypingExtension] = None + + def __bool__(self) -> bool: + return bool( + self.to_device + or self.e2ee + or self.account_data + or self.receipts + or self.typing + ) + + next_pos: SlidingSyncStreamToken + lists: Dict[str, SlidingWindowList] + rooms: Dict[str, RoomResult] + extensions: Extensions + + def __bool__(self) -> bool: + """Make the result appear empty if there are no updates. This is used + to tell if the notifier needs to wait for more events when polling for + events. + """ + # We don't include `self.lists` here, as a) `lists` is always non-empty even if + # there are no changes, and b) since we're sorting rooms by `stream_ordering` of + # the latest activity, anything that would cause the order to change would end + # up in `self.rooms` and cause us to send down the change. + return bool(self.rooms or self.extensions) + + @staticmethod + def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult": + "Return a new empty result" + return SlidingSyncResult( + next_pos=next_pos, + lists={}, + rooms={}, + extensions=SlidingSyncResult.Extensions(), + ) + + class StateValues: """ Understood values of the (type, state_key) tuple in `required_state`. 
diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 96da47f3b9c..1fd4f8c9cde 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -47,7 +47,7 @@ from synapse.server import HomeServer from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import JsonDict, StreamToken, UserID -from synapse.types.handlers import SlidingSyncConfig +from synapse.types.handlers.sliding_sync import SlidingSyncConfig from synapse.util import Clock from tests.replication._base import BaseMultiWorkerStreamTestCase From 5b77f4a67a2d40a82b706772ca5766f28b93ae9e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Aug 2024 16:45:52 +0100 Subject: [PATCH 02/23] Update mypy plugin to handle enums and typevars --- scripts-dev/mypy_synapse_plugin.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/scripts-dev/mypy_synapse_plugin.py b/scripts-dev/mypy_synapse_plugin.py index 877b831751f..509047b41b1 100644 --- a/scripts-dev/mypy_synapse_plugin.py +++ b/scripts-dev/mypy_synapse_plugin.py @@ -38,6 +38,7 @@ NoneType, TupleType, TypeAliasType, + TypeVarType, UninhabitedType, UnionType, ) @@ -233,6 +234,7 @@ def check_is_cacheable( "synapse.synapse_rust.push.FilteredPushRules", # This is technically not immutable, but close enough. "signedjson.types.VerifyKey", + "synapse.types.StrCollection", } # Immutable containers only if the values are also immutable. @@ -298,7 +300,7 @@ def is_cacheable( elif rt.type.fullname in MUTABLE_CONTAINER_TYPES: # Mutable containers are mutable regardless of their underlying type. 
- return False, None + return False, f"container {rt.type.fullname} is mutable" elif "attrs" in rt.type.metadata: # attrs classes are only cachable iff it is frozen (immutable itself) @@ -318,6 +320,9 @@ def is_cacheable( else: return False, "non-frozen attrs class" + elif rt.type.is_enum: + # We assume Enum values are immutable + return True, None else: # Ensure we fail for unknown types, these generally means that the # above code is not complete. @@ -326,6 +331,18 @@ def is_cacheable( f"Don't know how to handle {rt.type.fullname} return type instance", ) + elif isinstance(rt, TypeVarType): + # We consider TypeVars immutable if they are bound to a set of immutable + # types. + if rt.values: + for value in rt.values: + ok, note = is_cacheable(value, signature, verbose) + if not ok: + return False, f"TypeVar bound not cacheable {value}" + return True, None + + return False, "TypeVar is unbound" + elif isinstance(rt, NoneType): # None is cachable. return True, None From 7087c7c3d58f4b235532db4eb013ff190e5990b0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Aug 2024 16:46:17 +0100 Subject: [PATCH 03/23] Make RoomSyncConfig immutable --- synapse/handlers/sliding_sync/__init__.py | 20 +++++------ synapse/types/handlers/sliding_sync.py | 43 +++++++++++++---------- tests/handlers/test_sliding_sync.py | 21 +++-------- 3 files changed, 38 insertions(+), 46 deletions(-) diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index d10b5997db5..8ca4784d8bc 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -432,15 +432,11 @@ async def current_sync_for_user( room_id ) if existing_room_sync_config is not None: - existing_room_sync_config.combine_room_sync_config( + room_sync_config = existing_room_sync_config.combine_room_sync_config( room_sync_config ) - else: - # Make a copy so if we modify it later, it doesn't - # affect all references. 
- relevant_room_map[room_id] = ( - room_sync_config.deep_copy() - ) + + relevant_room_map[room_id] = room_sync_config room_ids_in_list.append(room_id) @@ -505,11 +501,13 @@ async def current_sync_for_user( # and need to fetch more info about. existing_room_sync_config = relevant_room_map.get(room_id) if existing_room_sync_config is not None: - existing_room_sync_config.combine_room_sync_config( - room_sync_config + room_sync_config = ( + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) ) - else: - relevant_room_map[room_id] = room_sync_config + + relevant_room_map[room_id] = room_sync_config # Fetch room data rooms: Dict[str, SlidingSyncResult.RoomResult] = {} diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index 250f363bebe..084aedea4aa 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -18,6 +18,7 @@ from enum import Enum from typing import ( TYPE_CHECKING, + AbstractSet, Callable, Dict, Final, @@ -411,7 +412,7 @@ class StateValues: # We can't freeze this class because we want to update it in place with the # de-duplicated data. -@attr.s(slots=True, auto_attribs=True) +@attr.s(slots=True, auto_attribs=True, frozen=True) class RoomSyncConfig: """ Holds the config for what data we should fetch for a room in the sync response. 
@@ -425,7 +426,7 @@ class RoomSyncConfig: """ timeline_limit: int - required_state_map: Dict[str, Set[str]] + required_state_map: Mapping[str, AbstractSet[str]] @classmethod def from_room_config( @@ -499,7 +500,7 @@ def from_room_config( def deep_copy(self) -> "RoomSyncConfig": required_state_map: Dict[str, Set[str]] = { - state_type: state_key_set.copy() + state_type: set(state_key_set) for state_type, state_key_set in self.required_state_map.items() } @@ -510,14 +511,20 @@ def deep_copy(self) -> "RoomSyncConfig": def combine_room_sync_config( self, other_room_sync_config: "RoomSyncConfig" - ) -> None: + ) -> "RoomSyncConfig": """ - Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the + Combine this `RoomSyncConfig` with another `RoomSyncConfig` and return the superset union of the two. """ + timeline_limit = self.timeline_limit + required_state_map = { + event_type: set(state_keys) + for event_type, state_keys in self.required_state_map.items() + } + # Take the highest timeline limit if self.timeline_limit < other_room_sync_config.timeline_limit: - self.timeline_limit = other_room_sync_config.timeline_limit + timeline_limit = other_room_sync_config.timeline_limit # Union the required state for ( @@ -526,14 +533,14 @@ def combine_room_sync_config( ) in other_room_sync_config.required_state_map.items(): # If we already have a wildcard for everything, we don't need to add # anything else - if StateValues.WILDCARD in self.required_state_map.get( + if StateValues.WILDCARD in required_state_map.get( StateValues.WILDCARD, set() ): break # If we already have a wildcard `state_key` for this `state_type`, we don't need # to add anything else - if StateValues.WILDCARD in self.required_state_map.get(state_type, set()): + if StateValues.WILDCARD in required_state_map.get(state_type, set()): continue # If we're getting wildcards for the `state_type` and `state_key`, that's @@ -542,16 +549,14 @@ def combine_room_sync_config( state_type == 
StateValues.WILDCARD and StateValues.WILDCARD in state_key_set ): - self.required_state_map = {state_type: {StateValues.WILDCARD}} + required_state_map = {state_type: {StateValues.WILDCARD}} # We can break, since we don't need to add anything else break for state_key in state_key_set: # If we already have a wildcard for this specific `state_key`, we don't need # to add it since the wildcard already covers it. - if state_key in self.required_state_map.get( - StateValues.WILDCARD, set() - ): + if state_key in required_state_map.get(StateValues.WILDCARD, set()): continue # If we're getting a wildcard for the `state_type`, get rid of any other @@ -562,7 +567,7 @@ def combine_room_sync_config( # Make a copy so we don't run into an error: `dictionary changed size # during iteration`, when we remove items for existing_state_type, existing_state_key_set in list( - self.required_state_map.items() + required_state_map.items() ): # Make a copy so we don't run into an error: `Set changed size during # iteration`, when we filter out and remove items @@ -572,19 +577,21 @@ def combine_room_sync_config( # If we've the left the `set()` empty, remove it from the map if existing_state_key_set == set(): - self.required_state_map.pop(existing_state_type, None) + required_state_map.pop(existing_state_type, None) # If we're getting a wildcard `state_key`, get rid of any other state_keys # for this `state_type` since the wildcard will cover it already. 
if state_key == StateValues.WILDCARD: - self.required_state_map[state_type] = {state_key} + required_state_map[state_type] = {state_key} break # Otherwise, just add it to the set else: - if self.required_state_map.get(state_type) is None: - self.required_state_map[state_type] = {state_key} + if required_state_map.get(state_type) is None: + required_state_map[state_type] = {state_key} else: - self.required_state_map[state_type].add(state_key) + required_state_map[state_type].add(state_key) + + return RoomSyncConfig(timeline_limit, required_state_map) def must_await_full_state( self, diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 1fd4f8c9cde..40c26992432 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -18,7 +18,6 @@ # # import logging -from copy import deepcopy from typing import Dict, List, Optional from unittest.mock import patch @@ -566,23 +565,11 @@ def test_combine_room_sync_config( """ Combine A into B and B into A to make sure we get the same result. 
""" - # Since we're mutating these in place, make a copy for each of our trials - room_sync_config_a = deepcopy(a) - room_sync_config_b = deepcopy(b) + combined_config = a.combine_room_sync_config(b) + self._assert_room_config_equal(combined_config, expected, "B into A") - # Combine B into A - room_sync_config_a.combine_room_sync_config(room_sync_config_b) - - self._assert_room_config_equal(room_sync_config_a, expected, "B into A") - - # Since we're mutating these in place, make a copy for each of our trials - room_sync_config_a = deepcopy(a) - room_sync_config_b = deepcopy(b) - - # Combine A into B - room_sync_config_b.combine_room_sync_config(room_sync_config_a) - - self._assert_room_config_equal(room_sync_config_b, expected, "A into B") + combined_config = b.combine_room_sync_config(a) + self._assert_room_config_equal(combined_config, expected, "A into B") class GetRoomMembershipForUserAtToTokenTestCase(HomeserverTestCase): From e34d634778d2e4a6a1e489263760090d5931c31b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Aug 2024 16:46:35 +0100 Subject: [PATCH 04/23] Make PerConnectionState immutable --- synapse/types/handlers/sliding_sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index 084aedea4aa..a79dcb815ee 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -682,7 +682,7 @@ class HaveSentRoomFlag(Enum): LIVE = "live" -T = TypeVar("T") +T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken) @attr.s(auto_attribs=True, slots=True, frozen=True) @@ -797,7 +797,7 @@ def record_unsent_rooms(self, room_ids: StrCollection, from_token: T) -> None: self._statuses[room_id] = HaveSentRoom.previously(from_token) -@attr.s(auto_attribs=True) +@attr.s(auto_attribs=True, frozen=True) class PerConnectionState: """The per-connection state. A snapshot of what we've sent down the connection before. 
From 87d53368d7766685bc63714846e951390193f227 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Aug 2024 17:24:19 +0100 Subject: [PATCH 05/23] Newsfile --- changelog.d/17600.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17600.misc diff --git a/changelog.d/17600.misc b/changelog.d/17600.misc new file mode 100644 index 00000000000..a81c67f6d18 --- /dev/null +++ b/changelog.d/17600.misc @@ -0,0 +1 @@ +Make the sliding sync `PerConnectionState` class immutable. From d1ee253bef442816e7ba0238149bbb2df940acf0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Aug 2024 14:08:19 +0100 Subject: [PATCH 06/23] Allow making columns AUTOINCREMENT --- synapse/storage/engines/_base.py | 5 +++++ synapse/storage/engines/postgres.py | 7 +++++++ synapse/storage/engines/sqlite.py | 6 ++++++ 3 files changed, 18 insertions(+) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index ad222e7e2d6..9d82c59384e 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -28,6 +28,11 @@ from synapse.storage.database import LoggingDatabaseConnection +# A string that will be replaced with the appropriate auto increment directive +# for the database engine, expands to an auto incrementing integer primary key. +AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER = "$%AUTO_INCREMENT_PRIMARY_KEY%$" + + class IsolationLevel(IntEnum): READ_COMMITTED: int = 1 REPEATABLE_READ: int = 2 diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 90641d5a181..8c8c6d04144 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -25,6 +25,7 @@ import psycopg2.extensions from synapse.storage.engines._base import ( + AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER, BaseDatabaseEngine, IncorrectDatabaseSetup, IsolationLevel, @@ -256,4 +257,10 @@ def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None: executing the script in its own transaction. 
The script transaction is left open and it is the responsibility of the caller to commit it. """ + # Replace auto increment placeholder with the appropriate directive + script = script.replace( + AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER, + "BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY", + ) + cursor.execute(f"COMMIT; BEGIN TRANSACTION; {script}") diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index b11094c5c17..9d1795ebe59 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, List, Mapping, Optional from synapse.storage.engines import BaseDatabaseEngine +from synapse.storage.engines._base import AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER from synapse.storage.types import Cursor if TYPE_CHECKING: @@ -168,6 +169,11 @@ def executescript(cursor: sqlite3.Cursor, script: str) -> None: > first. No other implicit transaction control is performed; any transaction > control must be added to sql_script. """ + # Replace auto increment placeholder with the appropriate directive + script = script.replace( + AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER, "INTEGER PRIMARY KEY AUTOINCREMENT" + ) + # The implementation of `executescript` can be found at # https://github.com/python/cpython/blob/3.11/Modules/_sqlite/cursor.c#L1035. 
cursor.executescript(f"BEGIN TRANSACTION; {script}") From b3d8e2d2bdc74bf565a3d9a280884109226845ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Aug 2024 17:18:08 +0100 Subject: [PATCH 07/23] Add simple_insert_returning_txn --- synapse/storage/database.py | 45 +++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 569f6181939..132fcf3db3f 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -64,6 +64,7 @@ from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.types import Connection, Cursor, SQLQueryParameters +from synapse.types import StrCollection from synapse.util.async_helpers import delay_cancellation from synapse.util.iterutils import batch_iter @@ -1095,6 +1096,50 @@ def simple_insert_txn( txn.execute(sql, vals) + @staticmethod + def simple_insert_returning_txn( + txn: LoggingTransaction, + table: str, + values: Dict[str, Any], + returning: StrCollection, + ) -> Tuple[Any, ...]: + """Executes a `INSERT INTO... RETURNING...` statement (or equivalent for + SQLite versions that don't support it). + """ + + if txn.database_engine.supports_returning: + keys, vals = zip(*values.items()) + + sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % ( + table, + ", ".join(k for k in keys), + ", ".join("?" 
for _ in keys), + ", ".join(k for k in returning), + ) + + txn.execute(sql, vals) + row = txn.fetchone() + assert row is not None + return row + else: + # For old versions of SQLite we do a standard insert and then can + # use `last_insert_rowid` to get at the row we just inserted + DatabasePool.simple_insert_txn( + txn, + table=table, + values=values, + ) + txn.execute("SELECT last_insert_rowid()") + row = txn.fetchone() + assert row is not None + (rowid,) = row + + row = DatabasePool.simple_select_one_txn( + txn, table=table, keyvalues={"rowid": rowid}, retcols=returning + ) + assert row is not None + return row + async def simple_insert_many( self, table: str, From 3838b18d3b833be96fb8869f6279e3c6edb8954b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Aug 2024 08:50:02 +0100 Subject: [PATCH 08/23] Store state --- synapse/app/generic_worker.py | 2 + synapse/handlers/sliding_sync/__init__.py | 2 +- synapse/handlers/sliding_sync/store.py | 129 ++--- synapse/storage/databases/main/__init__.py | 2 + .../storage/databases/main/sliding_sync.py | 473 ++++++++++++++++++ synapse/storage/schema/__init__.py | 2 +- .../main/delta/87/02_per_connection_state.sql | 78 +++ synapse/types/handlers/sliding_sync.py | 6 + .../sliding_sync/test_rooms_required_state.py | 10 +- 9 files changed, 601 insertions(+), 103 deletions(-) create mode 100644 synapse/storage/databases/main/sliding_sync.py create mode 100644 synapse/storage/schema/main/delta/87/02_per_connection_state.sql diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 53f18592563..18d294f2b2a 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -98,6 +98,7 @@ from synapse.storage.databases.main.search import SearchStore from synapse.storage.databases.main.session import SessionStore from synapse.storage.databases.main.signatures import SignatureWorkerStore +from synapse.storage.databases.main.sliding_sync import SlidingSyncStore from 
synapse.storage.databases.main.state import StateGroupWorkerStore from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.stream import StreamWorkerStore @@ -159,6 +160,7 @@ class GenericWorkerStore( SessionStore, TaskSchedulerWorkerStore, ExperimentalFeaturesStore, + SlidingSyncStore, ): # Properties that multiple storage classes define. Tell mypy what the # expected type is. diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 8ca4784d8bc..683281a8a2c 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -208,7 +208,7 @@ def __init__(self, hs: "HomeServer"): self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync self.is_mine_id = hs.is_mine_id - self.connection_store = SlidingSyncConnectionStore() + self.connection_store = SlidingSyncConnectionStore(self.store) self.extensions = SlidingSyncExtensionHandler(hs) async def wait_for_sync_for_user( diff --git a/synapse/handlers/sliding_sync/store.py b/synapse/handlers/sliding_sync/store.py index e38fe3556ff..21511ae5b80 100644 --- a/synapse/handlers/sliding_sync/store.py +++ b/synapse/handlers/sliding_sync/store.py @@ -13,12 +13,12 @@ # import logging -from typing import TYPE_CHECKING, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Optional import attr -from synapse.api.errors import SlidingSyncUnknownPosition from synapse.logging.opentracing import trace +from synapse.storage.databases.main import DataStore from synapse.types import SlidingSyncStreamToken from synapse.types.handlers.sliding_sync import ( MutablePerConnectionState, @@ -61,20 +61,7 @@ class SlidingSyncConnectionStore: to mapping of room ID to `HaveSentRoom`. 
""" - # `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState` - _connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory( - dict - ) - - async def is_valid_token( - self, sync_config: SlidingSyncConfig, connection_token: int - ) -> bool: - """Return whether the connection token is valid/recognized""" - if connection_token == 0: - return True - - conn_key = self._get_connection_key(sync_config) - return connection_token in self._connections.get(conn_key, {}) + store: "DataStore" async def get_per_connection_state( self, @@ -86,23 +73,20 @@ async def get_per_connection_state( Raises: SlidingSyncUnknownPosition if the connection_token is unknown """ - if from_token is None: - return PerConnectionState() - - connection_position = from_token.connection_position - if connection_position == 0: - # Initial sync (request without a `from_token`) starts at `0` so - # there is no existing per-connection state + if from_token is None or from_token.connection_position == 0: return PerConnectionState() - conn_key = self._get_connection_key(sync_config) - sync_statuses = self._connections.get(conn_key, {}) - connection_state = sync_statuses.get(connection_position) + conn_id = sync_config.conn_id or "" - if connection_state is None: - raise SlidingSyncUnknownPosition() + device_id = sync_config.requester.device_id + assert device_id is not None - return connection_state + return await self.store.get_per_connection_state( + sync_config.user.to_string(), + device_id, + conn_id, + from_token.connection_position, + ) @trace async def record_new_state( @@ -116,26 +100,27 @@ async def record_new_state( If there are no changes to the state this may return the same token as the existing per-connection state. 
""" - prev_connection_token = 0 - if from_token is not None: - prev_connection_token = from_token.connection_position - if not new_connection_state.has_updates(): - return prev_connection_token + if from_token is not None: + return from_token.connection_position + else: + return 0 - conn_key = self._get_connection_key(sync_config) - sync_statuses = self._connections.setdefault(conn_key, {}) + if from_token is not None and from_token.connection_position == 0: + from_token = None - # Generate a new token, removing any existing entries in that token - # (which can happen if requests get resent). - new_store_token = prev_connection_token + 1 - sync_statuses.pop(new_store_token, None) + conn_id = sync_config.conn_id or "" - # We copy the `MutablePerConnectionState` so that the inner `ChainMap`s - # don't grow forever. - sync_statuses[new_store_token] = new_connection_state.copy() + device_id = sync_config.requester.device_id + assert device_id is not None - return new_store_token + return await self.store.persist_per_connection_state( + sync_config.user.to_string(), + device_id, + conn_id, + from_token.connection_position if from_token else None, + new_connection_state, + ) @trace async def mark_token_seen( @@ -143,58 +128,4 @@ async def mark_token_seen( sync_config: SlidingSyncConfig, from_token: Optional[SlidingSyncStreamToken], ) -> None: - """We have received a request with the given token, so we can clear out - any other tokens associated with the connection. - - If there is no from token then we have started afresh, and so we delete - all tokens associated with the device. - """ - # Clear out any tokens for the connection that doesn't match the one - # from the request. 
- - conn_key = self._get_connection_key(sync_config) - sync_statuses = self._connections.pop(conn_key, {}) - if from_token is None: - return - - sync_statuses = { - connection_token: room_statuses - for connection_token, room_statuses in sync_statuses.items() - if connection_token == from_token.connection_position - } - if sync_statuses: - self._connections[conn_key] = sync_statuses - - @staticmethod - def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]: - """Return a unique identifier for this connection. - - The first part is simply the user ID. - - The second part is generally a combination of device ID and conn_id. - However, both these two are optional (e.g. puppet access tokens don't - have device IDs), so this handles those edge cases. - - We use this over the raw `conn_id` to avoid clashes between different - clients that use the same `conn_id`. Imagine a user uses a web client - that uses `conn_id: main_sync_loop` and an Android client that also has - a `conn_id: main_sync_loop`. - """ - - user_id = sync_config.user.to_string() - - # Only one sliding sync connection is allowed per given conn_id (empty - # or not). - conn_id = sync_config.conn_id or "" - - if sync_config.requester.device_id: - return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}") - - if sync_config.requester.access_token_id: - # If we don't have a device, then the access token ID should be a - # stable ID. - return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}") - - # If we have neither then its likely an AS or some weird token. Either - # way we can just fail here. 
- raise Exception("Cannot use sliding sync with access token type") + pass diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 586e84f2a4d..9a43ab63e82 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -33,6 +33,7 @@ LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.databases.main.sliding_sync import SlidingSyncStore from synapse.storage.databases.main.stats import UserSortOrder from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.types import Cursor @@ -156,6 +157,7 @@ class DataStore( LockStore, SessionStore, TaskSchedulerWorkerStore, + SlidingSyncStore, ): def __init__( self, diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py new file mode 100644 index 00000000000..2166758a4c1 --- /dev/null +++ b/synapse/storage/databases/main/sliding_sync.py @@ -0,0 +1,473 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2023 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . 
+# + + +from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Set, cast + +import attr + +from synapse.api.errors import SlidingSyncUnknownPosition +from synapse.logging.opentracing import log_kv +from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage.database import LoggingTransaction +from synapse.types import MultiWriterStreamToken, RoomStreamToken +from synapse.types.handlers.sliding_sync import ( + HaveSentRoom, + HaveSentRoomFlag, + MutablePerConnectionState, + PerConnectionState, + RoomStatusMap, + RoomSyncConfig, +) +from synapse.util import json_encoder +from synapse.util.caches.descriptors import cached + +if TYPE_CHECKING: + from synapse.storage.databases.main import DataStore + + +class SlidingSyncStore(SQLBaseStore): + async def persist_per_connection_state( + self, + user_id: str, + device_id: str, + conn_id: str, + previous_connection_position: Optional[int], + per_connection_state: "MutablePerConnectionState", + ) -> int: + """Persist updates to the per-connection state for a sliding sync + connection. + + Returns: + The connection position of the newly persisted state. + """ + + store = cast("DataStore", self) + return await self.db_pool.runInteraction( + "persist_per_connection_state", + self.persist_per_connection_state_txn, + user_id=user_id, + device_id=device_id, + conn_id=conn_id, + previous_connection_position=previous_connection_position, + per_connection_state=await PerConnectionStateDB.from_state( + per_connection_state, store + ), + ) + + def persist_per_connection_state_txn( + self, + txn: LoggingTransaction, + user_id: str, + device_id: str, + conn_id: str, + previous_connection_position: Optional[int], + per_connection_state: "PerConnectionStateDB", + ) -> int: + # First we fetch (or create) the connection key associated with the + # previous connection position. 
+ if previous_connection_position is not None: + # The `previous_connection_position` is a user-supplied value, so we + # need to make sure that the one they supplied is actually theirs. + sql = """ + SELECT connection_key + FROM sliding_sync_connection_positions + INNER JOIN sliding_sync_connections USING (connection_key) + WHERE + connection_position = ? + AND user_id = ? AND device_id = ? AND conn_id = ? + """ + txn.execute( + sql, (previous_connection_position, user_id, device_id, conn_id) + ) + row = txn.fetchone() + if row is None: + raise SlidingSyncUnknownPosition() + + (connection_key,) = row + else: + # We're restarting the connection, so we clear all existing + # connections. We do this here to ensure that if we get lots of + # one-shot requests we don't stack up lots of entries. + self.db_pool.simple_delete_txn( + txn, + table="sliding_sync_connections", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + "conn_id": conn_id, + }, + ) + + (connection_key,) = self.db_pool.simple_insert_returning_txn( + txn, + table="sliding_sync_connections", + values={ + "user_id": user_id, + "device_id": device_id, + "conn_id": conn_id, + "created_ts": self._clock.time_msec(), + }, + returning=("connection_key",), + ) + + # Define a new connection position for the updates + (connection_position,) = self.db_pool.simple_insert_returning_txn( + txn, + table="sliding_sync_connection_positions", + values={ + "connection_key": connection_key, + "created_ts": self._clock.time_msec(), + }, + returning=("connection_position",), + ) + + # We need to deduplicate the `required_state` JSON. 
We do this by + # fetching all JSON associated with the connection and comparing that + # with the updates to `required_state` + + # Dict from required state json -> required state ID + required_state_to_id: Dict[str, int] = {} + if previous_connection_position is not None: + rows = self.db_pool.simple_select_list_txn( + txn, + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + for required_state_id, required_state in rows: + required_state_to_id[required_state] = required_state_id + + room_to_state_ids: Dict[str, int] = {} + unique_required_state: Dict[str, List[str]] = {} + for room_id, room_state in per_connection_state.room_configs.items(): + serialized_state = json_encoder.encode( + # We store the required state as a sorted list of event type / + # state key tuples. + sorted( + (event_type, state_key) + for event_type, state_keys in room_state.required_state_map.items() + for state_key in state_keys + ) + ) + + existing_state_id = required_state_to_id.get(serialized_state) + if existing_state_id is not None: + room_to_state_ids[room_id] = existing_state_id + else: + unique_required_state.setdefault(serialized_state, []).append(room_id) + + # Insert any new `required_state` json we haven't previously seen. + for serialized_required_state, room_ids in unique_required_state.items(): + (required_state_id,) = self.db_pool.simple_insert_returning_txn( + txn, + table="sliding_sync_connection_required_state", + values={ + "connection_key": connection_key, + "required_state": serialized_required_state, + }, + returning=("required_state_id",), + ) + for room_id in room_ids: + room_to_state_ids[room_id] = required_state_id + + # Copy over state from the previous connection position (we'll overwrite + # these rows with any changes). 
+ if previous_connection_position is not None: + sql = """ + INSERT INTO sliding_sync_connection_streams + (connection_position, stream, room_id, room_status, last_position) + SELECT ?, stream, room_id, room_status, last_position + FROM sliding_sync_connection_streams + WHERE connection_position = ? + """ + txn.execute(sql, (connection_position, previous_connection_position)) + + sql = """ + INSERT INTO sliding_sync_connection_room_configs + (connection_position, room_id, timeline_limit, required_state_id) + SELECT ?, room_id, timeline_limit, required_state_id + FROM sliding_sync_connection_room_configs + WHERE connection_position = ? + """ + txn.execute(sql, (connection_position, previous_connection_position)) + + # We now upsert the changes to the various streams. + key_values = [] + value_values = [] + for room_id, have_sent_room in per_connection_state.rooms._statuses.items(): + key_values.append((connection_position, "rooms", room_id)) + value_values.append( + (have_sent_room.status.value, have_sent_room.last_token) + ) + + for room_id, have_sent_room in per_connection_state.receipts._statuses.items(): + key_values.append((connection_position, "receipts", room_id)) + value_values.append( + (have_sent_room.status.value, have_sent_room.last_token) + ) + + self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_connection_streams", + key_names=( + "connection_position", + "stream", + "room_id", + ), + key_values=key_values, + value_names=( + "room_status", + "last_position", + ), + value_values=value_values, + ) + + # ... and upsert changes to the room configs. 
+ keys = [] + values = [] + for room_id, room_config in per_connection_state.room_configs.items(): + keys.append((connection_position, room_id)) + values.append((room_config.timeline_limit, room_to_state_ids[room_id])) + + self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_connection_room_configs", + key_names=( + "connection_position", + "room_id", + ), + key_values=keys, + value_names=( + "timeline_limit", + "required_state_id", + ), + value_values=values, + ) + + return connection_position + + @cached(iterable=True, max_entries=100000) + async def get_per_connection_state( + self, user_id: str, device_id: str, conn_id: str, connection_position: int + ) -> "PerConnectionState": + """Get the per-connection state for the given connection position.""" + + per_connection_state_db = await self.db_pool.runInteraction( + "get_per_connection_state", + self._get_per_connection_state_txn, + user_id=user_id, + device_id=device_id, + conn_id=conn_id, + connection_position=connection_position, + ) + store = cast("DataStore", self) + return await per_connection_state_db.to_state(store) + + def _get_per_connection_state_txn( + self, + txn: LoggingTransaction, + user_id: str, + device_id: str, + conn_id: str, + connection_position: int, + ) -> "PerConnectionStateDB": + # The `connection_position` is a user-supplied value, so we + # need to make sure that the one they supplied is actually theirs. + sql = """ + SELECT connection_key + FROM sliding_sync_connection_positions + INNER JOIN sliding_sync_connections USING (connection_key) + WHERE + connection_position = ? + AND user_id = ? AND device_id = ? AND conn_id = ? 
+ """ + txn.execute(sql, (connection_position, user_id, device_id, conn_id)) + row = txn.fetchone() + if row is None: + raise SlidingSyncUnknownPosition() + + (connection_key,) = row + + # Now that we have seen the client has received and used the connection + # position, we can delete all the other connection positions. 
+ sql = """ + DELETE FROM sliding_sync_connection_positions + WHERE connection_key = ? AND connection_position != ? + """ + txn.execute(sql, (connection_key, connection_position)) + + # Fetch and create a mapping from required state ID to the actual + # required state for the connection. + rows = self.db_pool.simple_select_list_txn( + txn, + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=( + "required_state_id", + "required_state", + ), + ) + + required_state_map: Dict[int, Dict[str, Set[str]]] = {} + for row in rows: + state = required_state_map[row[0]] = {} + for event_type, state_keys in db_to_json(row[1]): + state[event_type] = set(state_keys) + + # Get all the room configs, looking up the required state from the map + # above. + room_config_rows = self.db_pool.simple_select_list_txn( + txn, + table="sliding_sync_connection_room_configs", + keyvalues={"connection_position": connection_position}, + retcols=( + "room_id", + "timeline_limit", + "required_state_id", + ), + ) + + room_configs: Dict[str, RoomSyncConfig] = {} + for ( + room_id, + timeline_limit, + required_state_id, + ) in room_config_rows: + room_configs[room_id] = RoomSyncConfig( + timeline_limit=timeline_limit, + required_state_map=required_state_map[required_state_id], + ) + + # Now look up the per-room stream data. 
+ rooms: Dict[str, HaveSentRoom[str]] = {} + receipts: Dict[str, HaveSentRoom[str]] = {} + + receipt_rows = self.db_pool.simple_select_list_txn( + txn, + table="sliding_sync_connection_streams", + keyvalues={"connection_position": connection_position}, + retcols=( + "stream", + "room_id", + "room_status", + "last_position", + ), + ) + for stream, room_id, room_status, last_position in receipt_rows: + have_sent_room: HaveSentRoom[str] = HaveSentRoom( + status=HaveSentRoomFlag(room_status), last_token=last_position + ) + if stream == "rooms": + rooms[room_id] = have_sent_room + elif stream == "receipts": + receipts[room_id] = have_sent_room + + return PerConnectionStateDB( + rooms=RoomStatusMap(rooms), + receipts=RoomStatusMap(receipts), + room_configs=room_configs, + ) + + +@attr.s(auto_attribs=True, frozen=True) +class PerConnectionStateDB: + """An equivalent to `PerConnectionState` that holds data in a format stored + in the DB. + + The principal difference is that the tokens for the different streams are + serialized to strings. + + When persisting this *only* contains updates to the state. 
+ """ + + rooms: "RoomStatusMap[str]" + receipts: "RoomStatusMap[str]" + + room_configs: Mapping[str, "RoomSyncConfig"] + + @staticmethod + async def from_state( + per_connection_state: "MutablePerConnectionState", store: "DataStore" + ) -> "PerConnectionStateDB": + """Convert from a standard `PerConnectionState`""" + rooms = { + room_id: HaveSentRoom( + status=status.status, + last_token=( + await status.last_token.to_string(store) + if status.last_token is not None + else None + ), + ) + for room_id, status in per_connection_state.rooms.get_updates().items() + } + + receipts = { + room_id: HaveSentRoom( + status=status.status, + last_token=( + await status.last_token.to_string(store) + if status.last_token is not None + else None + ), + ) + for room_id, status in per_connection_state.receipts.get_updates().items() + } + + log_kv( + { + "rooms": rooms, + "receipts": receipts, + "room_configs": per_connection_state.room_configs.maps[0], + } + ) + + return PerConnectionStateDB( + rooms=RoomStatusMap(rooms), + receipts=RoomStatusMap(receipts), + room_configs=per_connection_state.room_configs.maps[0], + ) + + async def to_state(self, store: "DataStore") -> "PerConnectionState": + """Convert into a standard `PerConnectionState`""" + rooms = { + room_id: HaveSentRoom( + status=status.status, + last_token=( + await RoomStreamToken.parse(store, status.last_token) + if status.last_token is not None + else None + ), + ) + for room_id, status in self.rooms._statuses.items() + } + + receipts = { + room_id: HaveSentRoom( + status=status.status, + last_token=( + await MultiWriterStreamToken.parse(store, status.last_token) + if status.last_token is not None + else None + ), + ) + for room_id, status in self.receipts._statuses.items() + } + + return PerConnectionState( + rooms=RoomStatusMap(rooms), + receipts=RoomStatusMap(receipts), + room_configs=self.room_configs, + ) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 
581d00346bf..53d4ddd9bcc 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -19,7 +19,7 @@ # # -SCHEMA_VERSION = 86 # remember to update the list below when updating +SCHEMA_VERSION = 87 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the diff --git a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql new file mode 100644 index 00000000000..28bda9cbfd0 --- /dev/null +++ b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql @@ -0,0 +1,78 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + + +-- Table to track active sliding sync connections. +-- +-- A new connection will be created for every sliding sync request without a +-- `since` token for a given `conn_id` for a device. +-- +-- Once a new connection is created and used we delete all other connections for +-- the `conn_id`. +CREATE TABLE sliding_sync_connections( + connection_key $%AUTO_INCREMENT_PRIMARY_KEY%$, + user_id TEXT NOT NULL, + device_id TEXT NOT NULL, + conn_id TEXT NOT NULL, + created_ts BIGINT NOT NULL +); + +CREATE INDEX sliding_sync_connections_idx ON sliding_sync_connections(user_id, device_id, conn_id); + +-- We track per-connection state by associating changes to the state with +-- connection positions. This ensures that we correctly track state even if we +-- see retries of requests. 
+-- +-- If the client starts a "new" connection (by not specifying a since token), +-- we'll clear out the other connections (to ensure that we don't end up with +-- lots of connection keys). +CREATE TABLE sliding_sync_connection_positions( + connection_position $%AUTO_INCREMENT_PRIMARY_KEY%$, + connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE, + created_ts BIGINT NOT NULL +); + +CREATE INDEX sliding_sync_connection_positions_key ON sliding_sync_connection_positions(connection_key); + + +-- To save space we deduplicate the `required_state` json by assigning IDs to +-- different values. +CREATE TABLE sliding_sync_connection_required_state( + required_state_id $%AUTO_INCREMENT_PRIMARY_KEY%$, + connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE, + required_state TEXT NOT NULL -- We store this as a json list of event type / state key tuples. +); + +CREATE INDEX sliding_sync_connection_required_state_conn_pos ON sliding_sync_connections(connection_key); + + +-- Stores the room configs we have seen for rooms in a connection. +CREATE TABLE sliding_sync_connection_room_configs( + connection_position BIGINT NOT NULL REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE, + room_id TEXT NOT NULL, + timeline_limit BIGINT NOT NULL, + required_state_id BIGINT NOT NULL REFERENCES sliding_sync_connection_required_state(required_state_id) +); + +CREATE UNIQUE INDEX sliding_sync_connection_room_configs_idx ON sliding_sync_connection_room_configs(connection_position, room_id); + +-- Stores what data we have sent for given streams down given connections. +CREATE TABLE sliding_sync_connection_streams( + connection_position BIGINT NOT NULL REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE, + stream TEXT NOT NULL, -- e.g. 
"events" or "receipts" + room_id TEXT NOT NULL, + room_status TEXT NOT NULL, -- "live" or "previously", i.e. the `HaveSentRoomFlag` value + last_position TEXT -- For "previously" the token for the stream we have sent up to. +); + +CREATE UNIQUE INDEX sliding_sync_connection_streams_idx ON sliding_sync_connection_streams(connection_position, room_id, stream); diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index a79dcb815ee..a2b963db93f 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -741,6 +741,9 @@ def copy(self) -> "RoomStatusMap[T]": return RoomStatusMap(statuses=dict(self._statuses)) + def __len__(self) -> int: + return len(self._statuses) + class MutableRoomStatusMap(RoomStatusMap[T]): """A mutable version of `RoomStatusMap`""" @@ -842,6 +845,9 @@ def copy(self) -> "PerConnectionState": room_configs=dict(self.room_configs), ) + def __len__(self) -> int: + return len(self.rooms) + len(self.receipts) + len(self.room_configs) + @attr.s(auto_attribs=True) class MutablePerConnectionState(PerConnectionState): diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index 823e7db569d..2a5b097eada 100644 --- a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -191,8 +191,14 @@ def test_rooms_incremental_sync_restart(self) -> None: } _, from_token = self.do_sync(sync_body, tok=user1_tok) - # Reset the in-memory cache - self.hs.get_sliding_sync_handler().connection_store._connections.clear() + # Reset the positions + self.get_success( + self.store.db_pool.simple_delete( + table="sliding_sync_connections", + keyvalues={"user_id": user1_id}, + desc="clear_cache", + ) + ) # Make the Sliding Sync request channel = self.make_request( From ed7591cbefccbb3d09cdae1dab28a8547ba65870 Mon Sep 17 00:00:00 2001 From: Erik Johnston 
Date: Thu, 22 Aug 2024 13:46:07 +0100 Subject: [PATCH 09/23] Remove mark_token_seen --- synapse/handlers/sliding_sync/__init__.py | 5 ----- synapse/handlers/sliding_sync/store.py | 8 -------- 2 files changed, 13 deletions(-) diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 683281a8a2c..17a2f511bdb 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -333,11 +333,6 @@ async def current_sync_for_user( ) ) - await self.connection_store.mark_token_seen( - sync_config=sync_config, - from_token=from_token, - ) - # Get all of the room IDs that the user should be able to see in the sync # response has_lists = sync_config.lists is not None and len(sync_config.lists) > 0 diff --git a/synapse/handlers/sliding_sync/store.py b/synapse/handlers/sliding_sync/store.py index 21511ae5b80..5cd2564515b 100644 --- a/synapse/handlers/sliding_sync/store.py +++ b/synapse/handlers/sliding_sync/store.py @@ -121,11 +121,3 @@ async def record_new_state( from_token.connection_position if from_token else None, new_connection_state, ) - - @trace - async def mark_token_seen( - self, - sync_config: SlidingSyncConfig, - from_token: Optional[SlidingSyncStreamToken], - ) -> None: - pass From 03eac5ae603dc2a452d72aff1c772b906df1ef4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Aug 2024 17:26:22 +0100 Subject: [PATCH 10/23] Newsfile --- changelog.d/17599.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17599.misc diff --git a/changelog.d/17599.misc b/changelog.d/17599.misc new file mode 100644 index 00000000000..2f81356d127 --- /dev/null +++ b/changelog.d/17599.misc @@ -0,0 +1 @@ +Store sliding sync per-connection state in the database. 
From 7935423ec482a11ea0c385ed493a6716d399dee3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Aug 2024 20:02:10 +0100 Subject: [PATCH 11/23] Apply suggestions from code review Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync/store.py | 1 + synapse/storage/databases/main/sliding_sync.py | 12 ++++++++---- .../schema/main/delta/87/02_per_connection_state.sql | 2 +- .../client/sliding_sync/test_rooms_required_state.py | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sliding_sync/store.py b/synapse/handlers/sliding_sync/store.py index 5cd2564515b..cdafae27dba 100644 --- a/synapse/handlers/sliding_sync/store.py +++ b/synapse/handlers/sliding_sync/store.py @@ -73,6 +73,7 @@ async def get_per_connection_state( Raises: SlidingSyncUnknownPosition if the connection_token is unknown """ + # If this is our first request, there is no previous connection state to fetch out of the database if from_token is None or from_token.connection_position == 0: return PerConnectionState() diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 2166758a4c1..e1ea85b6e56 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -75,7 +75,7 @@ def persist_per_connection_state_txn( previous_connection_position: Optional[int], per_connection_state: "PerConnectionStateDB", ) -> int: - # First we fetch the (or create) the connection key associated with the + # First we fetch (or create) the connection key associated with the # previous connection position. if previous_connection_position is not None: # The `previous_connection_position` is a user-supplied value, so we @@ -97,9 +97,11 @@ def persist_per_connection_state_txn( (connection_key,) = row else: - # We're restarting the connection, so we clear all existing - # connections. 
We do this here to ensure that if we get lots of - # one-shot requests we don't stack up lots of entries. + # We're restarting the connection, so we clear the previous existing data we + # used to track it. We do this here to ensure that if we get lots of + # one-shot requests we don't stack up lots of entries. We have `ON DELETE + # CASCADE` setup on the dependent tables so this will clear out all the + # associated data. self.db_pool.simple_delete_txn( txn, table="sliding_sync_connections", @@ -373,6 +375,8 @@ def _get_per_connection_state_txn( rooms[room_id] = have_sent_room elif stream == "receipts": receipts[room_id] = have_sent_room + else + raise AssertionError(...) return PerConnectionStateDB( rooms=RoomStatusMap(rooms), diff --git a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql index 28bda9cbfd0..08f6d362910 100644 --- a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql +++ b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql @@ -53,7 +53,7 @@ CREATE TABLE sliding_sync_connection_required_state( required_state TEXT NOT NULL -- We store this as a json list of event type / state key tuples. ); -CREATE INDEX sliding_sync_connection_required_state_conn_pos ON sliding_sync_connections(connection_key); +CREATE INDEX sliding_sync_connection_required_state_conn_pos ON sliding_sync_connection_required_state(connection_key); -- Stores the room configs we have seen for rooms in a connection. 
diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index 2a5b097eada..498c921cbdd 100644 --- a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -196,7 +196,7 @@ def test_rooms_incremental_sync_restart(self) -> None: self.store.db_pool.simple_delete( table="sliding_sync_connections", keyvalues={"user_id": user1_id}, - desc="clear_cache", + desc="clear_sliding_sync_connections_cache", ) ) From 948456b1594ad0d27d4db1d5b6e59a135678f2fd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Aug 2024 20:07:07 +0100 Subject: [PATCH 12/23] Add comment about why we ignore zero position --- synapse/handlers/sliding_sync/store.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sliding_sync/store.py b/synapse/handlers/sliding_sync/store.py index cdafae27dba..42001775cc0 100644 --- a/synapse/handlers/sliding_sync/store.py +++ b/synapse/handlers/sliding_sync/store.py @@ -107,8 +107,12 @@ async def record_new_state( else: return 0 - if from_token is not None and from_token.connection_position == 0: - from_token = None + # A from token with a zero connection position means there was no + # previously stored connection state, so we treat a zero the same as + # there being no previous position. 
+ previous_connection_position = None + if from_token is not None and from_token.connection_position != 0: + previous_connection_position = from_token.connection_position conn_id = sync_config.conn_id or "" @@ -119,6 +123,6 @@ async def record_new_state( sync_config.user.to_string(), device_id, conn_id, - from_token.connection_position if from_token else None, + previous_connection_position, new_connection_state, ) From 68a2a98b5b3f09aa69577855aff4761af0257261 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Aug 2024 20:08:50 +0100 Subject: [PATCH 13/23] Don't bother to use zip --- synapse/storage/database.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 132fcf3db3f..f93da00f2d6 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1108,16 +1108,14 @@ def simple_insert_returning_txn( """ if txn.database_engine.supports_returning: - keys, vals = zip(*values.items()) - sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % ( table, - ", ".join(k for k in keys), - ", ".join("?" for _ in keys), + ", ".join(k for k in values.keys()), + ", ".join("?" 
for _ in values.keys()), ", ".join(k for k in returning), ) - txn.execute(sql, vals) + txn.execute(sql, values.values()) row = txn.fetchone() assert row is not None return row From 8ed1c074dd00cf2e7bafe618a5895127402cf0c1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Aug 2024 20:11:57 +0100 Subject: [PATCH 14/23] Add timestamp index --- synapse/storage/schema/main/delta/87/02_per_connection_state.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql index 08f6d362910..17b61b4ae50 100644 --- a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql +++ b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql @@ -28,6 +28,7 @@ CREATE TABLE sliding_sync_connections( ); CREATE INDEX sliding_sync_connections_idx ON sliding_sync_connections(user_id, device_id, conn_id); +CREATE INDEX sliding_sync_connections_ts_idx ON sliding_sync_connections(created_ts); -- We track per-connection state by associating changes to the state with -- connection positions. 
This ensures that we correctly track state even if we From bae50d31d62165a839b8d5324fa6d810e304ee58 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Aug 2024 20:12:41 +0100 Subject: [PATCH 15/23] Rename column --- synapse/storage/databases/main/sliding_sync.py | 12 ++++++------ .../schema/main/delta/87/02_per_connection_state.sql | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index e1ea85b6e56..db7f73cb480 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -189,8 +189,8 @@ def persist_per_connection_state_txn( if previous_connection_position is not None: sql = """ INSERT INTO sliding_sync_connection_streams - (connection_position, stream, room_id, room_status, last_position) - SELECT ?, stream, room_id, room_status, last_position + (connection_position, stream, room_id, room_status, last_token) + SELECT ?, stream, room_id, room_status, last_token FROM sliding_sync_connection_streams WHERE connection_position = ? 
""" @@ -231,7 +231,7 @@ def persist_per_connection_state_txn( key_values=key_values, value_names=( "room_status", - "last_position", + "last_token", ), value_values=value_values, ) @@ -364,12 +364,12 @@ def _get_per_connection_state_txn( "stream", "room_id", "room_status", - "last_position", + "last_token", ), ) - for stream, room_id, room_status, last_position in receipt_rows: + for stream, room_id, room_status, last_token in receipt_rows: have_sent_room: HaveSentRoom[str] = HaveSentRoom( - status=HaveSentRoomFlag(room_status), last_token=last_position + status=HaveSentRoomFlag(room_status), last_token=last_token ) if stream == "rooms": rooms[room_id] = have_sent_room diff --git a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql index 17b61b4ae50..1fadbafb3c2 100644 --- a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql +++ b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql @@ -73,7 +73,7 @@ CREATE TABLE sliding_sync_connection_streams( stream TEXT NOT NULL, -- e.g. "events" or "receipts" room_id TEXT NOT NULL, room_status TEXT NOT NULL, -- "live" or "previously", i.e. the `HaveSentRoomFlag` value - last_position TEXT -- For "previously" the token for the stream we have sent up to. + last_token TEXT -- For "previously" the token for the stream we have sent up to. 
); CREATE UNIQUE INDEX sliding_sync_connection_streams_idx ON sliding_sync_connection_streams(connection_position, room_id, stream); From 1e5a3a7fb7f1aea26e094b81206559c1f503c498 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Aug 2024 20:15:38 +0100 Subject: [PATCH 16/23] Fix errors --- synapse/storage/database.py | 2 +- synapse/storage/databases/main/sliding_sync.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f93da00f2d6..17ff02f8474 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1115,7 +1115,7 @@ def simple_insert_returning_txn( ", ".join(k for k in returning), ) - txn.execute(sql, values.values()) + txn.execute(sql, list(values.values())) row = txn.fetchone() assert row is not None return row diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index db7f73cb480..b72745d20a2 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -375,7 +375,7 @@ def _get_per_connection_state_txn( rooms[room_id] = have_sent_room elif stream == "receipts": receipts[room_id] = have_sent_room - else + else: raise AssertionError(...) 
return PerConnectionStateDB( From 0e07f657fe7953f2f82a9d52a08b21c33b5e741c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Aug 2024 11:55:00 +0100 Subject: [PATCH 17/23] Rename get_and_clear_connection_positions --- synapse/handlers/sliding_sync/__init__.py | 2 +- synapse/handlers/sliding_sync/store.py | 4 ++-- synapse/storage/databases/main/sliding_sync.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 17a2f511bdb..dd5680dc80d 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -328,7 +328,7 @@ async def current_sync_for_user( # amount of time (more with round-trips and re-processing) in the end to # get everything again. previous_connection_state = ( - await self.connection_store.get_per_connection_state( + await self.connection_store.get_and_clear_connection_positions( sync_config, from_token ) ) diff --git a/synapse/handlers/sliding_sync/store.py b/synapse/handlers/sliding_sync/store.py index 42001775cc0..d24fccf76f6 100644 --- a/synapse/handlers/sliding_sync/store.py +++ b/synapse/handlers/sliding_sync/store.py @@ -63,7 +63,7 @@ class SlidingSyncConnectionStore: store: "DataStore" - async def get_per_connection_state( + async def get_and_clear_connection_positions( self, sync_config: SlidingSyncConfig, from_token: Optional[SlidingSyncStreamToken], @@ -82,7 +82,7 @@ async def get_per_connection_state( device_id = sync_config.requester.device_id assert device_id is not None - return await self.store.get_per_connection_state( + return await self.store.get_and_clear_connection_positions( sync_config.user.to_string(), device_id, conn_id, diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index b72745d20a2..b2052e05771 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ 
b/synapse/storage/databases/main/sliding_sync.py @@ -261,14 +261,14 @@ def persist_per_connection_state_txn( return connection_position @cached(iterable=True, max_entries=100000) - async def get_per_connection_state( + async def get_and_clear_connection_positions( self, user_id: str, device_id: str, conn_id: str, connection_position: int ) -> "PerConnectionState": """Get the per-connection state for the given connection position.""" per_connection_state_db = await self.db_pool.runInteraction( - "get_per_connection_state", - self._get_per_connection_state_txn, + "get_and_clear_connection_positions", + self._get_and_clear_connection_positions_txn, user_id=user_id, device_id=device_id, conn_id=conn_id, @@ -277,7 +277,7 @@ async def get_per_connection_state( store = cast("DataStore", self) return await per_connection_state_db.to_state(store) - def _get_per_connection_state_txn( + def _get_and_clear_connection_positions_txn( self, txn: LoggingTransaction, user_id: str, From 4a68975459e46b70f87fb898021640567d3a8552 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Aug 2024 11:55:59 +0100 Subject: [PATCH 18/23] Don't assert unkown streams, log --- synapse/storage/databases/main/sliding_sync.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index b2052e05771..560429ee2db 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -13,6 +13,7 @@ # +import logging from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Set, cast import attr @@ -36,6 +37,8 @@ if TYPE_CHECKING: from synapse.storage.databases.main import DataStore +logger = logging.getLogger(__name__) + class SlidingSyncStore(SQLBaseStore): async def persist_per_connection_state( @@ -376,7 +379,9 @@ def _get_and_clear_connection_positions_txn( elif stream == "receipts": receipts[room_id] = have_sent_room else: - raise 
AssertionError(...) + # For forwards compatibility we ignore unknown streams, as in + # future we want to be able to easily add more stream types. + logger.warning("Unrecognized sliding sync stream in DB %r", stream) return PerConnectionStateDB( rooms=RoomStatusMap(rooms), From ac14e5745cd0b30962db902e96dfd4e3add417ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Aug 2024 12:00:25 +0100 Subject: [PATCH 19/23] Index on created ts --- synapse/storage/schema/main/delta/87/02_per_connection_state.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql index 1fadbafb3c2..3bf682e6fa4 100644 --- a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql +++ b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql @@ -44,6 +44,7 @@ CREATE TABLE sliding_sync_connection_positions( ); CREATE INDEX sliding_sync_connection_positions_key ON sliding_sync_connection_positions(connection_key); +CREATE INDEX sliding_sync_connection_positions_ts_idx ON sliding_sync_connection_positions(created_ts); -- To save space we deduplicate the `required_state` json by assigning IDs to From 5fe6466d60196ffff175c197e60e4332a13500af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Aug 2024 12:05:35 +0100 Subject: [PATCH 20/23] Update synapse/types/handlers/sliding_sync.py Co-authored-by: Eric Eastwood --- synapse/types/handlers/sliding_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index a79dcb815ee..5a94ca5f95c 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -523,7 +523,7 @@ def combine_room_sync_config( } # Take the highest timeline limit - if self.timeline_limit < other_room_sync_config.timeline_limit: + if timeline_limit < other_room_sync_config.timeline_limit: 
timeline_limit = other_room_sync_config.timeline_limit # Union the required state From 9065382c218ae535c332f39f32c09fe1228f4927 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Aug 2024 17:43:43 +0100 Subject: [PATCH 21/23] Add cast comment --- synapse/storage/databases/main/sliding_sync.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 560429ee2db..e1cdf0a7867 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -56,7 +56,11 @@ async def persist_per_connection_state( The connection position of the newly persisted state. """ + # This cast is safe because the downstream code only cares about + # `store.get_id_for_instance(...)` and `StreamWorkerStore` is mixed + # alongside `SlidingSyncStore` wherever we create a store. store = cast("DataStore", self) + return await self.db_pool.runInteraction( "persist_per_connection_state", self.persist_per_connection_state_txn, @@ -277,7 +281,12 @@ async def get_and_clear_connection_positions( conn_id=conn_id, connection_position=connection_position, ) + + # This cast is safe because the downstream code only cares about + # `store.get_id_for_instance(...)` and `StreamWorkerStore` is mixed + # alongside `SlidingSyncStore` wherever we create a store. 
store = cast("DataStore", self) + return await per_connection_state_db.to_state(store) def _get_and_clear_connection_positions_txn( From 52f2199cf2877bc13acb5a72fa3c566924afb576 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Aug 2024 18:09:51 +0100 Subject: [PATCH 22/23] Rename to 'effective_device_id' --- synapse/storage/databases/main/sliding_sync.py | 8 ++++---- .../schema/main/delta/87/02_per_connection_state.sql | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index e1cdf0a7867..dc747d7ac0a 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -93,7 +93,7 @@ def persist_per_connection_state_txn( INNER JOIN sliding_sync_connections USING (connection_key) WHERE connection_position = ? - AND user_id = ? AND device_id = ? AND conn_id = ? + AND user_id = ? AND effective_device_id = ? AND conn_id = ? """ txn.execute( sql, (previous_connection_position, user_id, device_id, conn_id) @@ -114,7 +114,7 @@ def persist_per_connection_state_txn( table="sliding_sync_connections", keyvalues={ "user_id": user_id, - "device_id": device_id, + "effective_device_id": device_id, "conn_id": conn_id, }, ) @@ -124,7 +124,7 @@ def persist_per_connection_state_txn( table="sliding_sync_connections", values={ "user_id": user_id, - "device_id": device_id, + "effective_device_id": device_id, "conn_id": conn_id, "created_ts": self._clock.time_msec(), }, @@ -305,7 +305,7 @@ def _get_and_clear_connection_positions_txn( INNER JOIN sliding_sync_connections USING (connection_key) WHERE connection_position = ? - AND user_id = ? AND device_id = ? AND conn_id = ? + AND user_id = ? AND effective_device_id = ? AND conn_id = ? 
""" txn.execute(sql, (connection_position, user_id, device_id, conn_id)) row = txn.fetchone() diff --git a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql index 3bf682e6fa4..59bc14a2c91 100644 --- a/synapse/storage/schema/main/delta/87/02_per_connection_state.sql +++ b/synapse/storage/schema/main/delta/87/02_per_connection_state.sql @@ -22,12 +22,13 @@ CREATE TABLE sliding_sync_connections( connection_key $%AUTO_INCREMENT_PRIMARY_KEY%$, user_id TEXT NOT NULL, - device_id TEXT NOT NULL, + -- Generally the device ID, but may be something else for e.g. puppeted accounts. + effective_device_id TEXT NOT NULL, conn_id TEXT NOT NULL, created_ts BIGINT NOT NULL ); -CREATE INDEX sliding_sync_connections_idx ON sliding_sync_connections(user_id, device_id, conn_id); +CREATE INDEX sliding_sync_connections_idx ON sliding_sync_connections(user_id, effective_device_id, conn_id); CREATE INDEX sliding_sync_connections_ts_idx ON sliding_sync_connections(created_ts); -- We track per-connection state by associating changes to the state with From 2da80fe9644617ce6f0c5f59b65a10ffa5eff36a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2024 16:19:53 +0100 Subject: [PATCH 23/23] Add commente on new tables in in schema notes --- synapse/storage/schema/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 316541d8180..d8afa6da02d 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -146,6 +146,9 @@ Changes in SCHEMA_VERSION = 87 - Add tables to store Sliding Sync data for quick filtering/sorting (`sliding_sync_joined_rooms`, `sliding_sync_membership_snapshots`) + - Add tables for storing the per-connection state for sliding sync requests: + sliding_sync_connections, sliding_sync_connection_positions, sliding_sync_connection_required_state, + 
sliding_sync_connection_room_configs, sliding_sync_connection_streams """