Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster room joins: fix race in recalculation of current room state #13151

Merged
1 change: 1 addition & 0 deletions changelog.d/13151.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster room joins: fix race in recalculation of current room state.
9 changes: 2 additions & 7 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1543,14 +1543,9 @@ async def _sync_partial_state_room(
# all the events are updated, so we can update current state and
# clear the lazy-loading flag.
logger.info("Updating current state for %s", room_id)
# TODO(faster_joins): support workers
# TODO(faster_joins): notify workers in notify_room_un_partial_stated
# https://github.com/matrix-org/synapse/issues/12994
assert (
self._storage_controllers.persistence is not None
), "worker-mode deployments not currently supported here"
await self._storage_controllers.persistence.update_current_state(
room_id
)
await self.state_handler.update_current_state(room_id)

logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
Expand Down
2 changes: 2 additions & 0 deletions synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
push,
register,
send_event,
state,
streams,
)

Expand All @@ -48,6 +49,7 @@ def register_servlets(self, hs: "HomeServer") -> None:
streams.register_servlets(hs, self)
account_data.register_servlets(hs, self)
push.register_servlets(hs, self)
state.register_servlets(hs, self)

# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
Expand Down
75 changes: 75 additions & 0 deletions synapse/replication/http/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Tuple

from twisted.web.server import Request

from synapse.api.errors import SynapseError
from synapse.http.server import HttpServer
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class ReplicationUpdateCurrentStateRestServlet(ReplicationEndpoint):
"""Recalculates the current state for a room, and persists it.

The API looks like:

POST /_synapse/replication/update_current_state/:room_id

{}

200 OK

{}
"""

NAME = "update_current_state"
PATH_ARGS = ("room_id",)

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self._state_handler = hs.get_state_handler()
self._events_shard_config = hs.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()

@staticmethod
async def _serialize_payload(room_id: str) -> JsonDict: # type: ignore[override]
return {}
Comment on lines +56 to +57
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can drop this since returning an empty dict is the default implementation in the base class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python complains because the base implementation is an abstractmethod

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh boo


async def _handle_request( # type: ignore[override]
self, request: Request, room_id: str
) -> Tuple[int, JsonDict]:
writer_instance = self._events_shard_config.get_instance(room_id)
if writer_instance != self._instance_name:
raise SynapseError(
400, "/update_current_state request was routed to the wrong worker"
)

await self._state_handler.update_current_state(room_id)

return 200, {}


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if hs.get_instance_name() in hs.config.worker.writers.events:
ReplicationUpdateCurrentStateRestServlet(hs).register(http_server)
25 changes: 25 additions & 0 deletions synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.context import ContextResourceUsage
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.roommember import ProfileInfo
Expand Down Expand Up @@ -129,6 +130,12 @@ def __init__(self, hs: "HomeServer"):
self.hs = hs
self._state_resolution_handler = hs.get_state_resolution_handler()
self._storage_controllers = hs.get_storage_controllers()
self._events_shard_config = hs.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()

self._update_current_state = (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth naming this _client to make it obvious what the difference is between it and update_current_state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed!

ReplicationUpdateCurrentStateRestServlet.make_client(hs)
)

async def get_current_state_ids(
self,
Expand Down Expand Up @@ -417,6 +424,24 @@ async def resolve_events(

return {key: state_map[ev_id] for key, ev_id in new_state.items()}

async def update_current_state(self, room_id: str) -> None:
"""Recalculates the current state for a room, and persists it.

Raises:
SynapseError(502): if all attempts to connect to the event persister worker
fail
"""
writer_instance = self._events_shard_config.get_instance(room_id)
if writer_instance != self._instance_name:
await self._update_current_state(
instance_name=writer_instance,
room_id=room_id,
)
return

assert self._storage_controllers.persistence is not None
await self._storage_controllers.persistence.update_current_state(room_id)


@attr.s(slots=True, auto_attribs=True)
class _StateResMetrics:
Expand Down
Loading