Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Faster remote room joins: stream the un-partial-stating of rooms over…
Browse files Browse the repository at this point in the history
… replication. [rei:frrj/streams/unpsr] (#14473)
  • Loading branch information
reivilibre authored Dec 5, 2022
1 parent e1779bc commit 501f62d
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 67 deletions.
1 change: 1 addition & 0 deletions changelog.d/14473.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster remote room joins: stream the un-partial-stating of rooms over replication.
2 changes: 1 addition & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ async def incoming_device_list_update(
# Check if we are partially joining any rooms. If so we need to store
# all device list updates so that we can handle them correctly once we
# know who is in the room.
# TODO(faster joins): this fetches and processes a bunch of data that we don't
# TODO(faster_joins): this fetches and processes a bunch of data that we don't
# use. Could be replaced by a tighter query e.g.
# SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
partial_rooms = await self.store.get_partial_state_room_resync_info()
Expand Down
4 changes: 4 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def __init__(self, hs: "HomeServer"):
self._federation_event_handler = hs.get_federation_event_handler()
self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._notifier = hs.get_notifier()

self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
Expand Down Expand Up @@ -1692,6 +1693,9 @@ async def _sync_partial_state_room(
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()

# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/tcp/streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream

STREAMS_MAP = {
stream.NAME: stream
Expand All @@ -61,6 +62,7 @@
TagAccountDataStream,
AccountDataStream,
UserSignatureStream,
UnPartialStatedRoomStream,
)
}

Expand All @@ -80,4 +82,5 @@
"TagAccountDataStream",
"AccountDataStream",
"UserSignatureStream",
"UnPartialStatedRoomStream",
]
48 changes: 48 additions & 0 deletions synapse/replication/tcp/streams/partial_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING

import attr

from synapse.replication.tcp.streams import Stream
from synapse.replication.tcp.streams._base import current_token_without_instance

if TYPE_CHECKING:
from synapse.server import HomeServer


@attr.s(slots=True, frozen=True, auto_attribs=True)
class UnPartialStatedRoomStreamRow:
# ID of the room that has been un-partial-stated.
room_id: str


class UnPartialStatedRoomStream(Stream):
"""
Stream to notify about rooms becoming un-partial-stated;
that is, when the background sync finishes such that we now have full state for
the room.
"""

NAME = "un_partial_stated_room"
ROW_TYPE = UnPartialStatedRoomStreamRow

def __init__(self, hs: "HomeServer"):
store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
# TODO(faster_joins, multiple writers): we need to account for instance names
current_token_without_instance(store.get_un_partial_stated_rooms_token),
store.get_un_partial_stated_rooms_from_stream,
)
237 changes: 171 additions & 66 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019, 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,8 +50,14 @@
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import IdGenerator
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
IdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
Expand Down Expand Up @@ -114,6 +120,26 @@ def __init__(

self.config: HomeServerConfig = hs.config

self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator

if isinstance(database.engine, PostgresEngine):
self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="un_partial_stated_room_stream",
instance_name=self._instance_name,
tables=[
("un_partial_stated_room_stream", "instance_name", "stream_id")
],
sequence_name="un_partial_stated_room_stream_sequence",
# TODO(faster_joins, multiple writers) Support multiple writers.
writers=["master"],
)
else:
self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
db_conn, "un_partial_stated_room_stream", "stream_id"
)

async def store_room(
self,
room_id: str,
Expand Down Expand Up @@ -1216,70 +1242,6 @@ async def get_partial_state_room_resync_info(

return room_servers

async def clear_partial_state_room(self, room_id: str) -> bool:
"""Clears the partial state flag for a room.
Args:
room_id: The room whose partial state flag is to be cleared.
Returns:
`True` if the partial state flag has been cleared successfully.
`False` if the partial state flag could not be cleared because the room
still contains events with partial state.
"""
try:
await self.db_pool.runInteraction(
"clear_partial_state_room", self._clear_partial_state_room_txn, room_id
)
return True
except self.db_pool.engine.module.IntegrityError as e:
# Assume that any `IntegrityError`s are due to partial state events.
logger.info(
"Exception while clearing lazy partial-state-room %s, retrying: %s",
room_id,
e,
)
return False

def _clear_partial_state_room_txn(
self, txn: LoggingTransaction, room_id: str
) -> None:
DatabasePool.simple_delete_txn(
txn,
table="partial_state_rooms_servers",
keyvalues={"room_id": room_id},
)
DatabasePool.simple_delete_one_txn(
txn,
table="partial_state_rooms",
keyvalues={"room_id": room_id},
)
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
self._invalidate_cache_and_stream(
txn, self.get_partial_state_servers_at_join, (room_id,)
)

# We now delete anything from `device_lists_remote_pending` with a
# stream ID less than the minimum
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
txn,
table="partial_state_rooms",
keyvalues={},
retcol="MIN(device_lists_stream_id)",
allow_none=True,
)
if device_lists_stream_id is None:
# There are no rooms being currently partially joined, so we delete everything.
txn.execute("DELETE FROM device_lists_remote_pending")
else:
sql = """
DELETE FROM device_lists_remote_pending
WHERE stream_id <= ?
"""
txn.execute(sql, (device_lists_stream_id,))

@cached()
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
Expand Down Expand Up @@ -1315,6 +1277,66 @@ async def get_join_event_id_and_device_lists_stream_id_for_partial_state(
)
return result["join_event_id"], result["device_lists_stream_id"]

def get_un_partial_stated_rooms_token(self) -> int:
# TODO(faster_joins, multiple writers): This is inappropriate if there
# are multiple writers because workers that don't write often will
# hold all readers up.
# (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
# explanation.)
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()

async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
"""Get updates for caches replication stream.
Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.
Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exists
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
function to get further updatees.
The updates are a list of 2-tuples of stream ID and the row data
"""

if last_id == current_id:
return [], current_id, False

def get_un_partial_stated_rooms_from_stream_txn(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
sql = """
SELECT stream_id, room_id
FROM un_partial_stated_room_stream
WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
ORDER BY stream_id ASC
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, instance_name, limit))
updates = [(row[0], (row[1],)) for row in txn]
limited = False
upto_token = current_id
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True

return updates, upto_token, limited

return await self.db_pool.runInteraction(
"get_un_partial_stated_rooms_from_stream",
get_un_partial_stated_rooms_from_stream_txn,
)


class _BackgroundUpdates:
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
Expand Down Expand Up @@ -1806,6 +1828,8 @@ def __init__(

self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")

self._instance_name = hs.get_instance_name()

async def upsert_room_on_join(
self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
) -> None:
Expand Down Expand Up @@ -2270,3 +2294,84 @@ async def unblock_room(self, room_id: str) -> None:
self.is_room_blocked,
(room_id,),
)

async def clear_partial_state_room(self, room_id: str) -> bool:
"""Clears the partial state flag for a room.
Args:
room_id: The room whose partial state flag is to be cleared.
Returns:
`True` if the partial state flag has been cleared successfully.
`False` if the partial state flag could not be cleared because the room
still contains events with partial state.
"""
try:
async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
await self.db_pool.runInteraction(
"clear_partial_state_room",
self._clear_partial_state_room_txn,
room_id,
un_partial_state_room_stream_id,
)
return True
except self.db_pool.engine.module.IntegrityError as e:
# Assume that any `IntegrityError`s are due to partial state events.
logger.info(
"Exception while clearing lazy partial-state-room %s, retrying: %s",
room_id,
e,
)
return False

def _clear_partial_state_room_txn(
self,
txn: LoggingTransaction,
room_id: str,
un_partial_state_room_stream_id: int,
) -> None:
DatabasePool.simple_delete_txn(
txn,
table="partial_state_rooms_servers",
keyvalues={"room_id": room_id},
)
DatabasePool.simple_delete_one_txn(
txn,
table="partial_state_rooms",
keyvalues={"room_id": room_id},
)
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
self._invalidate_cache_and_stream(
txn, self.get_partial_state_servers_at_join, (room_id,)
)

DatabasePool.simple_insert_txn(
txn,
"un_partial_stated_room_stream",
{
"stream_id": un_partial_state_room_stream_id,
"instance_name": self._instance_name,
"room_id": room_id,
},
)

# We now delete anything from `device_lists_remote_pending` with a
# stream ID less than the minimum
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
txn,
table="partial_state_rooms",
keyvalues={},
retcol="MIN(device_lists_stream_id)",
allow_none=True,
)
if device_lists_stream_id is None:
# There are no rooms being currently partially joined, so we delete everything.
txn.execute("DELETE FROM device_lists_remote_pending")
else:
sql = """
DELETE FROM device_lists_remote_pending
WHERE stream_id <= ?
"""
txn.execute(sql, (device_lists_stream_id,))
Loading

0 comments on commit 501f62d

Please sign in to comment.