This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Persist room hierarchy pagination sessions to the database. #10613

Merged 4 commits on Aug 24, 2021
1 change: 1 addition & 0 deletions changelog.d/10613.feature
@@ -0,0 +1 @@
Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946).
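
For context, the pagination this changelog entry refers to works by returning an opaque `next_batch` token that the client echoes back as the `from` query parameter on its next request; this PR persists the server-side state behind that token. A rough illustration of the response shape (field names per MSC2946 at the time; the literal below is illustrative Python, not Synapse code):

```python
# Roughly what a paginated room hierarchy response looks like under MSC2946.
first_page = {
    "rooms": [
        # ... summaries for the rooms walked so far ...
    ],
    # Opaque, server-generated token. The client passes it back as the `from`
    # query parameter on the (then-unstable) MSC2946 /hierarchy endpoint to
    # continue; the state behind it is what this PR moves from an in-memory
    # dict into the database.
    "next_batch": "scFUacLKZnqEQRtPVv",
}
```
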
1 change: 1 addition & 0 deletions mypy.ini
@@ -57,6 +57,7 @@ files =
synapse/storage/databases/main/keys.py,
synapse/storage/databases/main/pusher.py,
synapse/storage/databases/main/registration.py,
+synapse/storage/databases/main/session.py,
synapse/storage/databases/main/stream.py,
synapse/storage/databases/main/ui_auth.py,
synapse/storage/database.py,
2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
@@ -118,6 +118,7 @@
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.storage.databases.main.search import SearchStore
+from synapse.storage.databases.main.session import SessionStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
@@ -253,6 +254,7 @@ class GenericWorkerSlavedStore(
    SearchStore,
    TransactionWorkerStore,
    LockStore,
+    SessionStore,
    BaseSlavedStore,
):
    pass
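
Registering `SessionStore` on both `GenericWorkerSlavedStore` (here) and `DataStore` (below) is what lets workers and the main process share pagination state through the database, replacing the in-memory dict whose removed TODO noted it could not be shared. A minimal sketch of this mixin pattern, with simplified stand-in classes rather than the real Synapse base classes:

```python
# Illustrative only: each feature store is a mixin contributing its own table
# accessors, and the concrete store class simply inherits from all of them.
class SessionStore:
    async def get_session(self, session_type: str, session_id: str) -> dict:
        ...  # would run the SELECT shown in session.py below


class LockStore:
    ...  # other feature mixins follow the same shape


class GenericWorkerSlavedStore(SessionStore, LockStore):
    pass
```
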
76 changes: 38 additions & 38 deletions synapse/handlers/room_summary.py
@@ -28,12 +28,11 @@
    Membership,
    RoomTypes,
)
-from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
+from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
from synapse.events import EventBase
from synapse.events.utils import format_event_for_client_v2
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.stringutils import random_string

if TYPE_CHECKING:
    from synapse.server import HomeServer
@@ -76,6 +75,9 @@ class _PaginationSession:


class RoomSummaryHandler:
+    # A unique key used for pagination sessions for the room hierarchy endpoint.
+    _PAGINATION_SESSION_TYPE = "room_hierarchy_pagination"
+
    # The time a pagination session remains valid for.
    _PAGINATION_SESSION_VALIDITY_PERIOD_MS = 5 * 60 * 1000

@@ -87,12 +89,6 @@ def __init__(self, hs: "HomeServer"):
        self._server_name = hs.hostname
        self._federation_client = hs.get_federation_client()

-        # A map of query information to the current pagination state.
-        #
-        # TODO Allow for multiple workers to share this data.
-        # TODO Expire pagination tokens.
-        self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {}
-
        # If a user tries to fetch the same page multiple times in quick succession,
        # only process the first attempt and return its result to subsequent requests.
        self._pagination_response_cache: ResponseCache[
@@ -102,21 +98,6 @@
            "get_room_hierarchy",
        )

-    def _expire_pagination_sessions(self):
-        """Expire pagination session which are old."""
-        expire_before = (
-            self._clock.time_msec() - self._PAGINATION_SESSION_VALIDITY_PERIOD_MS
-        )
-        to_expire = []
-
-        for key, value in self._pagination_sessions.items():
-            if value.creation_time_ms < expire_before:
-                to_expire.append(key)
-
-        for key in to_expire:
-            logger.debug("Expiring pagination session id %s", key)
-            del self._pagination_sessions[key]
-
    async def get_space_summary(
        self,
        requester: str,
@@ -327,18 +308,29 @@ async def _get_room_hierarchy(

        # If this is continuing a previous session, pull the persisted data.
        if from_token:
-            self._expire_pagination_sessions()
+            try:
+                pagination_session = await self._store.get_session(
+                    session_type=self._PAGINATION_SESSION_TYPE,
+                    session_id=from_token,
+                )
+            except StoreError:
+                raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)

-            pagination_key = _PaginationKey(
-                requested_room_id, suggested_only, max_depth, from_token
-            )
-            if pagination_key not in self._pagination_sessions:
+            # If the requester, room ID, suggested-only, or max depth were modified
+            # the session is invalid.
+            if (
+                requester != pagination_session["requester"]
+                or requested_room_id != pagination_session["room_id"]
+                or suggested_only != pagination_session["suggested_only"]
+                or max_depth != pagination_session["max_depth"]
+            ):
                raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)

            # Load the previous state.
-            pagination_session = self._pagination_sessions[pagination_key]
-            room_queue = pagination_session.room_queue
-            processed_rooms = pagination_session.processed_rooms
+            room_queue = [
+                _RoomQueueEntry(*fields) for fields in pagination_session["room_queue"]
+            ]
+            processed_rooms = set(pagination_session["processed_rooms"])
        else:
            # The queue of rooms to process, the next room is last on the stack.
            room_queue = [_RoomQueueEntry(requested_room_id, ())]
@@ -456,13 +448,21 @@ async def _get_room_hierarchy(

        # If there's additional data, generate a pagination token (and persist state).
        if room_queue:
-            next_batch = random_string(24)
-            result["next_batch"] = next_batch
-            pagination_key = _PaginationKey(
-                requested_room_id, suggested_only, max_depth, next_batch
-            )
-            self._pagination_sessions[pagination_key] = _PaginationSession(
-                self._clock.time_msec(), room_queue, processed_rooms
+            result["next_batch"] = await self._store.create_session(
+                session_type=self._PAGINATION_SESSION_TYPE,
+                value={
+                    # Information which must be identical across pagination.
+                    "requester": requester,
+                    "room_id": requested_room_id,
+                    "suggested_only": suggested_only,
+                    "max_depth": max_depth,
+                    # The stored state.
+                    "room_queue": [
+                        attr.astuple(room_entry) for room_entry in room_queue
+                    ],
+                    "processed_rooms": list(processed_rooms),
+                },
+                expiry_ms=self._PAGINATION_SESSION_VALIDITY_PERIOD_MS,
            )

        return result
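
Because `_RoomQueueEntry` is an attrs class, the room queue is flattened into plain JSON-friendly sequences with `attr.astuple` before being persisted, and rebuilt positionally when the session is loaded (the `_RoomQueueEntry(*fields)` comprehension above). A standalone sketch of that round trip, using an illustrative two-field version of the class rather than the real definition:

```python
import attr


@attr.s(frozen=True, slots=True, auto_attribs=True)
class _RoomQueueEntry:
    room_id: str
    via: tuple = ()


queue = [_RoomQueueEntry("!space:example.org", ("example.org",))]

# Flattened for storage in the session's JSON value...
serialised = [attr.astuple(entry) for entry in queue]

# ...and expanded back into entries when the pagination session is loaded.
restored = [_RoomQueueEntry(*fields) for fields in serialised]
assert restored[0].room_id == "!space:example.org"
```
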
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/__init__.py
@@ -63,6 +63,7 @@
from .room import RoomStore
from .roommember import RoomMemberStore
from .search import SearchStore
+from .session import SessionStore
from .signatures import SignatureStore
from .state import StateStore
from .stats import StatsStore
@@ -121,6 +122,7 @@ class DataStore(
    ServerMetricsStore,
    EventForwardExtremitiesStore,
    LockStore,
+    SessionStore,
):
    def __init__(self, database: DatabasePool, db_conn, hs):
        self.hs = hs
145 changes: 145 additions & 0 deletions synapse/storage/databases/main/session.py
@@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING

import synapse.util.stringutils as stringutils
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
    DatabasePool,
    LoggingDatabaseConnection,
    LoggingTransaction,
)
from synapse.types import JsonDict
from synapse.util import json_encoder

if TYPE_CHECKING:
    from synapse.server import HomeServer


class SessionStore(SQLBaseStore):
    """
    A store for generic session data.

    Each type of session should use a unique session type string, to keep its
    sessions separate from those of other features.

    Sessions are automatically removed when they expire.
    """

    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        super().__init__(database, db_conn, hs)

        # Create a background job for culling expired sessions.
        if hs.config.run_background_tasks:
            self._clock.looping_call(self._delete_expired_sessions, 30 * 60 * 1000)

    async def create_session(
        self, session_type: str, value: JsonDict, expiry_ms: int
    ) -> str:
        """
        Creates a new session of the given type.

        Args:
            session_type: The type for this session.
            value: The value to store.
            expiry_ms: How long the session remains valid, in milliseconds.
                After this time the session is treated as expired and will
                eventually be deleted.

        Returns:
            The newly created session ID.

        Raises:
            StoreError if a unique session ID cannot be generated.
        """
        # Autogenerate a session ID and try to create it. We may clash, so just
        # try a few times till one goes through, giving up eventually.
        attempts = 0
        while attempts < 5:
            session_id = stringutils.random_string(24)

            try:
                await self.db_pool.simple_insert(
                    table="sessions",
                    values={
                        "session_id": session_id,
                        "session_type": session_type,
                        "value": json_encoder.encode(value),
                        "expiry_time_ms": self.hs.get_clock().time_msec() + expiry_ms,
                    },
                    desc="create_session",
                )

                return session_id
            except self.db_pool.engine.module.IntegrityError:
                attempts += 1
        raise StoreError(500, "Couldn't generate a session ID.")

    async def get_session(self, session_type: str, session_id: str) -> JsonDict:
        """
        Retrieve data stored with create_session.

        Args:
            session_type: The type for this session.
            session_id: The session ID returned from create_session.

        Returns:
            The dictionary passed to create_session as `value`.

        Raises:
            StoreError if the session cannot be found.
        """

        def _get_session(
            txn: LoggingTransaction, session_type: str, session_id: str, ts: int
        ) -> JsonDict:
            # Check the expiry time as well: expired rows are only deleted
            # periodically by the background job, not at the moment they expire.
            select_sql = """
            SELECT value FROM sessions WHERE
            session_type = ? AND session_id = ? AND expiry_time_ms > ?
            """
            txn.execute(select_sql, [session_type, session_id, ts])
            row = txn.fetchone()

            if not row:
                raise StoreError(404, "No session")

            return db_to_json(row[0])

        return await self.db_pool.runInteraction(
            "get_session",
            _get_session,
            session_type,
            session_id,
            self._clock.time_msec(),
        )

    @wrap_as_background_process("delete_expired_sessions")
    async def _delete_expired_sessions(self) -> None:
        """Remove sessions with expiry dates that have passed."""

        def _delete_expired_sessions_txn(txn: LoggingTransaction, ts: int) -> None:
            sql = "DELETE FROM sessions WHERE expiry_time_ms <= ?"
            txn.execute(sql, (ts,))

        await self.db_pool.runInteraction(
            "delete_expired_sessions",
            _delete_expired_sessions_txn,
            self._clock.time_msec(),
        )
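
Since the `value` column holds JSON text, anything stored in a session has to survive a JSON round trip, which is why the handler above converts the set of processed rooms to a list and flattens the attrs entries to tuples. A small stdlib-only illustration of what get_session hands back (Synapse itself uses its json_encoder and db_to_json helpers rather than the json module directly; the values below are illustrative):

```python
import json

# An illustrative session value, mirroring the keys stored by the room
# hierarchy handler above.
value = {
    "requester": "@alice:example.org",
    "room_id": "!space:example.org",
    "suggested_only": False,
    "max_depth": None,
    "room_queue": [["!room:example.org", ["example.org"]]],
    "processed_rooms": ["!space:example.org"],
}

stored = json.dumps(value)          # what create_session() writes to `value`
assert json.loads(stored) == value  # what get_session() returns via db_to_json
```
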
23 changes: 23 additions & 0 deletions synapse/storage/schema/main/delta/62/02session_store.sql
@@ -0,0 +1,23 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE TABLE IF NOT EXISTS sessions(
    session_type TEXT NOT NULL,  -- The unique key for this type of session.
    session_id TEXT NOT NULL,  -- The session ID passed to the client.
    value TEXT NOT NULL,  -- A JSON dictionary to persist.
    expiry_time_ms BIGINT NOT NULL,  -- The time this session will expire (epoch time in milliseconds).
    UNIQUE (session_type, session_id)
);
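
A standalone sketch of the two behaviours the store relies on from this schema, using SQLite directly rather than Synapse's DatabasePool: the UNIQUE constraint rejecting a clashing session ID (which create_session retries around), and the expiry_time_ms comparison hiding rows that have expired but not yet been culled by the background job. The DDL is copied from above; the rest is illustrative.

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS sessions(
        session_type TEXT NOT NULL,
        session_id TEXT NOT NULL,
        value TEXT NOT NULL,
        expiry_time_ms BIGINT NOT NULL,
        UNIQUE (session_type, session_id)
    )
    """
)

now_ms = int(time.time() * 1000)
conn.execute(
    "INSERT INTO sessions VALUES (?, ?, ?, ?)",
    ("room_hierarchy_pagination", "abcdef", "{}", now_ms + 5 * 60 * 1000),
)

# A clashing (session_type, session_id) pair violates the UNIQUE constraint;
# create_session() catches this and retries with a fresh random session_id.
try:
    conn.execute(
        "INSERT INTO sessions VALUES (?, ?, ?, ?)",
        ("room_hierarchy_pagination", "abcdef", "{}", now_ms + 5 * 60 * 1000),
    )
except sqlite3.IntegrityError:
    pass

# get_session() filters on expiry_time_ms because expired rows are only
# removed periodically by the background job, not at the moment they expire.
row = conn.execute(
    "SELECT value FROM sessions"
    " WHERE session_type = ? AND session_id = ? AND expiry_time_ms > ?",
    ("room_hierarchy_pagination", "abcdef", now_ms),
).fetchone()
assert row == ("{}",)
```
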