Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/fa…
Browse files Browse the repository at this point in the history
…ctor_remote_leave
  • Loading branch information
erikjohnston committed Mar 13, 2018
2 parents ea3442c + 16469a4 commit bf8e97b
Show file tree
Hide file tree
Showing 21 changed files with 154 additions and 120 deletions.
6 changes: 6 additions & 0 deletions synapse/federation/federation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@

class FederationBase(object):
def __init__(self, hs):
self.hs = hs

self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.spam_checker = hs.get_spam_checker()
self.store = hs.get_datastore()
self._clock = hs.get_clock()

@defer.inlineCallbacks
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
Expand Down
1 change: 1 addition & 0 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, hs):
self._clear_tried_cache, 60 * 1000,
)
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()

def _clear_tried_cache(self):
"""Clear pdu_destination_tried cache"""
Expand Down
123 changes: 75 additions & 48 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import simplejson as json
from twisted.internet import defer

from synapse.api.errors import AuthError, FederationError, SynapseError
from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import (
FederationBase,
event_from_pdu_json,
)

from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
import synapse.metrics
from synapse.types import get_domain_from_id
Expand Down Expand Up @@ -56,6 +58,12 @@ def __init__(self, hs):
self._server_linearizer = async.Linearizer("fed_server")
self._transaction_linearizer = async.Linearizer("fed_txn_handler")

self.transaction_actions = TransactionActions(self.store)

self.handler = None

self.registry = hs.get_federation_registry()

# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
Expand All @@ -67,35 +75,6 @@ def set_handler(self, handler):
"""
self.handler = handler

def register_edu_handler(self, edu_type, handler):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))

self.edu_handlers[edu_type] = handler

def register_query_handler(self, query_type, handler):
"""Sets the handler callable that will be used to handle an incoming
federation Query of the given type.
Args:
query_type (str): Category name of the query, which should match
the string used by make_query.
handler (callable): Invoked to handle incoming queries of this type
handler is invoked as:
result = handler(args)
where 'args' is a dict mapping strings to strings of the query
arguments. It should return a Deferred that will eventually yield an
object to encode as JSON.
"""
if query_type in self.query_handlers:
raise KeyError(
"Already have a Query handler for %s" % (query_type,)
)

self.query_handlers[query_type] = handler

@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, room_id, versions, limit):
Expand Down Expand Up @@ -229,16 +208,7 @@ def process_pdus_for_room(room_id):
@defer.inlineCallbacks
def received_edu(self, origin, edu_type, content):
received_edus_counter.inc()

if edu_type in self.edu_handlers:
try:
yield self.edu_handlers[edu_type](origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
logger.exception("Failed to handle edu %r", edu_type)
else:
logger.warn("Received EDU of type %s with no handler", edu_type)
yield self.registry.on_edu(edu_type, origin, content)

@defer.inlineCallbacks
@log_function
Expand Down Expand Up @@ -328,14 +298,8 @@ def on_pull_request(self, origin, versions):
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.inc(query_type)

if query_type in self.query_handlers:
response = yield self.query_handlers[query_type](args)
defer.returnValue((200, response))
else:
defer.returnValue(
(404, "No handler for Query type '%s'" % (query_type,))
)
resp = yield self.registry.on_query(query_type, args)
defer.returnValue((200, resp))

@defer.inlineCallbacks
def on_make_join_request(self, room_id, user_id):
Expand Down Expand Up @@ -607,3 +571,66 @@ def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
origin, room_id, event_dict
)
defer.returnValue(ret)


class FederationHandlerRegistry(object):
"""Allows classes to register themselves as handlers for a given EDU or
query type for incoming federation traffic.
"""
def __init__(self):
self.edu_handlers = {}
self.query_handlers = {}

def register_edu_handler(self, edu_type, handler):
"""Sets the handler callable that will be used to handle an incoming
federation EDU of the given type.
Args:
edu_type (str): The type of the incoming EDU to register handler for
handler (Callable[[str, dict]]): A callable invoked on incoming EDU
of the given type. The arguments are the origin server name and
the EDU contents.
"""
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))

self.edu_handlers[edu_type] = handler

def register_query_handler(self, query_type, handler):
"""Sets the handler callable that will be used to handle an incoming
federation query of the given type.
Args:
query_type (str): Category name of the query, which should match
the string used by make_query.
handler (Callable[[dict], Deferred[dict]]): Invoked to handle
incoming queries of this type. The return will be yielded
on and the result used as the response to the query request.
"""
if query_type in self.query_handlers:
raise KeyError(
"Already have a Query handler for %s" % (query_type,)
)

self.query_handlers[query_type] = handler

@defer.inlineCallbacks
def on_edu(self, edu_type, origin, content):
handler = self.edu_handlers.get(edu_type)
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)

try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
logger.exception("Failed to handle edu %r", edu_type)

def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)
if not handler:
logger.warn("No handler registered for query type %s", query_type)
raise NotFoundError("No handler for Query type '%s'" % (query_type,))

return handler(args)
22 changes: 0 additions & 22 deletions synapse/federation/replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
from .federation_client import FederationClient
from .federation_server import FederationServer

from .persistence import TransactionActions

import logging


Expand All @@ -47,26 +45,6 @@ class ReplicationLayer(FederationClient, FederationServer):
"""

def __init__(self, hs, transport_layer):
self.server_name = hs.hostname

self.keyring = hs.get_keyring()

self.transport_layer = transport_layer

self.federation_client = self

self.store = hs.get_datastore()

self.handler = None
self.edu_handlers = {}
self.query_handlers = {}

self._clock = hs.get_clock()

self.transaction_actions = TransactionActions(self.store)

self.hs = hs

super(ReplicationLayer, self).__init__(hs)

def __str__(self):
Expand Down
6 changes: 4 additions & 2 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ def __init__(self, hs):

self._edu_updater = DeviceListEduUpdater(hs, self)

self.federation.register_edu_handler(
federation_registry = hs.get_federation_registry()

federation_registry.register_edu_handler(
"m.device_list_update", self._edu_updater.incoming_device_list_update,
)
self.federation.register_query_handler(
federation_registry.register_query_handler(
"user_devices", self.on_federation_query_user_devices,
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, hs):
self.is_mine = hs.is_mine
self.federation = hs.get_federation_sender()

hs.get_replication_layer().register_edu_handler(
hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, hs):
self.event_creation_handler = hs.get_event_creation_handler()

self.federation = hs.get_replication_layer()
self.federation.register_query_handler(
hs.get_federation_registry().register_query_handler(
"directory", self.on_directory_query
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, hs):
# doesn't really work as part of the generic query API, because the
# query request requires an object POST, but we abuse the
# "query handler" interface.
self.federation.register_query_handler(
hs.get_federation_registry().register_query_handler(
"client_keys", self.on_federation_query_client_keys
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ def handle_new_client_event(
event (FrozenEvent)
context (EventContext)
ratelimit (bool)
extra_users (list(str)): Any extra users to notify about event
extra_users (list(UserID)): Any extra users to notify about event
"""

try:
Expand Down
10 changes: 6 additions & 4 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,26 @@ def __init__(self, hs):

self.state = hs.get_state_handler()

self.replication.register_edu_handler(
federation_registry = hs.get_federation_registry()

federation_registry.register_edu_handler(
"m.presence", self.incoming_presence
)
self.replication.register_edu_handler(
federation_registry.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
self.replication.register_edu_handler(
federation_registry.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
self.replication.register_edu_handler(
federation_registry.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]),
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, hs):
super(ProfileHandler, self).__init__(hs)

self.federation = hs.get_replication_layer()
self.federation.register_query_handler(
hs.get_federation_registry().register_query_handler(
"profile", self.on_profile_query
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.federation = hs.get_federation_sender()
hs.get_replication_layer().register_edu_handler(
hs.get_federation_registry().register_edu_handler(
"m.receipt", self._received_remote_receipt
)
self.clock = self.hs.get_clock()
Expand Down
26 changes: 22 additions & 4 deletions synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,16 +446,34 @@ def auth_handler(self):
return self.hs.get_auth_handler()

@defer.inlineCallbacks
def guest_access_token_for(self, medium, address, inviter_user_id):
def get_or_register_3pid_guest(self, medium, address, inviter_user_id):
"""Get a guest access token for a 3PID, creating a guest account if
one doesn't already exist.
Args:
medium (str)
address (str)
inviter_user_id (str): The user ID who is trying to invite the
3PID
Returns:
Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
3PID guest account.
"""
access_token = yield self.store.get_3pid_guest_access_token(medium, address)
if access_token:
defer.returnValue(access_token)
user_info = yield self.auth.get_user_by_access_token(
access_token
)

_, access_token = yield self.register(
defer.returnValue((user_info["user"].to_string(), access_token))

user_id, access_token = yield self.register(
generate_token=True,
make_guest=True
)
access_token = yield self.store.save_or_get_3pid_guest_access_token(
medium, address, access_token, inviter_user_id
)
defer.returnValue(access_token)

defer.returnValue((user_id, access_token))
Loading

0 comments on commit bf8e97b

Please sign in to comment.