Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Remove get rooms for user with stream ordering (#13991)
Browse files Browse the repository at this point in the history
By getting the joined rooms before the current token we avoid reading any
history to confirm a user *was* in a room. We can then use any membership
change events, which we already fetch during sync, to determine the final
list of joined room IDs.
  • Loading branch information
Fizzadar authored Oct 4, 2022
1 parent 2b6d41e commit 0506bb1
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 80 deletions.
1 change: 1 addition & 0 deletions changelog.d/13991.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise queries used to get a user's rooms during sync. Contributed by Nick @ Beeper (@fizzadar).
149 changes: 69 additions & 80 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1317,36 +1317,91 @@ async def generate_sync_result(
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
instance to signify that the sync calculation is complete.
"""

user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
# We no longer support AS users using /sync directly.
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()

# Note: we get the user's room list *before* we get the current token, this
# avoids checking back in history if rooms are joined after the token is fetched.
token_before_rooms = self.event_sources.get_current_token()
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))

# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})

# Since we fetched the user's room list before the token, there's a small window
# during which membership events may have been persisted, so we fetch these now
# and modify the joined room list for any changes between the get_rooms_for_user
# call and the get_current_token call.
membership_change_events = []
if since_token:
membership_change_events = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
)

mem_last_change_by_room_id: Dict[str, EventBase] = {}
for event in membership_change_events:
mem_last_change_by_room_id[event.room_id] = event

# For the latest membership event in each room found, add/remove the room ID
# from the joined room list accordingly. In this case we only care if the
# latest change is JOIN.

for room_id, event in mem_last_change_by_room_id.items():
assert event.internal_metadata.stream_ordering
if (
event.internal_metadata.stream_ordering
< token_before_rooms.room_key.stream
):
continue

logger.info(
"User membership change between getting rooms and current token: %s %s %s",
user_id,
event.membership,
room_id,
)
# User joined a room - we have to then check the room state to ensure we
# respect any bans if there's a race between the join and ban events.
if event.membership == Membership.JOIN:
user_ids_in_room = await self.store.get_users_in_room(room_id)
if user_id in user_ids_in_room:
mutable_joined_room_ids.add(room_id)
# The user left the room, or left and was re-invited but not joined yet
else:
mutable_joined_room_ids.discard(room_id)

# Now we have our list of joined room IDs, exclude as configured and freeze
joined_room_ids = frozenset(
(
room_id
for room_id in mutable_joined_room_ids
if room_id not in self.rooms_to_exclude
)
)

logger.debug(
"Calculating sync response for %r between %s and %s",
sync_config.user,
since_token,
now_token,
)

user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
# We no longer support AS users using /sync directly.
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
else:
joined_room_ids = await self.get_rooms_for_user_at(
user_id, now_token.room_key
)
sync_result_builder = SyncResultBuilder(
sync_config,
full_state,
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
membership_change_events=membership_change_events,
)

logger.debug("Fetching account data")
Expand Down Expand Up @@ -1827,19 +1882,12 @@ async def _have_rooms_changed(
Does not modify the `sync_result_builder`.
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
membership_change_events = sync_result_builder.membership_change_events

assert since_token

# Get a list of membership change events that have happened to the user
# requesting the sync.
membership_changes = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
)

if membership_changes:
if membership_change_events:
return True

stream_id = since_token.room_key.stream
Expand Down Expand Up @@ -1878,16 +1926,10 @@ async def _get_rooms_changed(
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
membership_change_events = sync_result_builder.membership_change_events

assert since_token

# TODO: we've already called this function and ran this query in
# _have_rooms_changed. We could keep the results in memory to avoid a
# second query, at the cost of more complicated source code.
membership_change_events = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
)

mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
Expand Down Expand Up @@ -2415,60 +2457,6 @@ async def _generate_room_entry(
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)

async def get_rooms_for_user_at(
    self,
    user_id: str,
    room_key: RoomStreamToken,
) -> FrozenSet[str]:
    """Get the set of rooms a user is joined to at the given room stream token.

    The token *must* be recent, otherwise this may throw an exception if it
    is older than a month. (This function is called with the current token,
    which should be perfectly fine).

    Rooms listed in `self.rooms_to_exclude` are never included in the result.

    Args:
        user_id: The user whose joined rooms are being looked up.
        room_key: The room stream token at which membership is evaluated.

    Returns:
        A frozenset of the room IDs the user is in at `room_key`.
    """
    joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)

    joined_room_ids = set()

    # We need to check that the stream ordering of the join for each room
    # is before the stream ordering asked for (`room_key`). This might not be
    # the case if the user joins a room between us getting the current token
    # and calling `get_rooms_for_user_with_stream_ordering`.
    # If the membership's stream ordering is after the given stream
    # ordering, we need to go and work out if the user was in the room
    # before.
    # We also need to check whether the room should be excluded from sync
    # responses as per the homeserver config.
    for joined_room in joined_rooms:
        # Excluded rooms are dropped before any further (potentially
        # expensive) checks are made.
        if joined_room.room_id in self.rooms_to_exclude:
            continue

        # The join was not persisted after `room_key`, so the user counts
        # as joined at that point and no further lookup is needed.
        if not joined_room.event_pos.persisted_after(room_key):
            joined_room_ids.add(joined_room.room_id)
            continue

        logger.info("User joined room after current token: %s", joined_room.room_id)

        # The join is newer than `room_key`: fetch the room's forward
        # extremities at the join's stream position and use them to look up
        # the users in the room, so we can tell whether this user is among
        # them.
        extrems = (
            await self.store.get_forward_extremities_for_room_at_stream_ordering(
                joined_room.room_id, joined_room.event_pos.stream
            )
        )
        user_ids_in_room = await self.state.get_current_user_ids_in_room(
            joined_room.room_id, extrems
        )
        if user_id in user_ids_in_room:
            joined_room_ids.add(joined_room.room_id)

    # Freeze the result so callers cannot mutate it.
    return frozenset(joined_room_ids)


def _action_has_highlight(actions: List[JsonDict]) -> bool:
for action in actions:
Expand Down Expand Up @@ -2565,6 +2553,7 @@ class SyncResultBuilder:
since_token: Optional[StreamToken]
now_token: StreamToken
joined_room_ids: FrozenSet[str]
membership_change_events: List[EventBase]

presence: List[UserPresenceState] = attr.Factory(list)
account_data: List[JsonDict] = attr.Factory(list)
Expand Down

0 comments on commit 0506bb1

Please sign in to comment.