From b78717b87ba3fc248838db522f007c13d0cd8c76 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 13:49:13 +0000 Subject: [PATCH 1/5] Split RoomMemberHandler into base and master class The intention here is to split the class into the bits that can be done on workers and the bits that have to be done on the master. In future there will also be a class that can be run on the worker, which will delegate work to the master when necessary. --- synapse/handlers/room_member.py | 231 +++++++++++++++++++------------- synapse/server.py | 6 +- 2 files changed, 139 insertions(+), 98 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 6ee8420d1fec..ce83d5645145 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -14,9 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from twisted.internet import defer @@ -31,6 +28,10 @@ from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room +import abc +import logging + + logger = logging.getLogger(__name__) id_server_scheme = "https://" @@ -42,6 +43,8 @@ class RoomMemberHandler(object): # API that takes ID strings and returns pagination chunks. These concerns # ought to be separated out a lot better. + __metaclass__ = abc.ABCMeta + def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() @@ -62,9 +65,56 @@ def __init__(self, hs): self.clock = hs.get_clock() self.spam_checker = hs.get_spam_checker() - self.distributor = hs.get_distributor() - self.distributor.declare("user_joined_room") - self.distributor.declare("user_left_room") + @abc.abstractmethod + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + """Try and join a room that this server is not in + + Args: + remote_room_hosts (list[str]): List of servers that can be used + to join via. + room_id (str): Room that we are trying to join + user (UserID): User who is trying to join + content (dict): A dict that should be used as the content of the + join event. + + Returns: + Deferred + """ + raise NotImplementedError() + + @abc.abstractmethod + def _remote_reject_invite(self, remote_room_hosts, room_id, target): + """Attempt to reject an invite for a room this server is not in. If we + fail to do so we locally mark the invite as rejected. + + Args: + remote_room_hosts (list[str]): List of servers to use to try and + reject invite + room_id (str) + target (UserID): The user rejecting the invite + + Returns: + Deferred[dict]: A dictionary to be returned to the client, may + include event_id etc, or nothing if we locally rejected + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Get a guest access token for a 3PID, creating a guest account if + one doesn't already exist. + + Args: + medium (str) + address (str) + inviter_user_id (str): The user ID who is trying to invite the + 3PID + + Returns: + Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the + 3PID guest account. + """ + raise NotImplementedError() @defer.inlineCallbacks def _local_membership_update( @@ -137,73 +187,6 @@ def _local_membership_update( defer.returnValue(event) - @defer.inlineCallbacks - def _remote_join(self, remote_room_hosts, room_id, user, content): - """Try and join a room that this server is not in - - Args: - remote_room_hosts (list[str]): List of servers that can be used - to join via. - room_id (str): Room that we are trying to join - user (UserID): User who is trying to join - content (dict): A dict that should be used as the content of the - join event. - - Returns: - Deferred - """ - if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") - - # We don't do an auth check if we are doing an invite - # join dance for now, since we're kinda implicitly checking - # that we are allowed to join when we decide whether or not we - # need to do the invite/join dance. - yield self.federation_handler.do_invite_join( - remote_room_hosts, - room_id, - user.to_string(), - content, - ) - yield user_joined_room(self.distributor, user, room_id) - - @defer.inlineCallbacks - def _remote_reject_invite(self, remote_room_hosts, room_id, target): - """Attempt to reject an invite for a room this server is not in. If we - fail to do so we locally mark the invite as rejected. - - Args: - remote_room_hosts (list[str]): List of servers to use to try and - reject invite - room_id (str) - target (UserID): The user rejecting the invite - - Returns: - Deferred[dict]: A dictionary to be returned to the client, may - include event_id etc, or nothing if we locally rejected - """ - fed_handler = self.federation_handler - try: - ret = yield fed_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - target.to_string(), - ) - defer.returnValue(ret) - except Exception as e: - # if we were unable to reject the exception, just mark - # it as rejected on our end and plough ahead. - # - # The 'except' clause is very broad, but we need to - # capture everything from DNS failures upwards - # - logger.warn("Failed to reject invite: %s", e) - - yield self.store.locally_reject_invite( - target.to_string(), room_id - ) - defer.returnValue({}) - @defer.inlineCallbacks def update_membership( self, @@ -673,6 +656,7 @@ def _make_and_store_3pid_invite( token, public_keys, fallback_public_key, display_name = ( yield self._ask_id_server_for_third_party_invite( + requester=requester, id_server=id_server, medium=medium, address=address, @@ -709,6 +693,7 @@ def _make_and_store_3pid_invite( @defer.inlineCallbacks def _ask_id_server_for_third_party_invite( self, + requester, id_server, medium, address, @@ -725,6 +710,7 @@ def _ask_id_server_for_third_party_invite( Asks an identity server for a third party invite. Args: + requester (Requester) id_server (str): hostname + optional port for the identity server. medium (str): The literal string "email". address (str): The third party address being invited. @@ -767,8 +753,8 @@ def _ask_id_server_for_third_party_invite( } if self.config.invite_3pid_guest: - rh = self.registration_handler - guest_user_id, guest_access_token = yield rh.get_or_register_3pid_guest( + guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest( + requester=requester, medium=medium, address=address, inviter_user_id=inviter_user_id, @@ -801,27 +787,6 @@ def _ask_id_server_for_third_party_invite( display_name = data["display_name"] defer.returnValue((token, public_keys, fallback_public_key, display_name)) - @defer.inlineCallbacks - def forget(self, user, room_id): - user_id = user.to_string() - - member = yield self.state_handler.get_current_state( - room_id=room_id, - event_type=EventTypes.Member, - state_key=user_id - ) - membership = member.membership if member else None - - if membership is not None and membership not in [ - Membership.LEAVE, Membership.BAN - ]: - raise SynapseError(400, "User %s in room %s" % ( - user_id, room_id - )) - - if membership: - yield self.store.forget(user_id, room_id) - @defer.inlineCallbacks def _is_host_in_room(self, current_state_ids): # Have we just created the room, and is this about to be the very @@ -843,3 +808,77 @@ def _is_host_in_room(self, current_state_ids): defer.returnValue(True) defer.returnValue(False) + + +class RoomMemberMasterHandler(RoomMemberHandler): + @defer.inlineCallbacks + def _remote_join(self, remote_room_hosts, room_id, user, content): + """Implements RoomMemberHandler._remote_join + """ + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + # We don't do an auth check if we are doing an invite + # join dance for now, since we're kinda implicitly checking + # that we are allowed to join when we decide whether or not we + # need to do the invite/join dance. + yield self.federation_handler.do_invite_join( + remote_room_hosts, + room_id, + user.to_string(), + content, + ) + yield user_joined_room(self.distributor, user, room_id) + + @defer.inlineCallbacks + def _remote_reject_invite(self, remote_room_hosts, room_id, target): + """Implements RoomMemberHandler._remote_reject_invite + """ + fed_handler = self.federation_handler + try: + ret = yield fed_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + target.to_string(), + ) + defer.returnValue(ret) + except Exception as e: + # if we were unable to reject the exception, just mark + # it as rejected on our end and plough ahead. + # + # The 'except' clause is very broad, but we need to + # capture everything from DNS failures upwards + # + logger.warn("Failed to reject invite: %s", e) + + yield self.store.locally_reject_invite( + target.to_string(), room_id + ) + defer.returnValue({}) + + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Implements RoomMemberHandler.get_or_register_3pid_guest + """ + rg = self.registration_handler + return rg.get_or_register_3pid_guest(medium, address, inviter_user_id) + + @defer.inlineCallbacks + def forget(self, user, room_id): + user_id = user.to_string() + + member = yield self.state_handler.get_current_state( + room_id=room_id, + event_type=EventTypes.Member, + state_key=user_id + ) + membership = member.membership if member else None + + if membership is not None and membership not in [ + Membership.LEAVE, Membership.BAN + ]: + raise SynapseError(400, "User %s in room %s" % ( + user_id, room_id + )) + + if membership: + yield self.store.forget(user_id, room_id) diff --git a/synapse/server.py b/synapse/server.py index 1bc8d6f702f3..3d47e2793ba1 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -46,7 +46,7 @@ from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler from synapse.handlers.room_list import RoomListHandler -from synapse.handlers.room_member import RoomMemberHandler +from synapse.handlers.room_member import RoomMemberMasterHandler from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler @@ -387,7 +387,9 @@ def build_spam_checker(self): return SpamChecker(self) def build_room_member_handler(self): - return RoomMemberHandler(self) + if self.config.worker_app: + return Exception("Can't use RoomMemberHandler on workers") + return RoomMemberMasterHandler(self) def build_federation_registry(self): return FederationHandlerRegistry() From 82f16faa78864a23653ca952072a6f0e906f3367 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 16:00:26 +0000 Subject: [PATCH 2/5] Move user_*_room distributor stuff to master class I added yields when calling user_left_room, but they shouldn't matter on the master process as they always return None anyway. --- synapse/handlers/room_member.py | 55 ++++++++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ce83d5645145..669210b73d5a 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -116,6 +116,34 @@ def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id """ raise NotImplementedError() + @abc.abstractmethod + def _user_joined_room(self, target, room_id): + """Notifies distributor on master process that the user has joined the + room. + + Args: + target (UserID) + room_id (str) + + Returns: + Deferred|None + """ + raise NotImplementedError() + + @abc.abstractmethod + def _user_left_room(self, target, room_id): + """Notifies distributor on master process that the user has left the + room. + + Args: + target (UserID) + room_id (str) + + Returns: + Deferred|None + """ + raise NotImplementedError() + @defer.inlineCallbacks def _local_membership_update( self, requester, target, room_id, membership, @@ -178,12 +206,12 @@ def _local_membership_update( prev_member_event = yield self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield user_joined_room(self.distributor, target, room_id) + yield self._user_joined_room(target, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: - user_left_room(self.distributor, target, room_id) + yield self._user_left_room(target, room_id) defer.returnValue(event) @@ -460,12 +488,12 @@ def send_membership_event( prev_member_event = yield self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield user_joined_room(self.distributor, target_user, room_id) + yield self._user_joined_room(target_user, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: - user_left_room(self.distributor, target_user, room_id) + yield self._user_left_room(target_user, room_id) @defer.inlineCallbacks def _can_guest_join(self, current_state_ids): @@ -811,6 +839,13 @@ def _is_host_in_room(self, current_state_ids): class RoomMemberMasterHandler(RoomMemberHandler): + def __init__(self, hs): + super(RoomMemberMasterHandler, self).__init__(hs) + + self.distributor = hs.get_distributor() + self.distributor.declare("user_joined_room") + self.distributor.declare("user_left_room") + @defer.inlineCallbacks def _remote_join(self, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join @@ -828,7 +863,7 @@ def _remote_join(self, remote_room_hosts, room_id, user, content): user.to_string(), content, ) - yield user_joined_room(self.distributor, user, room_id) + yield self._user_joined_room(user, room_id) @defer.inlineCallbacks def _remote_reject_invite(self, remote_room_hosts, room_id, target): @@ -862,6 +897,16 @@ def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id rg = self.registration_handler return rg.get_or_register_3pid_guest(medium, address, inviter_user_id) + def _user_joined_room(self, target, room_id): + """Implements RoomMemberHandler._user_joined_room + """ + return user_joined_room(self.distributor, target, room_id) + + def _user_left_room(self, target, room_id): + """Implements RoomMemberHandler._user_left_room + """ + return user_left_room(self.distributor, target, room_id) + @defer.inlineCallbacks def forget(self, user, room_id): user_id = user.to_string() From 16adb11cc0aa255f0be98d6446d629e18a0dcefe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 16:57:07 +0000 Subject: [PATCH 3/5] Correct import order --- synapse/handlers/room_member.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 669210b73d5a..78b5feee4f35 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc +import logging + from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from twisted.internet import defer @@ -28,9 +31,6 @@ from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room -import abc -import logging - logger = logging.getLogger(__name__) From 6dbebef1415fd4d0da29ebf6b90a16b9cc877b96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 17:15:32 +0000 Subject: [PATCH 4/5] Add missing param to docstrings --- synapse/handlers/room_member.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 78b5feee4f35..790e11c57c7c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -70,6 +70,7 @@ def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Try and join a room that this server is not in Args: + requester (Requester) remote_room_hosts (list[str]): List of servers that can be used to join via. room_id (str): Room that we are trying to join @@ -88,6 +89,7 @@ def _remote_reject_invite(self, remote_room_hosts, room_id, target): fail to do so we locally mark the invite as rejected. Args: + requester (Requester) remote_room_hosts (list[str]): List of servers to use to try and reject invite room_id (str) @@ -105,6 +107,7 @@ def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id one doesn't already exist. Args: + requester (Requester) medium (str) address (str) inviter_user_id (str): The user ID who is trying to invite the From d45a1148245839c5f9776fee6d463ed69981d729 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 17:24:34 +0000 Subject: [PATCH 5/5] Raise, don't return, exception --- synapse/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/server.py b/synapse/server.py index 3d47e2793ba1..6ffe900b7485 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -388,7 +388,7 @@ def build_spam_checker(self): def build_room_member_handler(self): if self.config.worker_app: - return Exception("Can't use RoomMemberHandler on workers") + raise Exception("Can't use RoomMemberHandler on workers") return RoomMemberMasterHandler(self) def build_federation_registry(self):