Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split out edu/query registration to a separate class #2976

Merged
merged 3 commits into from
Mar 13, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 69 additions & 48 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import simplejson as json
from twisted.internet import defer

from synapse.api.errors import AuthError, FederationError, SynapseError
from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import (
FederationBase,
Expand Down Expand Up @@ -56,6 +56,8 @@ def __init__(self, hs):
self._server_linearizer = async.Linearizer("fed_server")
self._transaction_linearizer = async.Linearizer("fed_txn_handler")

self.registry = hs.get_federation_registry()

# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
Expand All @@ -67,35 +69,6 @@ def set_handler(self, handler):
"""
self.handler = handler

def register_edu_handler(self, edu_type, handler):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))

self.edu_handlers[edu_type] = handler

def register_query_handler(self, query_type, handler):
"""Sets the handler callable that will be used to handle an incoming
federation Query of the given type.

Args:
query_type (str): Category name of the query, which should match
the string used by make_query.
handler (callable): Invoked to handle incoming queries of this type

handler is invoked as:
result = handler(args)

where 'args' is a dict mapping strings to strings of the query
arguments. It should return a Deferred that will eventually yield an
object to encode as JSON.
"""
if query_type in self.query_handlers:
raise KeyError(
"Already have a Query handler for %s" % (query_type,)
)

self.query_handlers[query_type] = handler

@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, room_id, versions, limit):
Expand Down Expand Up @@ -229,16 +202,7 @@ def process_pdus_for_room(room_id):
@defer.inlineCallbacks
def received_edu(self, origin, edu_type, content):
received_edus_counter.inc()

if edu_type in self.edu_handlers:
try:
yield self.edu_handlers[edu_type](origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
logger.exception("Failed to handle edu %r", edu_type)
else:
logger.warn("Received EDU of type %s with no handler", edu_type)
yield self.registry.on_edu(edu_type, origin, content)

@defer.inlineCallbacks
@log_function
Expand Down Expand Up @@ -328,14 +292,8 @@ def on_pull_request(self, origin, versions):
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.inc(query_type)

if query_type in self.query_handlers:
response = yield self.query_handlers[query_type](args)
defer.returnValue((200, response))
else:
defer.returnValue(
(404, "No handler for Query type '%s'" % (query_type,))
)
resp = yield self.registry.on_query(query_type, args)
defer.returnValue((200, resp))

@defer.inlineCallbacks
def on_make_join_request(self, room_id, user_id):
Expand Down Expand Up @@ -607,3 +565,66 @@ def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
origin, room_id, event_dict
)
defer.returnValue(ret)


class FederationHandlerRegistry(object):
"""Allows classes to register themselves as handlers for a given EDU or
query type for incoming federation traffic.
"""
def __init__(self):
self.edu_handlers = {}
self.query_handlers = {}

def register_edu_handler(self, edu_type, handler):
"""Sets the handler callable that will be used to handle an incoming
federation EDU of the given type.

Args:
edu_type (str): The type of the incoming EDU to register handler for
handler (Callable[str, dict]): A callable invoked on incoming EDU
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Callable[str, dict] means "a thing that takes a str and returns a dict". You probably want Callable[[str, dict], Deferred[None]] or something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I misread the thingy. Ta.

of the given type. The arguments are the origin server name and
the EDU contents.
"""
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))

self.edu_handlers[edu_type] = handler

def register_query_handler(self, query_type, handler):
"""Sets the handler callable that will be used to handle an incoming
federation query of the given type.

Args:
query_type (str): Category name of the query, which should match
the string used by make_query.
handler (Callable[dict] -> Deferred[dict]): Invoked to handle
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callable[dict, Deferred[dict]]

incoming queries of this type. The return will be yielded
on and the result used as the response to the query request.
"""
if query_type in self.query_handlers:
raise KeyError(
"Already have a Query handler for %s" % (query_type,)
)

self.query_handlers[query_type] = handler

@defer.inlineCallbacks
def on_edu(self, edu_type, origin, content):
handler = self.edu_handlers.get(edu_type)
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)

try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
logger.exception("Failed to handle edu %r", edu_type)

def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)
if not handler:
logger.warn("No handler registered for query type %s", query_type)
raise NotFoundError("No handler for Query type '%s'" % (query_type,))

return handler(args)
6 changes: 4 additions & 2 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ def __init__(self, hs):

self._edu_updater = DeviceListEduUpdater(hs, self)

self.federation.register_edu_handler(
federation_registry = hs.get_federation_registry()

federation_registry.register_edu_handler(
"m.device_list_update", self._edu_updater.incoming_device_list_update,
)
self.federation.register_query_handler(
federation_registry.register_query_handler(
"user_devices", self.on_federation_query_user_devices,
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, hs):
self.is_mine = hs.is_mine
self.federation = hs.get_federation_sender()

hs.get_replication_layer().register_edu_handler(
hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, hs):
self.event_creation_handler = hs.get_event_creation_handler()

self.federation = hs.get_replication_layer()
self.federation.register_query_handler(
hs.get_federation_registry().register_query_handler(
"directory", self.on_directory_query
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, hs):
# doesn't really work as part of the generic query API, because the
# query request requires an object POST, but we abuse the
# "query handler" interface.
self.federation.register_query_handler(
hs.get_federation_registry().register_query_handler(
"client_keys", self.on_federation_query_client_keys
)

Expand Down
10 changes: 6 additions & 4 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,26 @@ def __init__(self, hs):

self.state = hs.get_state_handler()

self.replication.register_edu_handler(
federation_registry = hs.get_federation_registry()

federation_registry.register_edu_handler(
"m.presence", self.incoming_presence
)
self.replication.register_edu_handler(
federation_registry.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
self.replication.register_edu_handler(
federation_registry.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
self.replication.register_edu_handler(
federation_registry.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]),
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, hs):
super(ProfileHandler, self).__init__(hs)

self.federation = hs.get_replication_layer()
self.federation.register_query_handler(
hs.get_federation_registry().register_query_handler(
"profile", self.on_profile_query
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.federation = hs.get_federation_sender()
hs.get_replication_layer().register_edu_handler(
hs.get_federation_registry().register_edu_handler(
"m.receipt", self._received_remote_receipt
)
self.clock = self.hs.get_clock()
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(self, hs):

self.federation = hs.get_federation_sender()

hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)

hs.get_distributor().observe("user_left_room", self.user_left_room)

Expand Down
5 changes: 5 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from synapse.events.spamcheck import SpamChecker
from synapse.federation import initialize_http_replication
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.federation_server import FederationHandlerRegistry
from synapse.federation.transport.client import TransportLayerClient
from synapse.federation.transaction_queue import TransactionQueue
from synapse.handlers import Handlers
Expand Down Expand Up @@ -147,6 +148,7 @@ def build_DEPENDENCY(self)
'groups_attestation_renewer',
'spam_checker',
'room_member_handler',
'federation_registry',
]

def __init__(self, hostname, **kwargs):
Expand Down Expand Up @@ -387,6 +389,9 @@ def build_spam_checker(self):
def build_room_member_handler(self):
return RoomMemberHandler(self)

def build_federation_registry(self):
return FederationHandlerRegistry()

def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

Expand Down
9 changes: 4 additions & 5 deletions tests/handlers/test_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,20 @@ class DirectoryTestCase(unittest.TestCase):

@defer.inlineCallbacks
def setUp(self):
self.mock_federation = Mock(spec=[
"make_query",
"register_edu_handler",
])
self.mock_federation = Mock()
self.mock_registry = Mock()

self.query_handlers = {}

def register_query_handler(query_type, handler):
self.query_handlers[query_type] = handler
self.mock_federation.register_query_handler = register_query_handler
self.mock_registry.register_query_handler = register_query_handler

hs = yield setup_test_homeserver(
http_client=None,
resource_for_federation=Mock(),
replication_layer=self.mock_federation,
federation_registry=self.mock_registry,
)
hs.handlers = DirectoryHandlers(hs)

Expand Down
9 changes: 4 additions & 5 deletions tests/handlers/test_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,22 @@ class ProfileTestCase(unittest.TestCase):

@defer.inlineCallbacks
def setUp(self):
self.mock_federation = Mock(spec=[
"make_query",
"register_edu_handler",
])
self.mock_federation = Mock()
self.mock_registry = Mock()

self.query_handlers = {}

def register_query_handler(query_type, handler):
self.query_handlers[query_type] = handler

self.mock_federation.register_query_handler = register_query_handler
self.mock_registry.register_query_handler = register_query_handler

hs = yield setup_test_homeserver(
http_client=None,
handlers=None,
resource_for_federation=Mock(),
replication_layer=self.mock_federation,
federation_registry=self.mock_registry,
ratelimiter=NonCallableMock(spec_set=[
"send_message",
])
Expand Down