Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Persist pagination sessions to the database.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Aug 16, 2021
1 parent 0ace38b commit 085aec6
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 32 deletions.
1 change: 1 addition & 0 deletions changelog.d/10613.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946).
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ files =
synapse/storage/databases/main/keys.py,
synapse/storage/databases/main/pusher.py,
synapse/storage/databases/main/registration.py,
synapse/storage/databases/main/room_summary.py,
synapse/storage/databases/main/stream.py,
synapse/storage/databases/main/ui_auth.py,
synapse/storage/database.py,
Expand Down
2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.room_summary import RoomSummaryStore
from synapse.storage.databases.main.search import SearchStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
Expand Down Expand Up @@ -250,6 +251,7 @@ class GenericWorkerSlavedStore(
SearchStore,
TransactionWorkerStore,
LockStore,
RoomSummaryStore,
BaseSlavedStore,
):
pass
Expand Down
54 changes: 22 additions & 32 deletions synapse/handlers/room_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@
Membership,
RoomTypes,
)
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
from synapse.events import EventBase
from synapse.events.utils import format_event_for_client_v2
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -87,12 +86,6 @@ def __init__(self, hs: "HomeServer"):
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()

# A map of query information to the current pagination state.
#
# TODO Allow for multiple workers to share this data.
# TODO Expire pagination tokens.
self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {}

# If a user tries to fetch the same page multiple times in quick succession,
# only process the first attempt and return its result to subsequent requests.
self._pagination_response_cache: ResponseCache[
Expand All @@ -102,20 +95,12 @@ def __init__(self, hs: "HomeServer"):
"get_room_hierarchy",
)

def _expire_pagination_sessions(self):
async def _expire_pagination_sessions(self):
"""Expire pagination session which are old."""
expire_before = (
self._clock.time_msec() - self._PAGINATION_SESSION_VALIDITY_PERIOD_MS
)
to_expire = []

for key, value in self._pagination_sessions.items():
if value.creation_time_ms < expire_before:
to_expire.append(key)

for key in to_expire:
logger.debug("Expiring pagination session id %s", key)
del self._pagination_sessions[key]
await self._store.delete_old_room_hierarchy_pagination_sessions(expire_before)

async def get_space_summary(
self,
Expand Down Expand Up @@ -327,17 +312,21 @@ async def _get_room_hierarchy(

# If this is continuing a previous session, pull the persisted data.
if from_token:
self._expire_pagination_sessions()
await self._expire_pagination_sessions()

pagination_key = _PaginationKey(
requested_room_id, suggested_only, max_depth, from_token
)
if pagination_key not in self._pagination_sessions:
try:
pagination_session = (
await self._store.get_room_hierarchy_pagination_session(
requested_room_id, suggested_only, max_depth, from_token
)
)
except StoreError:
raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)

# Load the previous state.
pagination_session = self._pagination_sessions[pagination_key]
room_queue = pagination_session.room_queue
room_queue = [
_RoomQueueEntry(*fields) for fields in pagination_session.room_queue
]
processed_rooms = pagination_session.processed_rooms
else:
# The queue of rooms to process, the next room is last on the stack.
Expand Down Expand Up @@ -456,13 +445,14 @@ async def _get_room_hierarchy(

# If there's additional data, generate a pagination token (and persist state).
if room_queue:
next_batch = random_string(24)
result["next_batch"] = next_batch
pagination_key = _PaginationKey(
requested_room_id, suggested_only, max_depth, next_batch
)
self._pagination_sessions[pagination_key] = _PaginationSession(
self._clock.time_msec(), room_queue, processed_rooms
result[
"next_batch"
] = await self._store.create_room_hierarchy_pagination_session(
requested_room_id,
suggested_only,
max_depth,
[attr.astuple(room_entry) for room_entry in room_queue], # type: ignore[misc]
processed_rooms,
)

return result
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from .rejections import RejectionsStore
from .relations import RelationsStore
from .room import RoomStore
from .room_summary import RoomSummaryStore
from .roommember import RoomMemberStore
from .search import SearchStore
from .signatures import SignatureStore
Expand Down Expand Up @@ -121,6 +122,7 @@ class DataStore(
ServerMetricsStore,
EventForwardExtremitiesStore,
LockStore,
RoomSummaryStore,
):
def __init__(self, database: DatabasePool, db_conn, hs):
self.hs = hs
Expand Down
185 changes: 185 additions & 0 deletions synapse/storage/databases/main/room_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# -*- coding: utf-8 -*-
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from typing import List, Optional, Sequence, Set, Tuple

import attr

import synapse.util.stringutils as stringutils
from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction

logger = logging.getLogger(__name__)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _PaginationSession:
"""The information that is stored for pagination."""

# The queue of rooms which are still to process as packed _RoomQueueEntry tuples.
room_queue: List[Tuple[str, Sequence[str], int]]
# A set of rooms which have been processed.
processed_rooms: Set[str]


class RoomSummaryStore(SQLBaseStore):
"""
Manage user interactive authentication sessions.
"""

async def create_room_hierarchy_pagination_session(
self,
room_id: str,
suggested_only: bool,
max_depth: Optional[int],
room_queue: List[Tuple[str, Sequence[str], int]],
processed_rooms: Set[str],
) -> str:
"""
Creates a new pagination session for the room hierarchy endpoint.
Args:
room_id: The room ID the pagination session is for.
suggested_only: Whether we should only return children with the
"suggested" flag set.
max_depth: The maximum depth in the tree to explore, must be a
non-negative integer.
room_queue:
The queue of rooms which are still to process.
processed_rooms:
A set of rooms which have been processed.
Returns:
The newly created session ID.
Raises:
StoreError if a unique session ID cannot be generated.
"""
pagination_state = json.dumps(
{
"room_queue": room_queue,
"processed_rooms": list(processed_rooms),
}
)

# autogen a session ID and try to create it. We may clash, so just
# try a few times till one goes through, giving up eventually.
attempts = 0
while attempts < 5:
session_id = stringutils.random_string(24)

try:
await self.db_pool.simple_insert(
table="room_hierarchy_pagination_sessions",
values={
"session_id": session_id,
"room_id": room_id,
"suggested_only": suggested_only,
"max_depth": max_depth,
"pagination_state": pagination_state,
"creation_time": self.hs.get_clock().time_msec(),
},
desc="create_room_hierarchy_pagination_session",
)
logger.debug(
"Persisted room hierarchy pagination session: %s for room %s (suggested: %s, max_depth: %s)",
session_id,
room_id,
suggested_only,
max_depth,
)

return session_id
except self.db_pool.engine.module.IntegrityError:
attempts += 1
raise StoreError(500, "Couldn't generate a session ID.")

async def get_room_hierarchy_pagination_session(
self,
room_id: str,
suggested_only: bool,
max_depth: Optional[int],
session_id: str,
) -> _PaginationSession:
"""
Retrieve data stored with set_session_data
Args:
room_id: The room ID the pagination session is for.
suggested_only: Whether we should only return children with the
"suggested" flag set.
max_depth: The maximum depth in the tree to explore, must be a
non-negative integer.
session_id: The pagination session ID.
Raises:
StoreError if the session cannot be found.
"""
logger.debug(
"Fetch room hierarchy pagination session: %s for room %s (suggested: %s, max_depth: %s)",
session_id,
room_id,
suggested_only,
max_depth,
)
result = await self.db_pool.simple_select_one(
table="room_hierarchy_pagination_sessions",
keyvalues={
"session_id": session_id,
"room_id": room_id,
"suggested_only": suggested_only,
},
retcols=(
"max_depth",
"pagination_state",
),
desc="get_room_hierarchy_pagination_sessions",
)
# Check the value of max_depth separately since null != null.
if result["max_depth"] != max_depth:
raise StoreError(404, "No row found (room_hierarchy_pagination_sessions)")

pagination_state = db_to_json(result["pagination_state"])

return _PaginationSession(
room_queue=pagination_state["room_queue"],
processed_rooms=set(pagination_state["processed_rooms"]),
)

async def delete_old_room_hierarchy_pagination_sessions(
self, expiration_time: int
) -> None:
"""
Remove sessions which were last used earlier than the expiration time.
Args:
expiration_time: The latest time that is still considered valid.
This is an epoch time in milliseconds.
"""
await self.db_pool.runInteraction(
"delete_old_room_hierarchy_pagination_sessions",
self._delete_old_room_hierarchy_pagination_sessions_txn,
expiration_time,
)

def _delete_old_room_hierarchy_pagination_sessions_txn(
self, txn: LoggingTransaction, expiration_time: int
):
# Get the expired sessions.
sql = "DELETE FROM room_hierarchy_pagination_sessions WHERE creation_time <= ?"
txn.execute(sql, [expiration_time])
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE TABLE IF NOT EXISTS room_hierarchy_pagination_sessions(
session_id TEXT NOT NULL, -- The session ID passed to the client.
creation_time BIGINT NOT NULL, -- The time this session was created (epoch time in milliseconds).
room_id TEXT NOT NULL, -- The room ID of the pagination session.
suggested_only BOOLEAN NOT NULL, -- Whether to only include suggested rooms/spaces.
max_depth int, -- The maximum depth to fetch.
pagination_state TEXT NOT NULL, -- A JSON dictionary of persisted state.
UNIQUE (session_id)
);

0 comments on commit 085aec6

Please sign in to comment.