Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split RoomMemberHandler into base and master class #2987

Merged
merged 5 commits into from
Mar 13, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 182 additions & 98 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import abc
import logging

from signedjson.key import decode_verify_key_bytes
Expand All @@ -31,6 +31,7 @@
from synapse.util.async import Linearizer
from synapse.util.distributor import user_left_room, user_joined_room


logger = logging.getLogger(__name__)

id_server_scheme = "https://"
Expand All @@ -42,6 +43,8 @@ class RoomMemberHandler(object):
# API that takes ID strings and returns pagination chunks. These concerns
# ought to be separated out a lot better.

__metaclass__ = abc.ABCMeta

def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
Expand All @@ -62,9 +65,84 @@ def __init__(self, hs):
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()

self.distributor = hs.get_distributor()
self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room")
@abc.abstractmethod
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Try and join a room that this server is not in

Args:
remote_room_hosts (list[str]): List of servers that can be used
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requester is missing

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

likewise for some of the other methods

to join via.
room_id (str): Room that we are trying to join
user (UserID): User who is trying to join
content (dict): A dict that should be used as the content of the
join event.

Returns:
Deferred
"""
raise NotImplementedError()

@abc.abstractmethod
def _remote_reject_invite(self, remote_room_hosts, room_id, target):
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.

Args:
remote_room_hosts (list[str]): List of servers to use to try and
reject invite
room_id (str)
target (UserID): The user rejecting the invite

Returns:
Deferred[dict]: A dictionary to be returned to the client, may
include event_id etc, or nothing if we locally rejected
"""
raise NotImplementedError()

@abc.abstractmethod
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
"""Get a guest access token for a 3PID, creating a guest account if
one doesn't already exist.

Args:
medium (str)
address (str)
inviter_user_id (str): The user ID who is trying to invite the
3PID

Returns:
Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
3PID guest account.
"""
raise NotImplementedError()

@abc.abstractmethod
def _user_joined_room(self, target, room_id):
"""Notifies distributor on master process that the user has joined the
room.

Args:
target (UserID)
room_id (str)

Returns:
Deferred|None
"""
raise NotImplementedError()

@abc.abstractmethod
def _user_left_room(self, target, room_id):
"""Notifies distributor on master process that the user has left the
room.

Args:
target (UserID)
room_id (str)

Returns:
Deferred|None
"""
raise NotImplementedError()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These need to be docced.


@defer.inlineCallbacks
def _local_membership_update(
Expand Down Expand Up @@ -128,82 +206,15 @@ def _local_membership_update(
prev_member_event = yield self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
yield user_joined_room(self.distributor, target, room_id)
yield self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
user_left_room(self.distributor, target, room_id)
yield self._user_left_room(target, room_id)

defer.returnValue(event)

@defer.inlineCallbacks
def _remote_join(self, remote_room_hosts, room_id, user, content):
"""Try and join a room that this server is not in

Args:
remote_room_hosts (list[str]): List of servers that can be used
to join via.
room_id (str): Room that we are trying to join
user (UserID): User who is trying to join
content (dict): A dict that should be used as the content of the
join event.

Returns:
Deferred
"""
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")

# We don't do an auth check if we are doing an invite
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
yield self.federation_handler.do_invite_join(
remote_room_hosts,
room_id,
user.to_string(),
content,
)
yield user_joined_room(self.distributor, user, room_id)

@defer.inlineCallbacks
def _remote_reject_invite(self, remote_room_hosts, room_id, target):
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.

Args:
remote_room_hosts (list[str]): List of servers to use to try and
reject invite
room_id (str)
target (UserID): The user rejecting the invite

Returns:
Deferred[dict]: A dictionary to be returned to the client, may
include event_id etc, or nothing if we locally rejected
"""
fed_handler = self.federation_handler
try:
ret = yield fed_handler.do_remotely_reject_invite(
remote_room_hosts,
room_id,
target.to_string(),
)
defer.returnValue(ret)
except Exception as e:
# if we were unable to reject the exception, just mark
# it as rejected on our end and plough ahead.
#
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
logger.warn("Failed to reject invite: %s", e)

yield self.store.locally_reject_invite(
target.to_string(), room_id
)
defer.returnValue({})

@defer.inlineCallbacks
def update_membership(
self,
Expand Down Expand Up @@ -477,12 +488,12 @@ def send_membership_event(
prev_member_event = yield self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
yield user_joined_room(self.distributor, target_user, room_id)
yield self._user_joined_room(target_user, room_id)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
user_left_room(self.distributor, target_user, room_id)
yield self._user_left_room(target_user, room_id)

@defer.inlineCallbacks
def _can_guest_join(self, current_state_ids):
Expand Down Expand Up @@ -673,6 +684,7 @@ def _make_and_store_3pid_invite(

token, public_keys, fallback_public_key, display_name = (
yield self._ask_id_server_for_third_party_invite(
requester=requester,
id_server=id_server,
medium=medium,
address=address,
Expand Down Expand Up @@ -709,6 +721,7 @@ def _make_and_store_3pid_invite(
@defer.inlineCallbacks
def _ask_id_server_for_third_party_invite(
self,
requester,
id_server,
medium,
address,
Expand All @@ -725,6 +738,7 @@ def _ask_id_server_for_third_party_invite(
Asks an identity server for a third party invite.

Args:
requester (Requester)
id_server (str): hostname + optional port for the identity server.
medium (str): The literal string "email".
address (str): The third party address being invited.
Expand Down Expand Up @@ -767,8 +781,8 @@ def _ask_id_server_for_third_party_invite(
}

if self.config.invite_3pid_guest:
rh = self.registration_handler
guest_user_id, guest_access_token = yield rh.get_or_register_3pid_guest(
guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest(
requester=requester,
medium=medium,
address=address,
inviter_user_id=inviter_user_id,
Expand Down Expand Up @@ -801,27 +815,6 @@ def _ask_id_server_for_third_party_invite(
display_name = data["display_name"]
defer.returnValue((token, public_keys, fallback_public_key, display_name))

@defer.inlineCallbacks
def forget(self, user, room_id):
user_id = user.to_string()

member = yield self.state_handler.get_current_state(
room_id=room_id,
event_type=EventTypes.Member,
state_key=user_id
)
membership = member.membership if member else None

if membership is not None and membership not in [
Membership.LEAVE, Membership.BAN
]:
raise SynapseError(400, "User %s in room %s" % (
user_id, room_id
))

if membership:
yield self.store.forget(user_id, room_id)

@defer.inlineCallbacks
def _is_host_in_room(self, current_state_ids):
# Have we just created the room, and is this about to be the very
Expand All @@ -843,3 +836,94 @@ def _is_host_in_room(self, current_state_ids):
defer.returnValue(True)

defer.returnValue(False)


class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
super(RoomMemberMasterHandler, self).__init__(hs)

self.distributor = hs.get_distributor()
self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room")

@defer.inlineCallbacks
def _remote_join(self, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
"""
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")

# We don't do an auth check if we are doing an invite
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
yield self.federation_handler.do_invite_join(
remote_room_hosts,
room_id,
user.to_string(),
content,
)
yield self._user_joined_room(user, room_id)

@defer.inlineCallbacks
def _remote_reject_invite(self, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite
"""
fed_handler = self.federation_handler
try:
ret = yield fed_handler.do_remotely_reject_invite(
remote_room_hosts,
room_id,
target.to_string(),
)
defer.returnValue(ret)
except Exception as e:
# if we were unable to reject the exception, just mark
# it as rejected on our end and plough ahead.
#
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
logger.warn("Failed to reject invite: %s", e)

yield self.store.locally_reject_invite(
target.to_string(), room_id
)
defer.returnValue({})

def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason to pass requester here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, as we'll need to hit out to the master and we want to include the Requester in the logging there.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(We don't do it for the user_*_room as that just goes straight into the distributor which immediately strips the context)

"""Implements RoomMemberHandler.get_or_register_3pid_guest
"""
rg = self.registration_handler
return rg.get_or_register_3pid_guest(medium, address, inviter_user_id)

def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room
"""
return user_joined_room(self.distributor, target, room_id)

def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room
"""
return user_left_room(self.distributor, target, room_id)

@defer.inlineCallbacks
def forget(self, user, room_id):
user_id = user.to_string()

member = yield self.state_handler.get_current_state(
room_id=room_id,
event_type=EventTypes.Member,
state_key=user_id
)
membership = member.membership if member else None

if membership is not None and membership not in [
Membership.LEAVE, Membership.BAN
]:
raise SynapseError(400, "User %s in room %s" % (
user_id, room_id
))

if membership:
yield self.store.forget(user_id, room_id)
6 changes: 4 additions & 2 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import RoomMemberHandler
from synapse.handlers.room_member import RoomMemberMasterHandler
from synapse.handlers.set_password import SetPasswordHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
Expand Down Expand Up @@ -387,7 +387,9 @@ def build_spam_checker(self):
return SpamChecker(self)

def build_room_member_handler(self):
return RoomMemberHandler(self)
if self.config.worker_app:
return Exception("Can't use RoomMemberHandler on workers")
return RoomMemberMasterHandler(self)

def build_federation_registry(self):
return FederationHandlerRegistry()
Expand Down