Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster remote room joins: stream the un-partial-stating of events over replication. [rei:frrj/streams/unpsr] #14545

Merged
merged 7 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14545.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster remote room joins: stream the un-partial-stating of events over replication.
2 changes: 2 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,8 @@ async def update_state_for_partial_state_event(
self._state_storage_controller.notify_event_un_partial_stated(
event.event_id
)
# Notify that there's a new row in the un_partial_stated_events stream.
self._notifier.notify_replication()

@trace
async def backfill(
Expand Down
7 changes: 6 additions & 1 deletion synapse/replication/tcp/streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
from synapse.replication.tcp.streams.partial_state import (
UnPartialStatedEventStream,
UnPartialStatedRoomStream,
)

STREAMS_MAP = {
stream.NAME: stream
Expand All @@ -63,6 +66,7 @@
AccountDataStream,
UserSignatureStream,
UnPartialStatedRoomStream,
UnPartialStatedEventStream,
)
}

Expand All @@ -83,4 +87,5 @@
"AccountDataStream",
"UserSignatureStream",
"UnPartialStatedRoomStream",
"UnPartialStatedEventStream",
]
28 changes: 28 additions & 0 deletions synapse/replication/tcp/streams/partial_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,31 @@ def __init__(self, hs: "HomeServer"):
current_token_without_instance(store.get_un_partial_stated_rooms_token),
store.get_un_partial_stated_rooms_from_stream,
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class UnPartialStatedEventStreamRow:
# ID of the event that has been un-partial-stated.
event_id: str

# True iff the rejection status of the event changed as a result of being
# un-partial-stated.
rejection_status_changed: bool


class UnPartialStatedEventStream(Stream):
"""
Stream to notify about events becoming un-partial-stated.
"""

NAME = "un_partial_stated_event"
ROW_TYPE = UnPartialStatedEventStreamRow

def __init__(self, hs: "HomeServer"):
store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
# TODO(faster_joins, multiple writers): we need to account for instance names
current_token_without_instance(store.get_un_partial_stated_events_token),
store.get_un_partial_stated_events_from_stream,
)
88 changes: 88 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
AbstractStreamIdTracker,
MultiWriterIdGenerator,
StreamIdGenerator,
Expand Down Expand Up @@ -291,6 +292,93 @@ def get_chain_id_txn(txn: Cursor) -> int:
id_column="chain_id",
)

self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator

if isinstance(database.engine, PostgresEngine):
self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="un_partial_stated_event_stream",
instance_name=hs.get_instance_name(),
tables=[
("un_partial_stated_event_stream", "instance_name", "stream_id")
],
sequence_name="un_partial_stated_event_stream_sequence",
# TODO(faster_joins, multiple writers) Support multiple writers.
writers=["master"],
)
else:
self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
db_conn, "un_partial_stated_event_stream", "stream_id"
)

def get_un_partial_stated_events_token(self) -> int:
# TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
# writers because workers that don't write often will hold all
# readers up.
return self._un_partial_stated_events_stream_id_gen.get_current_token()

async def get_un_partial_stated_events_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
"""Get updates for the un-partial-stated events replication stream.

Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exists
between the requested tokens due to the limit.

The token returned can be used in a subsequent call to this
function to get further updatees.

The updates are a list of 2-tuples of stream ID and the row data
"""

if last_id == current_id:
return [], current_id, False

def get_un_partial_stated_events_from_stream_txn(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
sql = """
SELECT stream_id, event_id, rejection_status_changed
FROM un_partial_stated_event_stream
WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
ORDER BY stream_id ASC
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, instance_name, limit))
updates = [
(
row[0],
(
row[1],
bool(row[2]),
),
)
for row in txn
]
limited = False
upto_token = current_id
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True

return updates, upto_token, limited

return await self.db_pool.runInteraction(
"get_un_partial_stated_events_from_stream",
get_un_partial_stated_events_from_stream_txn,
)

def process_replication_rows(
self,
stream_name: str,
Expand Down
34 changes: 25 additions & 9 deletions synapse/storage/databases/main/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self._instance_name: str = hs.get_instance_name()

async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room
Expand Down Expand Up @@ -404,18 +405,21 @@ async def update_state_for_partial_state_event(
context: EventContext,
) -> None:
"""Update the state group for a partial state event"""
await self.db_pool.runInteraction(
"update_state_for_partial_state_event",
self._update_state_for_partial_state_event_txn,
event,
context,
)
async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id:
await self.db_pool.runInteraction(
"update_state_for_partial_state_event",
self._update_state_for_partial_state_event_txn,
event,
context,
un_partial_state_event_stream_id,
)

def _update_state_for_partial_state_event_txn(
self,
txn: LoggingTransaction,
event: EventBase,
context: EventContext,
un_partial_state_event_stream_id: int,
) -> None:
# we shouldn't have any outliers here
assert not event.internal_metadata.is_outlier()
Expand All @@ -436,7 +440,10 @@ def _update_state_for_partial_state_event_txn(

# the event may now be rejected where it was not before, or vice versa,
# in which case we need to update the rejected flags.
if bool(context.rejected) != (event.rejected_reason is not None):
rejection_status_changed = bool(context.rejected) != (
event.rejected_reason is not None
)
if rejection_status_changed:
self.mark_event_rejected_txn(txn, event.event_id, context.rejected)

self.db_pool.simple_delete_one_txn(
Expand All @@ -445,15 +452,24 @@ def _update_state_for_partial_state_event_txn(
keyvalues={"event_id": event.event_id},
)

# TODO(faster_joins): need to do something about workers here
# https://github.com/matrix-org/synapse/issues/12994
txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
txn.call_after(
self._get_state_group_for_event.prefill,
(event.event_id,),
state_group,
)

self.db_pool.simple_insert_txn(
txn,
"un_partial_stated_event_stream",
{
"stream_id": un_partial_state_event_stream_id,
"instance_name": self._instance_name,
"event_id": event.event_id,
"rejection_status_changed": rejection_status_changed,
},
)


class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Stream for notifying that an event has become un-partial-stated.
CREATE TABLE un_partial_stated_event_stream(
-- Position in the stream
stream_id BIGINT PRIMARY KEY NOT NULL,

-- Which instance wrote this entry.
instance_name TEXT NOT NULL,

-- Which event has been un-partial-stated.
event_id TEXT NOT NULL REFERENCES events(event_id) ON DELETE CASCADE,

-- true iff the `rejected` status of the event changed when it became
-- un-partial-stated.
rejection_status_changed BOOLEAN NOT NULL
);

-- We want an index here because of the foreign key constraint:
-- upon deleting an event, the database needs to be able to check here.
CREATE UNIQUE INDEX un_partial_stated_event_stream_room_id ON un_partial_stated_event_stream (event_id);
Comment on lines +32 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With rooms, the same room can be un-partial stated multiple times.
Can events really only be un-partial stated once? I'm not clear on what happens when the last user leaves a room.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I should be a bit more stringent about researching this (possibly need to 'prove' that we only make an event partial-stated when it's initially persisted(?)), but I can't for the life of me imagine how it makes sense to turn a full-state event into a partial-stated event (that we then intend to make full-state again).
When a user leaves a room, we should keep the events and state around so that the user can access the room in their 'archive' section.
Only when a room is purged (e.g. because all users forgot the room and we do it automatically, or by admin action) should we start removing information we already hold. In this case, events will be removed but then so too will the stream entries for those events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I have confirmed this: an event can only be made partial-stated when it is created and possibly when it is de-outliered. But note that an event can't be both an outlier and partial-stated, so exactly one of those can be true for the lifetime of an event.
An event is only un-partial-stated upon deletion (at which point we also remove the stream row for it) or after we fetch the full state for it (it can't revert to being an outlier from this state, except by being deleted first and being persisted anew later).

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE SEQUENCE IF NOT EXISTS un_partial_stated_event_stream_sequence;

SELECT setval('un_partial_stated_event_stream_sequence', (
SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_event_stream
));