Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Optimise calculating device_list changes in /sync. (#11974)
Browse files Browse the repository at this point in the history
For users with large accounts it is inefficient to calculate the set of
users they share a room with (and takes a lot of space in the cache).
Instead we can look at users whose devices have changed since the last
sync and check if they share a room with the syncing user.
  • Loading branch information
erikjohnston authored Feb 15, 2022
1 parent bab2394 commit 2b5643b
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 15 deletions.
1 change: 1 addition & 0 deletions changelog.d/11974.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise calculating device_list changes in `/sync`.
68 changes: 53 additions & 15 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1289,23 +1289,54 @@ async def _generate_sync_entry_for_device_list(
# room with by looking at all users that have left a room plus users
# that were in a room we've left.

users_who_share_room = await self.store.get_users_who_share_room_with_user(
user_id
)

# Always tell the user about their own devices. We check as the user
# ID is almost certainly already included (unless they're not in any
# rooms) and taking a copy of the set is relatively expensive.
if user_id not in users_who_share_room:
users_who_share_room = set(users_who_share_room)
users_who_share_room.add(user_id)
users_that_have_changed = set()

tracked_users = users_who_share_room
joined_rooms = sync_result_builder.joined_room_ids

# Step 1a, check for changes in devices of users we share a room with
users_that_have_changed = await self.store.get_users_whose_devices_changed(
since_token.device_list_key, tracked_users
# Step 1a, check for changes in devices of users we share a room
# with
#
# We do this in two different ways depending on what we have cached.
# If we already have a list of all the user that have changed since
# the last sync then it's likely more efficient to compare the rooms
# they're in with the rooms the syncing user is in.
#
# If we don't have that info cached then we get all the users that
# share a room with our user and check if those users have changed.
changed_users = self.store.get_cached_device_list_changes(
since_token.device_list_key
)
if changed_users is not None:
result = await self.store.get_rooms_for_users_with_stream_ordering(
changed_users
)

for changed_user_id, entries in result.items():
# Check if the changed user shares any rooms with the user,
# or if the changed user is the syncing user (as we always
# want to include device list updates of their own devices).
if user_id == changed_user_id or any(
e.room_id in joined_rooms for e in entries
):
users_that_have_changed.add(changed_user_id)
else:
users_who_share_room = (
await self.store.get_users_who_share_room_with_user(user_id)
)

# Always tell the user about their own devices. We check as the user
# ID is almost certainly already included (unless they're not in any
# rooms) and taking a copy of the set is relatively expensive.
if user_id not in users_who_share_room:
users_who_share_room = set(users_who_share_room)
users_who_share_room.add(user_id)

tracked_users = users_who_share_room
users_that_have_changed = (
await self.store.get_users_whose_devices_changed(
since_token.device_list_key, tracked_users
)
)

# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
Expand All @@ -1329,7 +1360,14 @@ async def _generate_sync_entry_for_device_list(
newly_left_users.update(left_users)

# Remove any users that we still share a room with.
newly_left_users -= users_who_share_room
left_users_rooms = (
await self.store.get_rooms_for_users_with_stream_ordering(
newly_left_users
)
)
for user_id, entries in left_users_rooms.items():
if any(e.room_id in joined_rooms for e in entries):
newly_left_users.discard(user_id)

return DeviceLists(changed=users_that_have_changed, left=newly_left_users)
else:
Expand Down
10 changes: 10 additions & 0 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,16 @@ async def get_cached_devices_for_user(self, user_id: str) -> Dict[str, JsonDict]
device["device_id"]: db_to_json(device["content"]) for device in devices
}

def get_cached_device_list_changes(
self,
from_key: int,
) -> Optional[Set[str]]:
"""Get set of users whose devices have changed since `from_key`, or None
if that information is not in our cache.
"""

return self._device_list_stream_cache.get_all_entities_changed(from_key)

async def get_users_whose_devices_changed(
self, from_key: int, user_ids: Iterable[str]
) -> Set[str]:
Expand Down
62 changes: 62 additions & 0 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,68 @@ def _get_rooms_for_user_with_stream_ordering_txn(
for room_id, instance, stream_id in txn
)

@cachedList(
cached_method_name="get_rooms_for_user_with_stream_ordering",
list_name="user_ids",
)
async def get_rooms_for_users_with_stream_ordering(
self, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
"""A batched version of `get_rooms_for_user_with_stream_ordering`.
Returns:
Map from user_id to set of rooms that is currently in.
"""
return await self.db_pool.runInteraction(
"get_rooms_for_users_with_stream_ordering",
self._get_rooms_for_users_with_stream_ordering_txn,
user_ids,
)

def _get_rooms_for_users_with_stream_ordering_txn(
self, txn, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:

clause, args = make_in_list_sql_clause(
self.database_engine,
"c.state_key",
user_ids,
)

if self._current_state_events_membership_up_to_date:
sql = f"""
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.membership = ?
AND {clause}
"""
else:
sql = f"""
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN room_memberships AS m USING (room_id, event_id)
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND m.membership = ?
AND {clause}
"""

txn.execute(sql, [Membership.JOIN] + args)

result = {user_id: set() for user_id in user_ids}
for user_id, room_id, instance, stream_id in txn:
result[user_id].add(
GetRoomsForUserWithStreamOrdering(
room_id, PersistedEventPosition(instance, stream_id)
)
)

return {user_id: frozenset(v) for user_id, v in result.items()}

async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
) -> Set[str]:
Expand Down

0 comments on commit 2b5643b

Please sign in to comment.