Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't build handlers on workers unnecessarily #2979

Merged
merged 7 commits into from
Mar 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion synapse/app/client_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ def start(config_options):
)

ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners)

def start():
Expand Down
1 change: 0 additions & 1 deletion synapse/app/event_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ def start(config_options):
)

ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners)

def start():
Expand Down
1 change: 0 additions & 1 deletion synapse/app/federation_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ def start(config_options):
)

ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners)

def start():
Expand Down
1 change: 0 additions & 1 deletion synapse/app/frontend_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ def start(config_options):
)

ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners)

def start():
Expand Down
2 changes: 1 addition & 1 deletion synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def start():
hs.get_state_handler().start_caching()
hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates()
hs.get_replication_layer().start_get_pdu_cache()
hs.get_replication_client().start_get_pdu_cache()

register_memory_metrics(hs)

Expand Down
1 change: 0 additions & 1 deletion synapse/app/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ def start(config_options):
)

ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners)

def start():
Expand Down
8 changes: 0 additions & 8 deletions synapse/federation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,3 @@

""" This package includes all the federation specific logic.
"""

from .replication import ReplicationLayer


def initialize_http_replication(hs):
transport = hs.get_federation_transport_client()

return ReplicationLayer(hs, transport)
6 changes: 6 additions & 0 deletions synapse/federation/federation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@

class FederationBase(object):
def __init__(self, hs):
self.hs = hs

self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.spam_checker = hs.get_spam_checker()
self.store = hs.get_datastore()
self._clock = hs.get_clock()

@defer.inlineCallbacks
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
Expand Down
1 change: 1 addition & 0 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, hs):
self._clear_tried_cache, 60 * 1000,
)
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()

def _clear_tried_cache(self):
"""Clear pdu_destination_tried cache"""
Expand Down
129 changes: 74 additions & 55 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import simplejson as json
from twisted.internet import defer

from synapse.api.errors import AuthError, FederationError, SynapseError
from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import (
FederationBase,
event_from_pdu_json,
)

from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
import synapse.metrics
from synapse.types import get_domain_from_id
Expand Down Expand Up @@ -52,50 +54,19 @@ def __init__(self, hs):
super(FederationServer, self).__init__(hs)

self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler

self._server_linearizer = async.Linearizer("fed_server")
self._transaction_linearizer = async.Linearizer("fed_txn_handler")

self.transaction_actions = TransactionActions(self.store)

self.registry = hs.get_federation_registry()

# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)

def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate
receipt of new PDUs from other home servers. The required methods are
documented on :py:class:`.ReplicationHandler`.
"""
self.handler = handler

def register_edu_handler(self, edu_type, handler):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))

self.edu_handlers[edu_type] = handler

def register_query_handler(self, query_type, handler):
"""Sets the handler callable that will be used to handle an incoming
federation Query of the given type.

Args:
query_type (str): Category name of the query, which should match
the string used by make_query.
handler (callable): Invoked to handle incoming queries of this type

handler is invoked as:
result = handler(args)

where 'args' is a dict mapping strings to strings of the query
arguments. It should return a Deferred that will eventually yield an
object to encode as JSON.
"""
if query_type in self.query_handlers:
raise KeyError(
"Already have a Query handler for %s" % (query_type,)
)

self.query_handlers[query_type] = handler

@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, room_id, versions, limit):
Expand Down Expand Up @@ -229,16 +200,7 @@ def process_pdus_for_room(room_id):
@defer.inlineCallbacks
def received_edu(self, origin, edu_type, content):
received_edus_counter.inc()

if edu_type in self.edu_handlers:
try:
yield self.edu_handlers[edu_type](origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
logger.exception("Failed to handle edu %r", edu_type)
else:
logger.warn("Received EDU of type %s with no handler", edu_type)
yield self.registry.on_edu(edu_type, origin, content)

@defer.inlineCallbacks
@log_function
Expand Down Expand Up @@ -328,14 +290,8 @@ def on_pull_request(self, origin, versions):
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.inc(query_type)

if query_type in self.query_handlers:
response = yield self.query_handlers[query_type](args)
defer.returnValue((200, response))
else:
defer.returnValue(
(404, "No handler for Query type '%s'" % (query_type,))
)
resp = yield self.registry.on_query(query_type, args)
defer.returnValue((200, resp))

@defer.inlineCallbacks
def on_make_join_request(self, room_id, user_id):
Expand Down Expand Up @@ -607,3 +563,66 @@ def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
origin, room_id, event_dict
)
defer.returnValue(ret)


class FederationHandlerRegistry(object):
"""Allows classes to register themselves as handlers for a given EDU or
query type for incoming federation traffic.
"""
def __init__(self):
self.edu_handlers = {}
self.query_handlers = {}

def register_edu_handler(self, edu_type, handler):
"""Sets the handler callable that will be used to handle an incoming
federation EDU of the given type.

Args:
edu_type (str): The type of the incoming EDU to register handler for
handler (Callable[str, dict]): A callable invoked on incoming EDU
of the given type. The arguments are the origin server name and
the EDU contents.
"""
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))

self.edu_handlers[edu_type] = handler

def register_query_handler(self, query_type, handler):
"""Sets the handler callable that will be used to handle an incoming
federation query of the given type.

Args:
query_type (str): Category name of the query, which should match
the string used by make_query.
handler (Callable[dict] -> Deferred[dict]): Invoked to handle
incoming queries of this type. The return will be yielded
on and the result used as the response to the query request.
"""
if query_type in self.query_handlers:
raise KeyError(
"Already have a Query handler for %s" % (query_type,)
)

self.query_handlers[query_type] = handler

@defer.inlineCallbacks
def on_edu(self, edu_type, origin, content):
handler = self.edu_handlers.get(edu_type)
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)

try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
logger.exception("Failed to handle edu %r", edu_type)

def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)
if not handler:
logger.warn("No handler registered for query type %s", query_type)
raise NotFoundError("No handler for Query type '%s'" % (query_type,))

return handler(args)
73 changes: 0 additions & 73 deletions synapse/federation/replication.py

This file was deleted.

2 changes: 1 addition & 1 deletion synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ def on_POST(self, origin, content, query):
def register_servlets(hs, resource, authenticator, ratelimiter):
for servletclass in FEDERATION_SERVLET_CLASSES:
servletclass(
handler=hs.get_replication_layer(),
handler=hs.get_replication_server(),
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
Expand Down
9 changes: 5 additions & 4 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ def __init__(self, hs):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
self.federation_sender = hs.get_federation_sender()
self.federation = hs.get_replication_layer()

self._edu_updater = DeviceListEduUpdater(hs, self)

self.federation.register_edu_handler(
federation_registry = hs.get_federation_registry()

federation_registry.register_edu_handler(
"m.device_list_update", self._edu_updater.incoming_device_list_update,
)
self.federation.register_query_handler(
federation_registry.register_query_handler(
"user_devices", self.on_federation_query_user_devices,
)

Expand Down Expand Up @@ -430,7 +431,7 @@ class DeviceListEduUpdater(object):

def __init__(self, hs, device_handler):
self.store = hs.get_datastore()
self.federation = hs.get_replication_layer()
self.federation = hs.get_replication_client()
self.clock = hs.get_clock()
self.device_handler = device_handler

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, hs):
self.is_mine = hs.is_mine
self.federation = hs.get_federation_sender()

hs.get_replication_layer().register_edu_handler(
hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)

Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ def __init__(self, hs):
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()

self.federation = hs.get_replication_layer()
self.federation.register_query_handler(
self.federation = hs.get_replication_client()
hs.get_federation_registry().register_query_handler(
"directory", self.on_directory_query
)

Expand Down
Loading