diff --git a/changelog.d/5388.bugfix b/changelog.d/5388.bugfix new file mode 100644 index 000000000000..503e8309151d --- /dev/null +++ b/changelog.d/5388.bugfix @@ -0,0 +1 @@ +Fix email notifications for unnamed rooms with multiple people. diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index e8ee67401fbd..c89a8438a938 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -114,6 +114,21 @@ def _start_processing(self): run_as_background_process("emailpush.process", self._process) + def _pause_processing(self): + """Used by tests to temporarily pause processing of events. + + Asserts that its not currently processing. + """ + assert not self._is_processing + self._is_processing = True + + def _resume_processing(self): + """Used by tests to resume processing of events after pausing. + """ + assert self._is_processing + self._is_processing = False + self._start_processing() + @defer.inlineCallbacks def _process(self): # we should never get here if we are already processing @@ -215,6 +230,10 @@ def _unsafe_process(self): @defer.inlineCallbacks def save_last_stream_ordering_and_success(self, last_stream_ordering): + if last_stream_ordering is None: + # This happens if we haven't yet processed anything + return + self.last_stream_ordering = last_stream_ordering yield self.store.update_pusher_last_stream_ordering_and_success( self.app_id, self.email, self.user_id, diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py index eef6e18c2e1d..0c66702325ad 100644 --- a/synapse/push/presentable_names.py +++ b/synapse/push/presentable_names.py @@ -162,6 +162,17 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True def descriptor_from_member_events(member_events): + """Get a description of the room based on the member events. + + Args: + member_events (Iterable[FrozenEvent]) + + Returns: + str + """ + + member_events = list(member_events) + if len(member_events) == 0: return "nobody" elif len(member_events) == 1: diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 40a7709c0949..63c583565fa0 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -60,6 +60,11 @@ def start(self): def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, pushkey, lang, data, profile_tag=""): + """Creates a new pusher and adds it to the pool + + Returns: + Deferred[EmailPusher|HttpPusher] + """ time_now_msec = self.clock.time_msec() # we try to create the pusher just to validate the config: it @@ -103,7 +108,9 @@ def add_pusher(self, user_id, access_token, kind, app_id, last_stream_ordering=last_stream_ordering, profile_tag=profile_tag, ) - yield self.start_pusher_by_id(app_id, pushkey, user_id) + pusher = yield self.start_pusher_by_id(app_id, pushkey, user_id) + + defer.returnValue(pusher) @defer.inlineCallbacks def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey, @@ -184,7 +191,11 @@ def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): @defer.inlineCallbacks def start_pusher_by_id(self, app_id, pushkey, user_id): - """Look up the details for the given pusher, and start it""" + """Look up the details for the given pusher, and start it + + Returns: + Deferred[EmailPusher|HttpPusher|None]: The pusher started, if any + """ if not self._should_start_pushers: return @@ -192,13 +203,16 @@ def start_pusher_by_id(self, app_id, pushkey, user_id): app_id, pushkey ) - p = None + pusher_dict = None for r in resultlist: if r['user_name'] == user_id: - p = r + pusher_dict = r - if p: - yield self._start_pusher(p) + pusher = None + if pusher_dict: + pusher = yield self._start_pusher(pusher_dict) + + defer.returnValue(pusher) @defer.inlineCallbacks def _start_pushers(self): @@ -224,7 +238,7 @@ def _start_pusher(self, pusherdict): pusherdict (dict): Returns: - None + Deferred[EmailPusher|HttpPusher] """ try: p = self.pusher_factory.create_pusher(pusherdict) @@ -270,6 +284,8 @@ def _start_pusher(self, pusherdict): p.on_started(have_notifs) + defer.returnValue(p) + @defer.inlineCallbacks def remove_pusher(self, app_id, pushkey, user_id): appid_pushkey = "%s:%s" % (app_id, pushkey) diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 9bc5f07de181..72760a0733b7 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -15,6 +15,7 @@ import os +import attr import pkg_resources from twisted.internet.defer import Deferred @@ -25,6 +26,13 @@ from tests.unittest import HomeserverTestCase +@attr.s +class _User(object): + "Helper wrapper for user ID and access token" + id = attr.ib() + token = attr.ib() + + class EmailPusherTests(HomeserverTestCase): servlets = [ @@ -71,25 +79,32 @@ def sendmail(*args, **kwargs): return hs - def test_sends_email(self): - + def prepare(self, reactor, clock, hs): # Register the user who gets notified - user_id = self.register_user("user", "pass") - access_token = self.login("user", "pass") - - # Register the user who sends the message - other_user_id = self.register_user("otheruser", "pass") - other_access_token = self.login("otheruser", "pass") + self.user_id = self.register_user("user", "pass") + self.access_token = self.login("user", "pass") + + # Register other users + self.others = [ + _User( + id=self.register_user("otheruser1", "pass"), + token=self.login("otheruser1", "pass"), + ), + _User( + id=self.register_user("otheruser2", "pass"), + token=self.login("otheruser2", "pass"), + ), + ] # Register the pusher user_tuple = self.get_success( - self.hs.get_datastore().get_user_by_access_token(access_token) + self.hs.get_datastore().get_user_by_access_token(self.access_token) ) token_id = user_tuple["token_id"] - self.get_success( + self.pusher = self.get_success( self.hs.get_pusherpool().add_pusher( - user_id=user_id, + user_id=self.user_id, access_token=token_id, kind="email", app_id="m.email", @@ -101,22 +116,54 @@ def test_sends_email(self): ) ) - # Create a room - room = self.helper.create_room_as(user_id, tok=access_token) + def test_simple_sends_email(self): + # Create a simple room with two users + room = self.helper.create_room_as(self.user_id, tok=self.access_token) + self.helper.invite( + room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id, + ) + self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token) - # Invite the other person - self.helper.invite(room=room, src=user_id, tok=access_token, targ=other_user_id) + # The other user sends some messages + self.helper.send(room, body="Hi!", tok=self.others[0].token) + self.helper.send(room, body="There!", tok=self.others[0].token) - # The other user joins - self.helper.join(room=room, user=other_user_id, tok=other_access_token) + # We should get emailed about that message + self._check_for_mail() - # The other user sends some messages - self.helper.send(room, body="Hi!", tok=other_access_token) - self.helper.send(room, body="There!", tok=other_access_token) + def test_multiple_members_email(self): + # We want to test multiple notifications, so we pause processing of push + # while we send messages. + self.pusher._pause_processing() + + # Create a simple room with multiple other users + room = self.helper.create_room_as(self.user_id, tok=self.access_token) + + for other in self.others: + self.helper.invite( + room=room, src=self.user_id, tok=self.access_token, targ=other.id, + ) + self.helper.join(room=room, user=other.id, tok=other.token) + + # The other users send some messages + self.helper.send(room, body="Hi!", tok=self.others[0].token) + self.helper.send(room, body="There!", tok=self.others[1].token) + self.helper.send(room, body="There!", tok=self.others[1].token) + + # Nothing should have happened yet, as we're paused. + assert not self.email_attempts + + self.pusher._resume_processing() + + # We should get emailed about those messages + self._check_for_mail() + + def _check_for_mail(self): + "Check that the user receives an email notification" # Get the stream ordering before it gets sent pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) ) self.assertEqual(len(pushers), 1) last_stream_ordering = pushers[0]["last_stream_ordering"] @@ -126,7 +173,7 @@ def test_sends_email(self): # It hasn't succeeded yet, so the stream ordering shouldn't have moved pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) ) self.assertEqual(len(pushers), 1) self.assertEqual(last_stream_ordering, pushers[0]["last_stream_ordering"]) @@ -143,7 +190,7 @@ def test_sends_email(self): # The stream ordering has increased pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) ) self.assertEqual(len(pushers), 1) self.assertTrue(pushers[0]["last_stream_ordering"] > last_stream_ordering)