Skip to content

Commit

Permalink
Refactor SyncResultBuilder assembly to its own function (#17202)
Browse files Browse the repository at this point in the history
We will re-use `get_sync_result_builder(...)` in
#17167

Split out from #17167
  • Loading branch information
MadLittleMods authored May 16, 2024
1 parent fe07995 commit c856ae4
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 116 deletions.
1 change: 1 addition & 0 deletions changelog.d/17202.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `SyncResultBuilder` assembly to its own function.
264 changes: 148 additions & 116 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1569,12 +1569,158 @@ async def generate_sync_result(
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()

sync_result_builder = await self.get_sync_result_builder(
sync_config,
since_token,
full_state,
)

logger.debug(
"Calculating sync response for %r between %s and %s",
sync_config.user,
sync_result_builder.since_token,
sync_result_builder.now_token,
)

logger.debug("Fetching account data")

# Global account data is included if it is not filtered out.
if not sync_config.filter_collection.blocks_all_global_account_data():
await self._generate_sync_entry_for_account_data(sync_result_builder)

# Presence data is included if the server has it enabled and not filtered out.
include_presence_data = bool(
self.hs_config.server.presence_enabled
and not sync_config.filter_collection.blocks_all_presence()
)
# Device list updates are sent if a since token is provided.
include_device_list_updates = bool(since_token and since_token.device_list_key)

# If we do not care about the rooms or things which depend on the room
# data (namely presence and device list updates), then we can skip
# this process completely.
device_lists = DeviceListUpdates()
if (
not sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
or include_presence_data
or include_device_list_updates
):
logger.debug("Fetching room data")

# Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
# is used in calculate_user_changes below.
(
newly_joined_rooms,
newly_left_rooms,
) = await self._generate_sync_entry_for_rooms(sync_result_builder)

# Work out which users have joined or left rooms we're in. We use this
# to build the presence and device_list parts of the sync response in
# `_generate_sync_entry_for_presence` and
# `_generate_sync_entry_for_device_list` respectively.
if include_presence_data or include_device_list_updates:
# This uses the sync_result_builder.joined which is set in
# `_generate_sync_entry_for_rooms`, if that didn't find any joined
# rooms for some reason it is a no-op.
(
newly_joined_or_invited_or_knocked_users,
newly_left_users,
) = sync_result_builder.calculate_user_changes()

if include_presence_data:
logger.debug("Fetching presence data")
await self._generate_sync_entry_for_presence(
sync_result_builder,
newly_joined_rooms,
newly_joined_or_invited_or_knocked_users,
)

if include_device_list_updates:
device_lists = await self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)

logger.debug("Fetching to-device data")
await self._generate_sync_entry_for_to_device(sync_result_builder)

logger.debug("Fetching OTK data")
device_id = sync_config.device_id
one_time_keys_count: JsonMapping = {}
unused_fallback_key_types: List[str] = []
if device_id:
# TODO: We should have a way to let clients differentiate between the states of:
# * no change in OTK count since the provided since token
# * the server has zero OTKs left for this device
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
one_time_keys_count = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
unused_fallback_key_types = list(
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
)

num_events = 0

# debug for https://github.com/matrix-org/synapse/issues/9424
for joined_room in sync_result_builder.joined:
num_events += len(joined_room.timeline.events)

log_kv(
{
"joined_rooms_in_result": len(sync_result_builder.joined),
"events_in_result": num_events,
}
)

logger.debug("Sync response calculation complete")
return SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
joined=sync_result_builder.joined,
invited=sync_result_builder.invited,
knocked=sync_result_builder.knocked,
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
device_one_time_keys_count=one_time_keys_count,
device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)

async def get_sync_result_builder(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> "SyncResultBuilder":
"""
Assemble a `SyncResultBuilder` with all of the initial context to
start building up the sync response:
- Membership changes between the last sync and the current sync.
- Joined room IDs (minus any rooms to exclude).
- Rooms that became fully-stated/un-partial stated since the last sync.
Args:
sync_config: Config/info necessary to process the sync request.
since_token: The point in the stream to sync from.
full_state: Whether to return the full state for each room.
Returns:
`SyncResultBuilder` ready to start generating parts of the sync response.
"""
user_id = sync_config.user.to_string()

# Note: we get the users room list *before* we get the current token, this
# avoids checking back in history if rooms are joined after the token is fetched.
token_before_rooms = self.event_sources.get_current_token()
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))

# NB: The now_token gets changed by some of the generate_sync_* methods,
# NB: The `now_token` gets changed by some of the `generate_sync_*` methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
Expand Down Expand Up @@ -1675,13 +1821,6 @@ async def generate_sync_result(
if room_id not in mutable_rooms_to_exclude
)

logger.debug(
"Calculating sync response for %r between %s and %s",
sync_config.user,
since_token,
now_token,
)

sync_result_builder = SyncResultBuilder(
sync_config,
full_state,
Expand All @@ -1693,114 +1832,7 @@ async def generate_sync_result(
membership_change_events=membership_change_events,
)

logger.debug("Fetching account data")

# Global account data is included if it is not filtered out.
if not sync_config.filter_collection.blocks_all_global_account_data():
await self._generate_sync_entry_for_account_data(sync_result_builder)

# Presence data is included if the server has it enabled and not filtered out.
include_presence_data = bool(
self.hs_config.server.presence_enabled
and not sync_config.filter_collection.blocks_all_presence()
)
# Device list updates are sent if a since token is provided.
include_device_list_updates = bool(since_token and since_token.device_list_key)

# If we do not care about the rooms or things which depend on the room
# data (namely presence and device list updates), then we can skip
# this process completely.
device_lists = DeviceListUpdates()
if (
not sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
or include_presence_data
or include_device_list_updates
):
logger.debug("Fetching room data")

# Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
# is used in calculate_user_changes below.
(
newly_joined_rooms,
newly_left_rooms,
) = await self._generate_sync_entry_for_rooms(sync_result_builder)

# Work out which users have joined or left rooms we're in. We use this
# to build the presence and device_list parts of the sync response in
# `_generate_sync_entry_for_presence` and
# `_generate_sync_entry_for_device_list` respectively.
if include_presence_data or include_device_list_updates:
# This uses the sync_result_builder.joined which is set in
# `_generate_sync_entry_for_rooms`, if that didn't find any joined
# rooms for some reason it is a no-op.
(
newly_joined_or_invited_or_knocked_users,
newly_left_users,
) = sync_result_builder.calculate_user_changes()

if include_presence_data:
logger.debug("Fetching presence data")
await self._generate_sync_entry_for_presence(
sync_result_builder,
newly_joined_rooms,
newly_joined_or_invited_or_knocked_users,
)

if include_device_list_updates:
device_lists = await self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)

logger.debug("Fetching to-device data")
await self._generate_sync_entry_for_to_device(sync_result_builder)

logger.debug("Fetching OTK data")
device_id = sync_config.device_id
one_time_keys_count: JsonMapping = {}
unused_fallback_key_types: List[str] = []
if device_id:
# TODO: We should have a way to let clients differentiate between the states of:
# * no change in OTK count since the provided since token
# * the server has zero OTKs left for this device
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
one_time_keys_count = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
unused_fallback_key_types = list(
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
)

num_events = 0

# debug for https://github.com/matrix-org/synapse/issues/9424
for joined_room in sync_result_builder.joined:
num_events += len(joined_room.timeline.events)

log_kv(
{
"joined_rooms_in_result": len(sync_result_builder.joined),
"events_in_result": num_events,
}
)

logger.debug("Sync response calculation complete")
return SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
joined=sync_result_builder.joined,
invited=sync_result_builder.invited,
knocked=sync_result_builder.knocked,
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
device_one_time_keys_count=one_time_keys_count,
device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)
return sync_result_builder

@measure_func("_generate_sync_entry_for_device_list")
async def _generate_sync_entry_for_device_list(
Expand Down

0 comments on commit c856ae4

Please sign in to comment.