Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Send down device list change notif when member leaves/rejoins room #2443

Merged
merged 7 commits into from
Sep 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ def get_user_ids_changed(self, user_id, from_token):
user_id (str)
from_token (StreamToken)
"""
now_token = yield self.hs.get_event_sources().get_current_token()

room_ids = yield self.store.get_rooms_for_user(user_id)

# First we check if any devices have changed
Expand All @@ -280,11 +282,24 @@ def get_user_ids_changed(self, user_id, from_token):
# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)

member_events = yield self.store.get_membership_changes_for_user(
user_id, from_token.room_key, now_token.room_key
)
rooms_changed.update(event.room_id for event in member_events)

stream_ordering = RoomStreamToken.parse_stream_token(
from_token.room_key).stream
from_token.room_key
).stream

possibly_changed = set(changed)
possibly_left_rooms = set()
for room_id in rooms_changed:
# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
if room_id not in room_ids:
possibly_left_rooms.add(room_id)
continue

# Fetch the current state at the time.
try:
event_ids = yield self.store.get_forward_extremeties_for_room(
Expand All @@ -307,9 +322,25 @@ def get_user_ids_changed(self, user_id, from_token):
possibly_changed.add(state_key)
continue

current_member_id = current_state_ids.get((EventTypes.Member, user_id))
if not current_member_id:
continue

# mapping from event_id -> state_dict
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)

# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
for state_dict in prev_state_ids.itervalues():
member_event = state_dict.get((EventTypes.Member, user_id), None)
if not member_event or member_event != current_member_id:
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
break

# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
Expand All @@ -320,19 +351,35 @@ def get_user_ids_changed(self, user_id, from_token):

# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
for state_dict in prev_state_ids.values():
for state_dict in prev_state_ids.itervalues():
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
possibly_changed.add(state_key)
if state_key == user_id:
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(room_id)
break

users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
if possibly_changed:
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)

# Take the intersection of the users whose devices may have changed
# and those that actually still share a room with the user
possibly_joined = possibly_changed & users_who_share_room
possibly_left = possibly_changed - users_who_share_room
else:
possibly_joined = []
possibly_left = []

# Take the intersection of the users whose devices may have changed
# and those that actually still share a room with the user
defer.returnValue(users_who_share_room & possibly_changed)
defer.returnValue({
"changed": list(possibly_joined),
"left": list(possibly_left),
})

@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
Expand Down
110 changes: 94 additions & 16 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ def __nonzero__(self):
return True


class DeviceLists(collections.namedtuple("DeviceLists", [
"changed", # list of user_ids whose devices may have changed
"left", # list of user_ids whose devices we no longer track
])):
__slots__ = []

def __nonzero__(self):
return bool(self.changed or self.left)


class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"presence", # List of presence events for the user.
Expand Down Expand Up @@ -535,7 +545,8 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
res = yield self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room
)
newly_joined_rooms, newly_joined_users = res
newly_joined_rooms, newly_joined_users, _, _ = res
_, _, newly_left_rooms, newly_left_users = res

block_all_presence_data = (
since_token is None and
Expand All @@ -549,7 +560,11 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
yield self._generate_sync_entry_for_to_device(sync_result_builder)

device_lists = yield self._generate_sync_entry_for_device_list(
sync_result_builder
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_users=newly_joined_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)

device_id = sync_config.device_id
Expand All @@ -574,24 +589,50 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):

@measure_func("_generate_sync_entry_for_device_list")
@defer.inlineCallbacks
def _generate_sync_entry_for_device_list(self, sync_result_builder):
def _generate_sync_entry_for_device_list(self, sync_result_builder,
newly_joined_rooms, newly_joined_users,
newly_left_rooms, newly_left_users):
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token

if since_token and since_token.device_list_key:
changed = yield self.store.get_user_whose_devices_changed(
since_token.device_list_key
)
if not changed:
defer.returnValue([])

# TODO: Be more clever than this, i.e. remove users who we already
# share a room with?
for room_id in newly_joined_rooms:
joined_users = yield self.state.get_current_user_in_room(room_id)
newly_joined_users.update(joined_users)

for room_id in newly_left_rooms:
left_users = yield self.state.get_current_user_in_room(room_id)
newly_left_users.update(left_users)

# TODO: Check that these users are actually new, i.e. either they
# weren't in the previous sync *or* they left and rejoined.
changed.update(newly_joined_users)

if not changed and not newly_left_users:
defer.returnValue(DeviceLists(
changed=[],
left=newly_left_users,
))

users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)

defer.returnValue(users_who_share_room & changed)
defer.returnValue(DeviceLists(
changed=users_who_share_room & changed,
left=set(newly_left_users) - users_who_share_room,
))
else:
defer.returnValue([])
defer.returnValue(DeviceLists(
changed=[],
left=[],
))

@defer.inlineCallbacks
def _generate_sync_entry_for_to_device(self, sync_result_builder):
Expand Down Expand Up @@ -755,8 +796,8 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro
account_data_by_room(dict): Dictionary of per room account data

Returns:
Deferred(tuple): Returns a 2-tuple of
`(newly_joined_rooms, newly_joined_users)`
Deferred(tuple): Returns a 4-tuple of
`(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
"""
user_id = sync_result_builder.sync_config.user.to_string()
block_all_room_ephemeral = (
Expand Down Expand Up @@ -787,7 +828,7 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro
)
if not tags_by_room:
logger.debug("no-oping sync")
defer.returnValue(([], []))
defer.returnValue(([], [], [], []))

ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id,
Expand All @@ -800,14 +841,15 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro

if since_token:
res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms = res
room_entries, invited, newly_joined_rooms, newly_left_rooms = res

tags_by_room = yield self.store.get_updated_tags(
user_id, since_token.account_data_key,
)
else:
res = yield self._get_all_rooms(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms = res
newly_left_rooms = []

tags_by_room = yield self.store.get_tags_for_user(user_id)

Expand All @@ -828,17 +870,30 @@ def handle_room_entries(room_entry):

# Now we want to get any newly joined users
newly_joined_users = set()
newly_left_users = set()
if since_token:
for joined_sync in sync_result_builder.joined:
it = itertools.chain(
joined_sync.timeline.events, joined_sync.state.values()
joined_sync.timeline.events, joined_sync.state.itervalues()
)
for event in it:
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
newly_joined_users.add(event.state_key)

defer.returnValue((newly_joined_rooms, newly_joined_users))
else:
prev_content = event.unsigned.get("prev_content", {})
prev_membership = prev_content.get("membership", None)
if prev_membership == Membership.JOIN:
newly_left_users.add(event.state_key)

newly_left_users -= newly_joined_users

defer.returnValue((
newly_joined_rooms,
newly_joined_users,
newly_left_rooms,
newly_left_users,
))

@defer.inlineCallbacks
def _have_rooms_changed(self, sync_result_builder):
Expand Down Expand Up @@ -908,15 +963,17 @@ def _get_rooms_changed(self, sync_result_builder, ignored_users):
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)

newly_joined_rooms = []
newly_left_rooms = []
room_entries = []
invited = []
for room_id, events in mem_change_events_by_room_id.items():
for room_id, events in mem_change_events_by_room_id.iteritems():
non_joins = [e for e in events if e.membership != Membership.JOIN]
has_join = len(non_joins) != len(events)

# We want to figure out if we joined the room at some point since
# the last sync (even if we have since left). This is to make sure
# we do send down the room, and with full state, where necessary
old_state_ids = None
if room_id in joined_room_ids or has_join:
old_state_ids = yield self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
Expand All @@ -934,6 +991,26 @@ def _get_rooms_changed(self, sync_result_builder, ignored_users):
if not non_joins:
continue

# Check if we have left the room. This can either be because we were
# joined before *or* that we since joined and then left.
if events[-1].membership != Membership.JOIN:
if has_join:
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
old_state_ids = yield self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id),
None,
)
old_mem_ev = None
if old_mem_ev_id:
old_mem_ev = yield self.store.get_event(
old_mem_ev_id, allow_none=True
)
if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
newly_left_rooms.append(room_id)

# Only bother if we're still currently invited
should_invite = non_joins[-1].membership == Membership.INVITE
if should_invite:
Expand Down Expand Up @@ -1011,7 +1088,7 @@ def _get_rooms_changed(self, sync_result_builder, ignored_users):
upto_token=since_token,
))

defer.returnValue((room_entries, invited, newly_joined_rooms))
defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))

@defer.inlineCallbacks
def _get_all_rooms(self, sync_result_builder, ignored_users):
Expand Down Expand Up @@ -1259,6 +1336,7 @@ def __init__(self, sync_config, full_state, since_token, now_token):
self.invited = []
self.archived = []
self.device = []
self.to_device = []


class RoomSyncResultBuilder(object):
Expand Down
6 changes: 2 additions & 4 deletions synapse/rest/client/v2_alpha/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,11 @@ def on_GET(self, request):

user_id = requester.user.to_string()

changed = yield self.device_handler.get_user_ids_changed(
results = yield self.device_handler.get_user_ids_changed(
user_id, from_token,
)

defer.returnValue((200, {
"changed": list(changed),
}))
defer.returnValue((200, results))


class OneTimeKeyServlet(RestServlet):
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ def encode_response(time_now, sync_result, access_token_id, filter):
"account_data": {"events": sync_result.account_data},
"to_device": {"events": sync_result.to_device},
"device_lists": {
"changed": list(sync_result.device_lists),
"changed": list(sync_result.device_lists.changed),
"left": list(sync_result.device_lists.left),
},
"presence": SyncRestServlet.encode_presence(
sync_result.presence, time_now
Expand Down