From 68b7fc3e2ba0aae7813b0bae52370860b5cd9f26 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 19 Oct 2015 17:26:18 +0100 Subject: [PATCH 1/3] Add rooms that the user has left under archived in v2 sync. --- synapse/handlers/sync.py | 128 ++++++++++++++++++++++++++- synapse/rest/client/v2_alpha/sync.py | 29 ++++-- synapse/storage/roommember.py | 13 +++ 3 files changed, 161 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ee6b881de1be..1891cd088c22 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -61,18 +61,37 @@ def __nonzero__(self): return bool(self.timeline or self.state or self.ephemeral) +class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ + "room_id", + "timeline", + "state", +])): + __slots__ = [] + + def __nonzero__(self): + """Make the result appear empty if there are no updates. This is used + to tell if room needs to be part of the sync result. + """ + return bool(self.timeline or self.state) + + class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ "room_id", "invite", ])): __slots__ = [] + def __nonzero__(self): + """Invited rooms should always be reported to the client""" + return True + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. "joined", # JoinedSyncResult for each joined room. "invited", # InvitedSyncResult for each invited room. + "archived", # ArchivedSyncResult for each archived room. ])): __slots__ = [] @@ -156,11 +175,14 @@ def initial_sync(self, sync_config): ) room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), - membership_list=[Membership.INVITE, Membership.JOIN] + membership_list=[ + Membership.INVITE, Membership.JOIN, Membership.LEAVE + ] ) joined = [] invited = [] + archived = [] for event in room_list: if event.membership == Membership.JOIN: room_sync = yield self.initial_sync_for_joined_room( @@ -173,11 +195,23 @@ def initial_sync(self, sync_config): room_id=event.room_id, invite=invite, )) + elif event.membership == Membership.LEAVE: + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + room_sync = yield self.initial_sync_for_archived_room( + sync_config=sync_config, + room_id=event.room_id, + leave_event_id=event.event_id, + leave_token=leave_token, + ) + archived.append(room_sync) defer.returnValue(SyncResult( presence=presence, joined=joined, invited=invited, + archived=archived, next_batch=now_token, )) @@ -204,6 +238,28 @@ def initial_sync_for_joined_room(self, room_id, sync_config, now_token): ephemeral=[], )) + @defer.inlineCallbacks + def initial_sync_for_archived_room(self, room_id, sync_config, + leave_event_id, leave_token): + """Sync a room for a client which is starting without any state + Returns: + A Deferred JoinedSyncResult. + """ + + batch = yield self.load_filtered_recents( + room_id, sync_config, leave_token, + ) + + leave_state = yield self.store.get_state_for_events( + [leave_event_id], None + ) + + defer.returnValue(ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=leave_state[leave_event_id].values(), + )) + @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, since_token): """ Get the incremental delta needed to bring the client up to @@ -257,18 +313,22 @@ def incremental_sync_with_gap(self, sync_config, since_token): ) joined = [] + archived = [] if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. invite_events = [] + leave_events = [] events_by_room_id = {} for event in room_events: events_by_room_id.setdefault(event.room_id, []).append(event) if event.room_id not in joined_room_ids: if (event.type == EventTypes.Member - and event.membership == Membership.INVITE and event.state_key == sync_config.user.to_string()): - invite_events.append(event) + if event.membership == Membership.INVITE: + invite_events.append(event) + elif event.membership == Membership.LEAVE: + leave_events.append(event) for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) @@ -296,11 +356,16 @@ def incremental_sync_with_gap(self, sync_config, since_token): ) if room_sync: joined.append(room_sync) + else: invite_events = yield self.store.get_invites_for_user( sync_config.user.to_string() ) + leave_events = yield self.store.get_leave_events_for_user( + sync_config.user.to_string() + ) + for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, @@ -309,6 +374,12 @@ def incremental_sync_with_gap(self, sync_config, since_token): if room_sync: joined.append(room_sync) + for leave_event in leave_events: + room_sync = yield self.incremental_sync_for_archived_room( + sync_config, leave_event, since_token + ) + archived.append(room_sync) + invited = [ InvitedSyncResult(room_id=event.room_id, invite=event) for event in invite_events @@ -318,6 +389,7 @@ def incremental_sync_with_gap(self, sync_config, since_token): presence=presence, joined=joined, invited=invited, + archived=archived, next_batch=now_token, )) @@ -416,6 +488,56 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config, defer.returnValue(room_sync) + @defer.inlineCallbacks + def incremental_sync_for_archived_room(self, sync_config, leave_event, + since_token): + """ Get the incremental delta needed to bring the client up to date for + the archived room. + Returns: + A Deferred ArchivedSyncResult + """ + + stream_token = yield self.store.get_stream_token_for_event( + leave_event.event_id + ) + + leave_token = since_token.copy_and_replace("room_key", stream_token) + + batch = yield self.load_filtered_recents( + leave_event.room_id, sync_config, leave_token, since_token, + ) + + logging.debug("Recents %r", batch) + + # TODO(mjark): This seems racy since this isn't being passed a + # token to indicate what point in the stream this is + leave_state = yield self.store.get_state_for_events( + [leave_event.event_id], None + ) + + state_events_at_leave = leave_state[leave_event.event_id].values() + + state_at_previous_sync = yield self.get_state_at_previous_sync( + leave_event.room_id, since_token=since_token + ) + + state_events_delta = yield self.compute_state_delta( + since_token=since_token, + previous_state=state_at_previous_sync, + current_state=state_events_at_leave, + ) + + room_sync = ArchivedSyncResult( + room_id=leave_event.room_id, + timeline=batch, + state=state_events_delta, + ) + + logging.debug("Room sync: %r", room_sync) + + defer.returnValue(room_sync) + + @defer.inlineCallbacks def get_state_at_previous_sync(self, room_id, since_token): """ Get the room state at the previous sync the client made. diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index fffecb24f593..73473a7e6b5c 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -136,6 +136,10 @@ def on_GET(self, request): sync_result.invited, filter, time_now, token_id ) + archived = self.encode_archived( + sync_result.archived, filter, time_now, token_id + ) + response_content = { "presence": self.encode_presence( sync_result.presence, filter, time_now @@ -143,7 +147,7 @@ def on_GET(self, request): "rooms": { "joined": joined, "invited": invited, - "archived": {}, + "archived": archived, }, "next_batch": sync_result.next_batch.to_string(), } @@ -182,14 +186,20 @@ def encode_invited(self, rooms, filter, time_now, token_id): return invited + def encode_archived(self, rooms, filter, time_now, token_id): + joined = {} + for room in rooms: + joined[room.room_id] = self.encode_room( + room, filter, time_now, token_id, joined=False + ) + + return joined + @staticmethod - def encode_room(room, filter, time_now, token_id): + def encode_room(room, filter, time_now, token_id, joined=True): event_map = {} state_events = filter.filter_room_state(room.state) - timeline_events = filter.filter_room_timeline(room.timeline.events) - ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) state_event_ids = [] - timeline_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( @@ -198,6 +208,8 @@ def encode_room(room, filter, time_now, token_id): ) state_event_ids.append(event.event_id) + timeline_events = filter.filter_room_timeline(room.timeline.events) + timeline_event_ids = [] for event in timeline_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( @@ -205,6 +217,7 @@ def encode_room(room, filter, time_now, token_id): event_format=format_event_for_client_v2_without_event_id, ) timeline_event_ids.append(event.event_id) + result = { "event_map": event_map, "timeline": { @@ -213,8 +226,12 @@ def encode_room(room, filter, time_now, token_id): "limited": room.timeline.limited, }, "state": {"events": state_event_ids}, - "ephemeral": {"events": ephemeral_events}, } + + if joined: + ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) + result["ephemeral"] = {"events": ephemeral_events} + return result diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index dd98dcfda896..623400fd36a2 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -124,6 +124,19 @@ def get_invites_for_user(self, user_id): invites.event_id for invite in invites ])) + def get_leave_events_for_user(self, user_id): + """ Get all the leave events for a user + Args: + user_id (str): The user ID. + Returns: + A deferred list of event objects. + """ + return self.get_rooms_for_user_where_membership_is( + user_id, [Membership.LEAVE] + ).addCallback(lambda leaves: self._get_events([ + leave.event_id for leave in leaves + ])) + def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user matches one in the membership list. From 51d03e65b2d481a59dfb08e6c75aa349fca71fe6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 19 Oct 2015 17:48:58 +0100 Subject: [PATCH 2/3] Fix pep8 --- synapse/handlers/sync.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1891cd088c22..5ca26064432d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -537,7 +537,6 @@ def incremental_sync_for_archived_room(self, sync_config, leave_event, defer.returnValue(room_sync) - @defer.inlineCallbacks def get_state_at_previous_sync(self, room_id, since_token): """ Get the room state at the previous sync the client made. From e3d75f564ab1d17eb4ee47314f78c41553a486f1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 21 Oct 2015 11:15:48 +0100 Subject: [PATCH 3/3] Include banned rooms in the archived section of v2 sync --- synapse/handlers/sync.py | 15 +++++++++------ synapse/storage/roommember.py | 4 ++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5ca26064432d..6cb756e47691 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -175,9 +175,12 @@ def initial_sync(self, sync_config): ) room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), - membership_list=[ - Membership.INVITE, Membership.JOIN, Membership.LEAVE - ] + membership_list=( + Membership.INVITE, + Membership.JOIN, + Membership.LEAVE, + Membership.BAN + ) ) joined = [] @@ -195,7 +198,7 @@ def initial_sync(self, sync_config): room_id=event.room_id, invite=invite, )) - elif event.membership == Membership.LEAVE: + elif event.membership in (Membership.LEAVE, Membership.BAN): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) @@ -327,7 +330,7 @@ def incremental_sync_with_gap(self, sync_config, since_token): and event.state_key == sync_config.user.to_string()): if event.membership == Membership.INVITE: invite_events.append(event) - elif event.membership == Membership.LEAVE: + elif event.membership in (Membership.LEAVE, Membership.BAN): leave_events.append(event) for room_id in joined_room_ids: @@ -362,7 +365,7 @@ def incremental_sync_with_gap(self, sync_config, since_token): sync_config.user.to_string() ) - leave_events = yield self.store.get_leave_events_for_user( + leave_events = yield self.store.get_leave_and_ban_events_for_user( sync_config.user.to_string() ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 623400fd36a2..ae1ad56d9abd 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -124,7 +124,7 @@ def get_invites_for_user(self, user_id): invites.event_id for invite in invites ])) - def get_leave_events_for_user(self, user_id): + def get_leave_and_ban_events_for_user(self, user_id): """ Get all the leave events for a user Args: user_id (str): The user ID. @@ -132,7 +132,7 @@ def get_leave_events_for_user(self, user_id): A deferred list of event objects. """ return self.get_rooms_for_user_where_membership_is( - user_id, [Membership.LEAVE] + user_id, (Membership.LEAVE, Membership.BAN) ).addCallback(lambda leaves: self._get_events([ leave.event_id for leave in leaves ]))