From e26dbd82ef5f1d755be9a62165556ebce041af10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Jul 2018 16:32:05 +0100 Subject: [PATCH 01/11] Add replication APIs for persisting federation events --- synapse/app/federation_reader.py | 8 + synapse/handlers/federation.py | 44 ++++- synapse/replication/http/__init__.py | 3 +- synapse/replication/http/federation.py | 245 +++++++++++++++++++++++++ 4 files changed, 290 insertions(+), 10 deletions(-) create mode 100644 synapse/replication/http/federation.py diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 7af00b8bcfaa..c512b4be877e 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -32,9 +32,13 @@ from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler @@ -49,6 +53,10 @@ class FederationReaderSlavedStore( + SlavedApplicationServiceStore, + SlavedPusherStore, + SlavedPushRuleStore, + SlavedReceiptsStore, SlavedEventStore, SlavedKeyStore, RoomStore, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 533b82c78361..0524dec942d1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -44,6 +44,8 @@ compute_event_signature, ) from synapse.events.validator import EventValidator +from synapse.replication.http.federation import send_federation_events_to_master +from synapse.replication.http.membership import notify_user_membership_change from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError @@ -86,6 +88,8 @@ def __init__(self, hs): self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self._server_notices_mxid = hs.config.server_notices_mxid + self.config = hs.config + self.http_client = hs.get_simple_http_client() # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -2288,7 +2292,7 @@ def _check_key_revocation(self, public_key, url): for revocation. """ try: - response = yield self.hs.get_simple_http_client().get_json( + response = yield self.http_client.get_json( url, {"public_key": public_key} ) @@ -2313,14 +2317,25 @@ def _persist_events(self, event_and_contexts, backfilled=False): Returns: Deferred """ - max_stream_id = yield self.store.persist_events( - event_and_contexts, - backfilled=backfilled, - ) + if self.config.worker_app: + yield send_federation_events_to_master( + clock=self.hs.get_clock(), + store=self.store, + client=self.http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + event_and_contexts=event_and_contexts, + backfilled=backfilled + ) + else: + max_stream_id = yield self.store.persist_events( + event_and_contexts, + backfilled=backfilled, + ) - if not backfilled: # Never notify for backfilled events - for event, _ in event_and_contexts: - self._notify_persisted_event(event, max_stream_id) + if not backfilled: # Never notify for backfilled events + for event, _ in event_and_contexts: + self._notify_persisted_event(event, max_stream_id) def _notify_persisted_event(self, event, max_stream_id): """Checks to see if notifier/pushers should be notified about the @@ -2359,9 +2374,20 @@ def _notify_persisted_event(self, event, max_stream_id): ) def _clean_room_for_join(self, room_id): + # TODO move this out to master return self.store.clean_room_for_join(room_id) def user_joined_room(self, user, room_id): """Called when a new user has joined the room """ - return user_joined_room(self.distributor, user, room_id) + if self.config.worker_app: + return notify_user_membership_change( + client=self.http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + room_id=room_id, + user_id=user.to_string(), + change="joined", + ) + else: + return user_joined_room(self.distributor, user, room_id) diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 589ee94c66a0..19f214281ef6 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.http.server import JsonResource -from synapse.replication.http import membership, send_event +from synapse.replication.http import federation, membership, send_event REPLICATION_PREFIX = "/_synapse/replication" @@ -27,3 +27,4 @@ def __init__(self, hs): def register_servlets(self, hs): send_event.register_servlets(hs, self) membership.register_servlets(hs, self) + federation.register_servlets(hs, self) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py new file mode 100644 index 000000000000..f39aaa89be57 --- /dev/null +++ b/synapse/replication/http/federation.py @@ -0,0 +1,245 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership +from synapse.events import FrozenEvent +from synapse.events.snapshot import EventContext +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import UserID +from synapse.util.logcontext import run_in_background +from synapse.util.metrics import Measure + +logger = logging.getLogger(__name__) + + +class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): + """Handles events newly received from federation, including persisting and + notifying. + + The API looks like: + + POST /_synapse/replication/fed_send_events/:txn_id + + { + "events": [{ + "event": { .. serialized event .. }, + "internal_metadata": { .. serialized internal_metadata .. }, + "rejected_reason": .., // The event.rejected_reason field + "context": { .. serialized event context .. }, + }], + "backfilled": false + """ + + NAME = "fed_send_events" + PATH_ARGS = () + + def __init__(self, hs): + super(ReplicationFederationSendEventsRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.is_mine_id = hs.is_mine_id + self.notifier = hs.get_notifier() + self.pusher_pool = hs.get_pusherpool() + + @defer.inlineCallbacks + @staticmethod + def _serialize_payload(store, event_and_contexts, backfilled): + """ + Args: + store + event_and_contexts (list[tuple[FrozenEvent, EventContext]]) + backfilled (bool): Whether or not the events are the result of + backfilling + """ + event_payloads = [] + for event, context in event_and_contexts: + serialized_context = yield context.serialize(event, store) + + event_payloads.append({ + "event": event.get_pdu_json(), + "internal_metadata": event.internal_metadata.get_dict(), + "rejected_reason": event.rejected_reason, + "context": serialized_context, + }) + + payload = { + "events": event_payloads, + "backfilled": backfilled, + } + + defer.returnValue(payload) + + @defer.inlineCallbacks + def _handle_request(self, request): + with Measure(self.clock, "repl_fed_send_events_parse"): + content = parse_json_object_from_request(request) + + backfilled = content["backfilled"] + + event_payloads = content["events"] + + event_and_contexts = [] + for event_payload in event_payloads: + event_dict = event_payload["event"] + internal_metadata = event_payload["internal_metadata"] + rejected_reason = event_payload["rejected_reason"] + event = FrozenEvent(event_dict, internal_metadata, rejected_reason) + + context = yield EventContext.deserialize( + self.store, event_payload["context"], + ) + + event_and_contexts.append((event, context)) + + logger.info( + "Got %d events from federation", + len(event_and_contexts), + ) + + max_stream_id = yield self.store.persist_events( + event_and_contexts, + backfilled=backfilled + ) + + if not backfilled: + for event, _ in event_and_contexts: + self._notify_persisted_event(event, max_stream_id) + + defer.returnValue((200, {})) + + def _notify_persisted_event(self, event, max_stream_id): + extra_users = [] + if event.type == EventTypes.Member: + target_user_id = event.state_key + + # We notify for memberships if its an invite for one of our + # users + if event.internal_metadata.is_outlier(): + if event.membership != Membership.INVITE: + if not self.is_mine_id(target_user_id): + return + + target_user = UserID.from_string(target_user_id) + extra_users.append(target_user) + elif event.internal_metadata.is_outlier(): + return + + event_stream_id = event.internal_metadata.stream_ordering + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + + run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, + ) + + +class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): + """Handles EDUs newly received from federation, including persisting and + notifying. + """ + + NAME = "fed_send_edu" + PATH_ARGS = ("edu_type",) + + def __init__(self, hs): + super(ReplicationFederationSendEduRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.registry = hs.get_federation_registry() + + @staticmethod + def _serialize_payload(edu_type, origin, content): + return { + "origin": origin, + "content": content, + } + + @defer.inlineCallbacks + def _handle_request(self, request, edu_type): + with Measure(self.clock, "repl_fed_send_edu_parse"): + content = parse_json_object_from_request(request) + + origin = content["origin"] + edu_content = content["content"] + + logger.info( + "Got %r edu from $s", + edu_type, origin, + ) + + result = yield self.registry.on_edu(edu_type, origin, edu_content) + + defer.returnValue((200, result)) + + +class ReplicationGetQueryRestServlet(ReplicationEndpoint): + """Handle responding to queries from federation. + """ + + NAME = "fed_query" + PATH_ARGS = ("query_type",) + + # This is a query, so let's not bother caching + CACHE = False + + def __init__(self, hs): + super(ReplicationGetQueryRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.registry = hs.get_federation_registry() + + @staticmethod + def _serialize_payload(query_type, args): + """ + Args: + query_type (str) + args (dict): The arguments received for the given query type + """ + return { + "args": args, + } + + @defer.inlineCallbacks + def _handle_request(self, request, query_type): + with Measure(self.clock, "repl_fed_query_parse"): + content = parse_json_object_from_request(request) + + args = content["args"] + + logger.info( + "Got %r query", + query_type, + ) + + result = yield self.registry.on_query(query_type, args) + + defer.returnValue((200, result)) + + +def register_servlets(hs, http_server): + ReplicationFederationSendEventsRestServlet(hs).register(http_server) + ReplicationFederationSendEduRestServlet(hs).register(http_server) + ReplicationGetQueryRestServlet(hs).register(http_server) From a3f5bf79a0fc0ea6d59069945f53717a3e9c6581 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jul 2018 11:44:22 +0100 Subject: [PATCH 02/11] Add EDU/query handling over replication --- synapse/federation/federation_server.py | 43 +++++++++++++++++++++++++ synapse/handlers/federation.py | 24 +++++++------- synapse/replication/http/federation.py | 2 +- synapse/server.py | 6 +++- 4 files changed, 62 insertions(+), 13 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bf89d568afed..941e30a59632 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -33,6 +33,10 @@ from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name +from synapse.replication.http.federation import ( + ReplicationFederationSendEduRestServlet, + ReplicationGetQueryRestServlet, +) from synapse.types import get_domain_from_id from synapse.util import async from synapse.util.caches.response_cache import ResponseCache @@ -745,6 +749,8 @@ def register_edu_handler(self, edu_type, handler): if edu_type in self.edu_handlers: raise KeyError("Already have an EDU handler for %s" % (edu_type,)) + logger.info("Registering federation EDU handler for %r", edu_type) + self.edu_handlers[edu_type] = handler def register_query_handler(self, query_type, handler): @@ -763,6 +769,8 @@ def register_query_handler(self, query_type, handler): "Already have a Query handler for %s" % (query_type,) ) + logger.info("Registering federation query handler for %r", query_type) + self.query_handlers[query_type] = handler @defer.inlineCallbacks @@ -785,3 +793,38 @@ def on_query(self, query_type, args): raise NotFoundError("No handler for Query type '%s'" % (query_type,)) return handler(args) + + +class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): + def __init__(self, hs): + self.config = hs.config + self.http_client = hs.get_simple_http_client() + self.clock = hs.get_clock() + + self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) + self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) + + super(ReplicationFederationHandlerRegistry, self).__init__() + + def on_edu(self, edu_type, origin, content): + handler = self.edu_handlers.get(edu_type) + if handler: + return super(ReplicationFederationHandlerRegistry, self).on_edu( + edu_type, origin, content, + ) + + return self._send_edu( + edu_type=edu_type, + origin=origin, + content=content, + ) + + def on_query(self, query_type, args): + handler = self.query_handlers.get(query_type) + if handler: + return handler(args) + + return self._get_query_client( + query_type=query_type, + args=args, + ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0524dec942d1..d2cbb12df3e2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -44,8 +44,10 @@ compute_event_signature, ) from synapse.events.validator import EventValidator -from synapse.replication.http.federation import send_federation_events_to_master -from synapse.replication.http.membership import notify_user_membership_change +from synapse.replication.http.federation import ( + ReplicationFederationSendEventsRestServlet, +) +from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError @@ -91,6 +93,13 @@ def __init__(self, hs): self.config = hs.config self.http_client = hs.get_simple_http_client() + self._send_events_to_master = ( + ReplicationFederationSendEventsRestServlet.make_client(hs) + ) + self._notify_user_membership_change = ( + ReplicationUserJoinedLeftRoomRestServlet.make_client(hs) + ) + # When joining a room we need to queue any events for that room up self.room_queues = {} self._room_pdu_linearizer = Linearizer("fed_room_pdu") @@ -2318,12 +2327,8 @@ def _persist_events(self, event_and_contexts, backfilled=False): Deferred """ if self.config.worker_app: - yield send_federation_events_to_master( - clock=self.hs.get_clock(), + yield self._send_events_to_master( store=self.store, - client=self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, event_and_contexts=event_and_contexts, backfilled=backfilled ) @@ -2381,10 +2386,7 @@ def user_joined_room(self, user, room_id): """Called when a new user has joined the room """ if self.config.worker_app: - return notify_user_membership_change( - client=self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._notify_user_membership_change( room_id=room_id, user_id=user.to_string(), change="joined", diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index f39aaa89be57..3fa7bd64c7b5 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -59,8 +59,8 @@ def __init__(self, hs): self.notifier = hs.get_notifier() self.pusher_pool = hs.get_pusherpool() - @defer.inlineCallbacks @staticmethod + @defer.inlineCallbacks def _serialize_payload(store, event_and_contexts, backfilled): """ Args: diff --git a/synapse/server.py b/synapse/server.py index 140be9ebe863..26228d8c72b4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -36,6 +36,7 @@ from synapse.federation.federation_server import ( FederationHandlerRegistry, FederationServer, + ReplicationFederationHandlerRegistry, ) from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.transaction_queue import TransactionQueue @@ -423,7 +424,10 @@ def build_room_member_handler(self): return RoomMemberMasterHandler(self) def build_federation_registry(self): - return FederationHandlerRegistry() + if self.config.worker_app: + return ReplicationFederationHandlerRegistry(self) + else: + return FederationHandlerRegistry() def build_server_notices_manager(self): if self.config.worker_app: From 1e2bed9656a4826ed7fdc82bc096df775bc38baf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jul 2018 11:45:12 +0100 Subject: [PATCH 03/11] Import all functions from TransactionStore --- synapse/replication/slave/storage/transactions.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index 9c9a5eadd906..cd6416c47c8a 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -13,19 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore from synapse.storage.transactions import TransactionStore from ._base import BaseSlavedStore -class TransactionStore(BaseSlavedStore): - get_destination_retry_timings = TransactionStore.__dict__[ - "get_destination_retry_timings" - ] - _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ - set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__ - _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__ - - prep_send_transaction = DataStore.prep_send_transaction.__func__ - delivered_txn = DataStore.delivered_txn.__func__ +class TransactionStore(TransactionStore, BaseSlavedStore): + pass From 62ace05c4521caeed023e5b9dadd993afe5c154f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jul 2018 13:31:59 +0100 Subject: [PATCH 04/11] Move necessary storage functions to worker classes --- synapse/storage/events.py | 82 ------------------------------- synapse/storage/events_worker.py | 84 ++++++++++++++++++++++++++++++++ synapse/storage/room.py | 32 ++++++------ 3 files changed, 100 insertions(+), 98 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e8e5a0fe443c..5374796fd730 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1430,88 +1430,6 @@ def _store_redaction(self, txn, event): (event.event_id, event.redacts) ) - @defer.inlineCallbacks - def have_events_in_timeline(self, event_ids): - """Given a list of event ids, check if we have already processed and - stored them as non outliers. - """ - rows = yield self._simple_select_many_batch( - table="events", - retcols=("event_id",), - column="event_id", - iterable=list(event_ids), - keyvalues={"outlier": False}, - desc="have_events_in_timeline", - ) - - defer.returnValue(set(r["event_id"] for r in rows)) - - @defer.inlineCallbacks - def have_seen_events(self, event_ids): - """Given a list of event ids, check if we have already processed them. - - Args: - event_ids (iterable[str]): - - Returns: - Deferred[set[str]]: The events we have already seen. - """ - results = set() - - def have_seen_events_txn(txn, chunk): - sql = ( - "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" - % (",".join("?" * len(chunk)), ) - ) - txn.execute(sql, chunk) - for (event_id, ) in txn: - results.add(event_id) - - # break the input up into chunks of 100 - input_iterator = iter(event_ids) - for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), - []): - yield self.runInteraction( - "have_seen_events", - have_seen_events_txn, - chunk, - ) - defer.returnValue(results) - - def get_seen_events_with_rejections(self, event_ids): - """Given a list of event ids, check if we rejected them. - - Args: - event_ids (list[str]) - - Returns: - Deferred[dict[str, str|None): - Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps - to None. - """ - if not event_ids: - return defer.succeed({}) - - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) - - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected - - return res - - return self.runInteraction("get_rejection_reasons", f) - @defer.inlineCallbacks def count_daily_messages(self): """ diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 9b4cfeb89922..e1dc2c62fd6f 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import itertools import logging + from collections import namedtuple from canonicaljson import json @@ -442,3 +444,85 @@ def _get_event_from_row(self, internal_metadata, js, redacted, self._get_event_cache.prefill((original_ev.event_id,), cache_entry) defer.returnValue(cache_entry) + + @defer.inlineCallbacks + def have_events_in_timeline(self, event_ids): + """Given a list of event ids, check if we have already processed and + stored them as non outliers. + """ + rows = yield self._simple_select_many_batch( + table="events", + retcols=("event_id",), + column="event_id", + iterable=list(event_ids), + keyvalues={"outlier": False}, + desc="have_events_in_timeline", + ) + + defer.returnValue(set(r["event_id"] for r in rows)) + + @defer.inlineCallbacks + def have_seen_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Args: + event_ids (iterable[str]): + + Returns: + Deferred[set[str]]: The events we have already seen. + """ + results = set() + + def have_seen_events_txn(txn, chunk): + sql = ( + "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" + % (",".join("?" * len(chunk)), ) + ) + txn.execute(sql, chunk) + for (event_id, ) in txn: + results.add(event_id) + + # break the input up into chunks of 100 + input_iterator = iter(event_ids) + for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), + []): + yield self.runInteraction( + "have_seen_events", + have_seen_events_txn, + chunk, + ) + defer.returnValue(results) + + def get_seen_events_with_rejections(self, event_ids): + """Given a list of event ids, check if we rejected them. + + Args: + event_ids (list[str]) + + Returns: + Deferred[dict[str, str|None): + Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps + to None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) + + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected + + return res + + return self.runInteraction("get_rejection_reasons", f) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 3147fb682775..3378fc77d1c4 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -41,6 +41,22 @@ class RoomWorkerStore(SQLBaseStore): + def get_room(self, room_id): + """Retrieve a room. + + Args: + room_id (str): The ID of the room to retrieve. + Returns: + A namedtuple containing the room information, or an empty list. + """ + return self._simple_select_one( + table="rooms", + keyvalues={"room_id": room_id}, + retcols=("room_id", "is_public", "creator"), + desc="get_room", + allow_none=True, + ) + def get_public_room_ids(self): return self._simple_select_onecol( table="rooms", @@ -215,22 +231,6 @@ def store_room_txn(txn, next_id): logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") - def get_room(self, room_id): - """Retrieve a room. - - Args: - room_id (str): The ID of the room to retrieve. - Returns: - A namedtuple containing the room information, or an empty list. - """ - return self._simple_select_one( - table="rooms", - keyvalues={"room_id": room_id}, - retcols=("room_id", "is_public", "creator"), - desc="get_room", - allow_none=True, - ) - @defer.inlineCallbacks def set_room_is_public(self, room_id, is_public): def set_room_is_public_txn(txn, next_id): From 96a9a296455e2c08be4e0375cf784e4113fa51e1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jul 2018 13:35:36 +0100 Subject: [PATCH 05/11] Pull in necessary stores in federation_reader --- synapse/app/federation_reader.py | 2 ++ synapse/storage/events_worker.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index c512b4be877e..0c557a824227 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -36,6 +36,7 @@ from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore @@ -53,6 +54,7 @@ class FederationReaderSlavedStore( + SlavedProfileStore, SlavedApplicationServiceStore, SlavedPusherStore, SlavedPushRuleStore, diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index e1dc2c62fd6f..59822178ff02 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -14,7 +14,6 @@ # limitations under the License. import itertools import logging - from collections import namedtuple from canonicaljson import json From 72d1902bbeaa029e7003aecaeaf9bc5a41d17646 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Aug 2018 10:23:49 +0100 Subject: [PATCH 06/11] Fixup doc comments --- synapse/federation/federation_server.py | 11 +++++++++++ synapse/replication/http/federation.py | 17 +++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d23c1cf13b99..3bc26c4ab43f 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -811,6 +811,13 @@ def on_query(self, query_type, args): class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): + """A FederationHandlerRegistry for worker processes. + + When receiving EDU or queries it will check if an appropriate handler has + been registered on the worker, if there isn't one then it calls off to the + master process. + """ + def __init__(self, hs): self.config = hs.config self.http_client = hs.get_simple_http_client() @@ -822,6 +829,8 @@ def __init__(self, hs): super(ReplicationFederationHandlerRegistry, self).__init__() def on_edu(self, edu_type, origin, content): + """Overrides FederationHandlerRegistry + """ handler = self.edu_handlers.get(edu_type) if handler: return super(ReplicationFederationHandlerRegistry, self).on_edu( @@ -835,6 +844,8 @@ def on_edu(self, edu_type, origin, content): ) def on_query(self, query_type, args): + """Overrides FederationHandlerRegistry + """ handler = self.query_handlers.get(query_type) if handler: return handler(args) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 3fa7bd64c7b5..3e6cbbf5a19a 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -157,6 +157,15 @@ def _notify_persisted_event(self, event, max_stream_id): class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): """Handles EDUs newly received from federation, including persisting and notifying. + + Request format: + + POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id + + { + "origin": ..., + "content: { ... } + } """ NAME = "fed_send_edu" @@ -196,6 +205,14 @@ def _handle_request(self, request, edu_type): class ReplicationGetQueryRestServlet(ReplicationEndpoint): """Handle responding to queries from federation. + + Request format: + + POST /_synapse/replication/fed_query/:query_type + + { + "args": { ... } + } """ NAME = "fed_query" From b179537f2a51f4de52e2625939cc32eeba75cd6b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Aug 2018 10:29:48 +0100 Subject: [PATCH 07/11] Move clean_room_for_join to master --- synapse/handlers/federation.py | 16 ++++++++++-- synapse/replication/http/federation.py | 35 ++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 37d2307d0abc..acabca1d258c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -50,6 +50,7 @@ ) from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( + ReplicationCleanRoomRestServlet, ReplicationFederationSendEventsRestServlet, ) from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet @@ -104,6 +105,9 @@ def __init__(self, hs): self._notify_user_membership_change = ( ReplicationUserJoinedLeftRoomRestServlet.make_client(hs) ) + self._clean_room_for_join_client = ( + ReplicationCleanRoomRestServlet.make_client(hs) + ) # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -2388,8 +2392,16 @@ def _notify_persisted_event(self, event, max_stream_id): ) def _clean_room_for_join(self, room_id): - # TODO move this out to master - return self.store.clean_room_for_join(room_id) + """Called to clean up any data in DB for a given room, ready for the + server to join the room. + + Args: + room_id (str) + """ + if self.config.worker_app: + return self._clean_room_for_join_client(room_id) + else: + return self.store.clean_room_for_join(room_id) def user_joined_room(self, user, room_id): """Called when a new user has joined the room diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 3e6cbbf5a19a..7b0b1cd32ed1 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -256,7 +256,42 @@ def _handle_request(self, request, query_type): defer.returnValue((200, result)) +class ReplicationCleanRoomRestServlet(ReplicationEndpoint): + """Called to clean up any data in DB for a given room, ready for the + server to join the room. + + Request format: + + POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id + + {} + """ + + NAME = "fed_cleanup_room" + PATH_ARGS = ("room_id",) + + def __init__(self, hs): + super(ReplicationCleanRoomRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + + @staticmethod + def _serialize_payload(room_id, args): + """ + Args: + room_id (str) + """ + return {} + + @defer.inlineCallbacks + def _handle_request(self, request, room_id): + yield self.store.clean_room_for_join(room_id) + + defer.returnValue((200, {})) + + def register_servlets(hs, http_server): ReplicationFederationSendEventsRestServlet(hs).register(http_server) ReplicationFederationSendEduRestServlet(hs).register(http_server) ReplicationGetQueryRestServlet(hs).register(http_server) + ReplicationCleanRoomRestServlet(hs).register(http_server) From 8876ce7f77ecdf543cc23fb8333e9020791e9376 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Aug 2018 10:30:50 +0100 Subject: [PATCH 08/11] Newsfile --- changelog.d/3653.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3653.feature diff --git a/changelog.d/3653.feature b/changelog.d/3653.feature new file mode 100644 index 000000000000..6c5422994f34 --- /dev/null +++ b/changelog.d/3653.feature @@ -0,0 +1 @@ +Support more federation endpoints on workers From 5c6226707d2588c9f2669512adabc00687295445 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Aug 2018 10:33:55 +0100 Subject: [PATCH 09/11] Update docs/workers.rst --- docs/workers.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/workers.rst b/docs/workers.rst index c5b37c3ded73..ac9efb621f06 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -173,10 +173,23 @@ endpoints matching the following regular expressions:: ^/_matrix/federation/v1/backfill/ ^/_matrix/federation/v1/get_missing_events/ ^/_matrix/federation/v1/publicRooms + ^/_matrix/federation/v1/query/ + ^/_matrix/federation/v1/make_join/ + ^/_matrix/federation/v1/make_leave/ + ^/_matrix/federation/v1/send_join/ + ^/_matrix/federation/v1/send_leave/ + ^/_matrix/federation/v1/invite/ + ^/_matrix/federation/v1/query_auth/ + ^/_matrix/federation/v1/event_auth/ + ^/_matrix/federation/v1/exchange_third_party_invite/ + ^/_matrix/federation/v1/send/ The above endpoints should all be routed to the federation_reader worker by the reverse-proxy configuration. +The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single +instance. + ``synapse.app.federation_sender`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ From 773db62a22ca8be06c9537ae0296952d0e53837e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Aug 2018 14:10:32 +0100 Subject: [PATCH 10/11] Rename slave TransactionStore to SlaveTransactionStore --- synapse/app/client_reader.py | 4 ++-- synapse/app/event_creator.py | 4 ++-- synapse/app/federation_reader.py | 4 ++-- synapse/app/federation_sender.py | 4 ++-- synapse/app/media_repository.py | 4 ++-- synapse/replication/slave/storage/transactions.py | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index e2c91123db8c..86129ed544bc 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -39,7 +39,7 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import ( JoinedRoomMemberListRestServlet, @@ -66,7 +66,7 @@ class ClientReaderSlavedStore( DirectoryStore, SlavedApplicationServiceStore, SlavedRegistrationStore, - TransactionStore, + SlavedTransactionStore, SlavedClientIpStore, BaseSlavedStore, ): diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index 374f1156449c..95bfff181bb2 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -43,7 +43,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import ( JoinRoomAliasServlet, @@ -63,7 +63,7 @@ class EventCreatorSlavedStore( DirectoryStore, - TransactionStore, + SlavedTransactionStore, SlavedProfileStore, SlavedAccountDataStore, SlavedPusherStore, diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 0c557a824227..7e342c178594 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -41,7 +41,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -63,7 +63,7 @@ class FederationReaderSlavedStore( SlavedKeyStore, RoomStore, DirectoryStore, - TransactionStore, + SlavedTransactionStore, BaseSlavedStore, ): pass diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 18469013faf0..29911ec68710 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -36,7 +36,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -50,7 +50,7 @@ class FederationSenderSlaveStore( - SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, + SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore, SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 749bbf37d021..ee16755660d9 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -34,7 +34,7 @@ from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.server import HomeServer @@ -52,7 +52,7 @@ class MediaRepositorySlavedStore( SlavedApplicationServiceStore, SlavedRegistrationStore, SlavedClientIpStore, - TransactionStore, + SlavedTransactionStore, BaseSlavedStore, MediaRepositoryStore, ): diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index cd6416c47c8a..3527beb3c926 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -18,5 +18,5 @@ from ._base import BaseSlavedStore -class TransactionStore(TransactionStore, BaseSlavedStore): +class SlavedTransactionStore(TransactionStore, BaseSlavedStore): pass From 488ffe6fdb36c7479052096489c683777597c2aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Aug 2018 14:17:18 +0100 Subject: [PATCH 11/11] Use federation handler function rather than duplicate This involves renaming _persist_events to be a public function. --- synapse/handlers/federation.py | 14 ++++---- synapse/replication/http/federation.py | 44 ++------------------------ 2 files changed, 10 insertions(+), 48 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index acabca1d258c..9a37d627ca9a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1175,7 +1175,7 @@ def on_invite_request(self, origin, pdu): ) context = yield self.state_handler.compute_event_context(event) - yield self._persist_events([(event, context)]) + yield self.persist_events_and_notify([(event, context)]) defer.returnValue(event) @@ -1206,7 +1206,7 @@ def do_remotely_reject_invite(self, target_hosts, room_id, user_id): ) context = yield self.state_handler.compute_event_context(event) - yield self._persist_events([(event, context)]) + yield self.persist_events_and_notify([(event, context)]) defer.returnValue(event) @@ -1449,7 +1449,7 @@ def _handle_new_event(self, origin, event, state=None, auth_events=None, event, context ) - yield self._persist_events( + yield self.persist_events_and_notify( [(event, context)], backfilled=backfilled, ) @@ -1487,7 +1487,7 @@ def _handle_new_events(self, origin, event_infos, backfilled=False): ], consumeErrors=True, )) - yield self._persist_events( + yield self.persist_events_and_notify( [ (ev_info["event"], context) for ev_info, context in zip(event_infos, contexts) @@ -1575,7 +1575,7 @@ def _persist_auth_tree(self, origin, auth_events, state, event): raise events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR - yield self._persist_events( + yield self.persist_events_and_notify( [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) @@ -1586,7 +1586,7 @@ def _persist_auth_tree(self, origin, auth_events, state, event): event, old_state=state ) - yield self._persist_events( + yield self.persist_events_and_notify( [(event, new_event_context)], ) @@ -2327,7 +2327,7 @@ def _check_key_revocation(self, public_key, url): raise AuthError(403, "Third party certificate was invalid") @defer.inlineCallbacks - def _persist_events(self, event_and_contexts, backfilled=False): + def persist_events_and_notify(self, event_and_contexts, backfilled=False): """Persists events and tells the notifier/pushers about them, if necessary. diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 7b0b1cd32ed1..2ddd18f73b31 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -17,13 +17,10 @@ from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint -from synapse.types import UserID -from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -55,9 +52,7 @@ def __init__(self, hs): self.store = hs.get_datastore() self.clock = hs.get_clock() - self.is_mine_id = hs.is_mine_id - self.notifier = hs.get_notifier() - self.pusher_pool = hs.get_pusherpool() + self.federation_handler = hs.get_handlers().federation_handler @staticmethod @defer.inlineCallbacks @@ -114,45 +109,12 @@ def _handle_request(self, request): len(event_and_contexts), ) - max_stream_id = yield self.store.persist_events( - event_and_contexts, - backfilled=backfilled + yield self.federation_handler.persist_events_and_notify( + event_and_contexts, backfilled, ) - if not backfilled: - for event, _ in event_and_contexts: - self._notify_persisted_event(event, max_stream_id) - defer.returnValue((200, {})) - def _notify_persisted_event(self, event, max_stream_id): - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - - # We notify for memberships if its an invite for one of our - # users - if event.internal_metadata.is_outlier(): - if event.membership != Membership.INVITE: - if not self.is_mine_id(target_user_id): - return - - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - elif event.internal_metadata.is_outlier(): - return - - event_stream_id = event.internal_metadata.stream_ordering - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) - - run_in_background( - self.pusher_pool.on_new_notifications, - event_stream_id, max_stream_id, - ) - class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): """Handles EDUs newly received from federation, including persisting and