Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3653 from matrix-org/erikj/split_federation
Browse files Browse the repository at this point in the history
Move more federation APIs to workers
  • Loading branch information
erikjohnston authored Aug 15, 2018
2 parents 4601129 + ef184ca commit dc56c47
Show file tree
Hide file tree
Showing 16 changed files with 512 additions and 138 deletions.
1 change: 1 addition & 0 deletions changelog.d/3653.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support more federation endpoints on workers
13 changes: 13 additions & 0 deletions docs/workers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,23 @@ endpoints matching the following regular expressions::
^/_matrix/federation/v1/backfill/
^/_matrix/federation/v1/get_missing_events/
^/_matrix/federation/v1/publicRooms
^/_matrix/federation/v1/query/
^/_matrix/federation/v1/make_join/
^/_matrix/federation/v1/make_leave/
^/_matrix/federation/v1/send_join/
^/_matrix/federation/v1/send_leave/
^/_matrix/federation/v1/invite/
^/_matrix/federation/v1/query_auth/
^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/
^/_matrix/federation/v1/send/

The above endpoints should all be routed to the federation_reader worker by the
reverse-proxy configuration.

The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single
instance.

``synapse.app.federation_sender``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
4 changes: 2 additions & 2 deletions synapse/app/client_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
Expand All @@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
TransactionStore,
SlavedTransactionStore,
SlavedClientIpStore,
BaseSlavedStore,
):
Expand Down
4 changes: 2 additions & 2 deletions synapse/app/event_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinRoomAliasServlet,
Expand All @@ -63,7 +63,7 @@

class EventCreatorSlavedStore(
DirectoryStore,
TransactionStore,
SlavedTransactionStore,
SlavedProfileStore,
SlavedAccountDataStore,
SlavedPusherStore,
Expand Down
14 changes: 12 additions & 2 deletions synapse/app/federation_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
Expand All @@ -49,11 +54,16 @@


class FederationReaderSlavedStore(
SlavedProfileStore,
SlavedApplicationServiceStore,
SlavedPusherStore,
SlavedPushRuleStore,
SlavedReceiptsStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
TransactionStore,
SlavedTransactionStore,
BaseSlavedStore,
):
pass
Expand Down
4 changes: 2 additions & 2 deletions synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
Expand All @@ -50,7 +50,7 @@


class FederationSenderSlaveStore(
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
Expand Down
4 changes: 2 additions & 2 deletions synapse/app/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
Expand All @@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,
TransactionStore,
SlavedTransactionStore,
BaseSlavedStore,
MediaRepositoryStore,
):
Expand Down
54 changes: 54 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
Expand Down Expand Up @@ -760,6 +764,8 @@ def register_edu_handler(self, edu_type, handler):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))

logger.info("Registering federation EDU handler for %r", edu_type)

self.edu_handlers[edu_type] = handler

def register_query_handler(self, query_type, handler):
Expand All @@ -778,6 +784,8 @@ def register_query_handler(self, query_type, handler):
"Already have a Query handler for %s" % (query_type,)
)

logger.info("Registering federation query handler for %r", query_type)

self.query_handlers[query_type] = handler

@defer.inlineCallbacks
Expand All @@ -800,3 +808,49 @@ def on_query(self, query_type, args):
raise NotFoundError("No handler for Query type '%s'" % (query_type,))

return handler(args)


class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
"""A FederationHandlerRegistry for worker processes.
When receiving EDU or queries it will check if an appropriate handler has
been registered on the worker, if there isn't one then it calls off to the
master process.
"""

def __init__(self, hs):
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self.clock = hs.get_clock()

self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)

super(ReplicationFederationHandlerRegistry, self).__init__()

def on_edu(self, edu_type, origin, content):
"""Overrides FederationHandlerRegistry
"""
handler = self.edu_handlers.get(edu_type)
if handler:
return super(ReplicationFederationHandlerRegistry, self).on_edu(
edu_type, origin, content,
)

return self._send_edu(
edu_type=edu_type,
origin=origin,
content=content,
)

def on_query(self, query_type, args):
"""Overrides FederationHandlerRegistry
"""
handler = self.query_handlers.get(query_type)
if handler:
return handler(args)

return self._get_query_client(
query_type=query_type,
args=args,
)
74 changes: 57 additions & 17 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
compute_event_signature,
)
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
Expand Down Expand Up @@ -91,6 +96,18 @@ def __init__(self, hs):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()

self._send_events_to_master = (
ReplicationFederationSendEventsRestServlet.make_client(hs)
)
self._notify_user_membership_change = (
ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
)
self._clean_room_for_join_client = (
ReplicationCleanRoomRestServlet.make_client(hs)
)

# When joining a room we need to queue any events for that room up
self.room_queues = {}
Expand Down Expand Up @@ -1158,7 +1175,7 @@ def on_invite_request(self, origin, pdu):
)

context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)])
yield self.persist_events_and_notify([(event, context)])

defer.returnValue(event)

Expand Down Expand Up @@ -1189,7 +1206,7 @@ def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
)

context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)])
yield self.persist_events_and_notify([(event, context)])

defer.returnValue(event)

Expand Down Expand Up @@ -1432,7 +1449,7 @@ def _handle_new_event(self, origin, event, state=None, auth_events=None,
event, context
)

yield self._persist_events(
yield self.persist_events_and_notify(
[(event, context)],
backfilled=backfilled,
)
Expand Down Expand Up @@ -1470,7 +1487,7 @@ def _handle_new_events(self, origin, event_infos, backfilled=False):
], consumeErrors=True,
))

yield self._persist_events(
yield self.persist_events_and_notify(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
Expand Down Expand Up @@ -1558,7 +1575,7 @@ def _persist_auth_tree(self, origin, auth_events, state, event):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR

yield self._persist_events(
yield self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
Expand All @@ -1569,7 +1586,7 @@ def _persist_auth_tree(self, origin, auth_events, state, event):
event, old_state=state
)

yield self._persist_events(
yield self.persist_events_and_notify(
[(event, new_event_context)],
)

Expand Down Expand Up @@ -2297,7 +2314,7 @@ def _check_key_revocation(self, public_key, url):
for revocation.
"""
try:
response = yield self.hs.get_simple_http_client().get_json(
response = yield self.http_client.get_json(
url,
{"public_key": public_key}
)
Expand All @@ -2310,7 +2327,7 @@ def _check_key_revocation(self, public_key, url):
raise AuthError(403, "Third party certificate was invalid")

@defer.inlineCallbacks
def _persist_events(self, event_and_contexts, backfilled=False):
def persist_events_and_notify(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.
Expand All @@ -2322,14 +2339,21 @@ def _persist_events(self, event_and_contexts, backfilled=False):
Returns:
Deferred
"""
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)
if self.config.worker_app:
yield self._send_events_to_master(
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled
)
else:
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)

if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)

def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
Expand Down Expand Up @@ -2368,9 +2392,25 @@ def _notify_persisted_event(self, event, max_stream_id):
)

def _clean_room_for_join(self, room_id):
return self.store.clean_room_for_join(room_id)
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Args:
room_id (str)
"""
if self.config.worker_app:
return self._clean_room_for_join_client(room_id)
else:
return self.store.clean_room_for_join(room_id)

def user_joined_room(self, user, room_id):
"""Called when a new user has joined the room
"""
return user_joined_room(self.distributor, user, room_id)
if self.config.worker_app:
return self._notify_user_membership_change(
room_id=room_id,
user_id=user.to_string(),
change="joined",
)
else:
return user_joined_room(self.distributor, user, room_id)
3 changes: 2 additions & 1 deletion synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

from synapse.http.server import JsonResource
from synapse.replication.http import membership, send_event
from synapse.replication.http import federation, membership, send_event

REPLICATION_PREFIX = "/_synapse/replication"

Expand All @@ -27,3 +27,4 @@ def __init__(self, hs):
def register_servlets(self, hs):
send_event.register_servlets(hs, self)
membership.register_servlets(hs, self)
federation.register_servlets(hs, self)
Loading

0 comments on commit dc56c47

Please sign in to comment.