diff --git a/changelog.d/10613.feature b/changelog.d/10613.feature new file mode 100644 index 000000000000..ffc4e4289cfa --- /dev/null +++ b/changelog.d/10613.feature @@ -0,0 +1 @@ +Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946). diff --git a/mypy.ini b/mypy.ini index e1b9405daa85..b4a33e66de3e 100644 --- a/mypy.ini +++ b/mypy.ini @@ -54,6 +54,7 @@ files = synapse/storage/databases/main/keys.py, synapse/storage/databases/main/pusher.py, synapse/storage/databases/main/registration.py, + synapse/storage/databases/main/room_summary.py, synapse/storage/databases/main/stream.py, synapse/storage/databases/main/ui_auth.py, synapse/storage/database.py, diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 3b7131af8fa2..e76af49ba657 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -114,6 +114,7 @@ MonthlyActiveUsersWorkerStore, ) from synapse.storage.databases.main.presence import PresenceStore +from synapse.storage.databases.main.room_summary import RoomSummaryStore from synapse.storage.databases.main.search import SearchStore from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.transactions import TransactionWorkerStore @@ -250,6 +251,7 @@ class GenericWorkerSlavedStore( SearchStore, TransactionWorkerStore, LockStore, + RoomSummaryStore, BaseSlavedStore, ): pass diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index ac6cfc0da915..92c7dd038dc1 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -28,12 +28,11 @@ Membership, RoomTypes, ) -from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError +from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError from synapse.events import EventBase from synapse.events.utils import format_event_for_client_v2 from synapse.types import JsonDict from synapse.util.caches.response_cache import ResponseCache -from synapse.util.stringutils import random_string if TYPE_CHECKING: from synapse.server import HomeServer @@ -87,12 +86,6 @@ def __init__(self, hs: "HomeServer"): self._server_name = hs.hostname self._federation_client = hs.get_federation_client() - # A map of query information to the current pagination state. - # - # TODO Allow for multiple workers to share this data. - # TODO Expire pagination tokens. - self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {} - # If a user tries to fetch the same page multiple times in quick succession, # only process the first attempt and return its result to subsequent requests. self._pagination_response_cache: ResponseCache[ @@ -102,20 +95,12 @@ def __init__(self, hs: "HomeServer"): "get_room_hierarchy", ) - def _expire_pagination_sessions(self): + async def _expire_pagination_sessions(self): """Expire pagination session which are old.""" expire_before = ( self._clock.time_msec() - self._PAGINATION_SESSION_VALIDITY_PERIOD_MS ) - to_expire = [] - - for key, value in self._pagination_sessions.items(): - if value.creation_time_ms < expire_before: - to_expire.append(key) - - for key in to_expire: - logger.debug("Expiring pagination session id %s", key) - del self._pagination_sessions[key] + await self._store.delete_old_room_hierarchy_pagination_sessions(expire_before) async def get_space_summary( self, @@ -327,17 +312,21 @@ async def _get_room_hierarchy( # If this is continuing a previous session, pull the persisted data. if from_token: - self._expire_pagination_sessions() + await self._expire_pagination_sessions() - pagination_key = _PaginationKey( - requested_room_id, suggested_only, max_depth, from_token - ) - if pagination_key not in self._pagination_sessions: + try: + pagination_session = ( + await self._store.get_room_hierarchy_pagination_session( + requested_room_id, suggested_only, max_depth, from_token + ) + ) + except StoreError: raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM) # Load the previous state. - pagination_session = self._pagination_sessions[pagination_key] - room_queue = pagination_session.room_queue + room_queue = [ + _RoomQueueEntry(*fields) for fields in pagination_session.room_queue + ] processed_rooms = pagination_session.processed_rooms else: # The queue of rooms to process, the next room is last on the stack. @@ -456,13 +445,14 @@ async def _get_room_hierarchy( # If there's additional data, generate a pagination token (and persist state). if room_queue: - next_batch = random_string(24) - result["next_batch"] = next_batch - pagination_key = _PaginationKey( - requested_room_id, suggested_only, max_depth, next_batch - ) - self._pagination_sessions[pagination_key] = _PaginationSession( - self._clock.time_msec(), room_queue, processed_rooms + result[ + "next_batch" + ] = await self._store.create_room_hierarchy_pagination_session( + requested_room_id, + suggested_only, + max_depth, + [attr.astuple(room_entry) for room_entry in room_queue], # type: ignore[misc] + processed_rooms, ) return result diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 8d9f07111db5..65dff98709cf 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -61,6 +61,7 @@ from .rejections import RejectionsStore from .relations import RelationsStore from .room import RoomStore +from .room_summary import RoomSummaryStore from .roommember import RoomMemberStore from .search import SearchStore from .signatures import SignatureStore @@ -121,6 +122,7 @@ class DataStore( ServerMetricsStore, EventForwardExtremitiesStore, LockStore, + RoomSummaryStore, ): def __init__(self, database: DatabasePool, db_conn, hs): self.hs = hs diff --git a/synapse/storage/databases/main/room_summary.py b/synapse/storage/databases/main/room_summary.py new file mode 100644 index 000000000000..02067590aea0 --- /dev/null +++ b/synapse/storage/databases/main/room_summary.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import logging +from typing import List, Optional, Sequence, Set, Tuple + +import attr + +import synapse.util.stringutils as stringutils +from synapse.api.errors import StoreError +from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage.database import LoggingTransaction + +logger = logging.getLogger(__name__) + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _PaginationSession: + """The information that is stored for pagination.""" + + # The queue of rooms which are still to process as packed _RoomQueueEntry tuples. + room_queue: List[Tuple[str, Sequence[str], int]] + # A set of rooms which have been processed. + processed_rooms: Set[str] + + +class RoomSummaryStore(SQLBaseStore): + """ + Manage user interactive authentication sessions. + """ + + async def create_room_hierarchy_pagination_session( + self, + room_id: str, + suggested_only: bool, + max_depth: Optional[int], + room_queue: List[Tuple[str, Sequence[str], int]], + processed_rooms: Set[str], + ) -> str: + """ + Creates a new pagination session for the room hierarchy endpoint. + + Args: + room_id: The room ID the pagination session is for. + suggested_only: Whether we should only return children with the + "suggested" flag set. + max_depth: The maximum depth in the tree to explore, must be a + non-negative integer. + room_queue: + The queue of rooms which are still to process. + processed_rooms: + A set of rooms which have been processed. + + Returns: + The newly created session ID. + + Raises: + StoreError if a unique session ID cannot be generated. + """ + pagination_state = json.dumps( + { + "room_queue": room_queue, + "processed_rooms": list(processed_rooms), + } + ) + + # autogen a session ID and try to create it. We may clash, so just + # try a few times till one goes through, giving up eventually. + attempts = 0 + while attempts < 5: + session_id = stringutils.random_string(24) + + try: + await self.db_pool.simple_insert( + table="room_hierarchy_pagination_sessions", + values={ + "session_id": session_id, + "room_id": room_id, + "suggested_only": suggested_only, + "max_depth": max_depth, + "pagination_state": pagination_state, + "creation_time": self.hs.get_clock().time_msec(), + }, + desc="create_room_hierarchy_pagination_session", + ) + logger.debug( + "Persisted room hierarchy pagination session: %s for room %s (suggested: %s, max_depth: %s)", + session_id, + room_id, + suggested_only, + max_depth, + ) + + return session_id + except self.db_pool.engine.module.IntegrityError: + attempts += 1 + raise StoreError(500, "Couldn't generate a session ID.") + + async def get_room_hierarchy_pagination_session( + self, + room_id: str, + suggested_only: bool, + max_depth: Optional[int], + session_id: str, + ) -> _PaginationSession: + """ + Retrieve data stored with set_session_data + + Args: + room_id: The room ID the pagination session is for. + suggested_only: Whether we should only return children with the + "suggested" flag set. + max_depth: The maximum depth in the tree to explore, must be a + non-negative integer. + session_id: The pagination session ID. + + Raises: + StoreError if the session cannot be found. + """ + logger.debug( + "Fetch room hierarchy pagination session: %s for room %s (suggested: %s, max_depth: %s)", + session_id, + room_id, + suggested_only, + max_depth, + ) + result = await self.db_pool.simple_select_one( + table="room_hierarchy_pagination_sessions", + keyvalues={ + "session_id": session_id, + "room_id": room_id, + "suggested_only": suggested_only, + }, + retcols=( + "max_depth", + "pagination_state", + ), + desc="get_room_hierarchy_pagination_sessions", + ) + # Check the value of max_depth separately since null != null. + if result["max_depth"] != max_depth: + raise StoreError(404, "No row found (room_hierarchy_pagination_sessions)") + + pagination_state = db_to_json(result["pagination_state"]) + + return _PaginationSession( + room_queue=pagination_state["room_queue"], + processed_rooms=set(pagination_state["processed_rooms"]), + ) + + async def delete_old_room_hierarchy_pagination_sessions( + self, expiration_time: int + ) -> None: + """ + Remove sessions which were last used earlier than the expiration time. + + Args: + expiration_time: The latest time that is still considered valid. + This is an epoch time in milliseconds. + + """ + await self.db_pool.runInteraction( + "delete_old_room_hierarchy_pagination_sessions", + self._delete_old_room_hierarchy_pagination_sessions_txn, + expiration_time, + ) + + def _delete_old_room_hierarchy_pagination_sessions_txn( + self, txn: LoggingTransaction, expiration_time: int + ): + # Get the expired sessions. + sql = "DELETE FROM room_hierarchy_pagination_sessions WHERE creation_time <= ?" + txn.execute(sql, [expiration_time]) diff --git a/synapse/storage/schema/main/delta/62/02persist_room_hierarchy_pagination_sessions.sql b/synapse/storage/schema/main/delta/62/02persist_room_hierarchy_pagination_sessions.sql new file mode 100644 index 000000000000..74d6c83ea7bd --- /dev/null +++ b/synapse/storage/schema/main/delta/62/02persist_room_hierarchy_pagination_sessions.sql @@ -0,0 +1,25 @@ +/* + * Copyright 2021 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS room_hierarchy_pagination_sessions( + session_id TEXT NOT NULL, -- The session ID passed to the client. + creation_time BIGINT NOT NULL, -- The time this session was created (epoch time in milliseconds). + room_id TEXT NOT NULL, -- The room ID of the pagination session. + suggested_only BOOLEAN NOT NULL, -- Whether to only include suggested rooms/spaces. + max_depth int, -- The maximum depth to fetch. + pagination_state TEXT NOT NULL, -- A JSON dictionary of persisted state. + UNIQUE (session_id) +);