Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Sync: Handle timeline limit changes (take 2) #17579

Merged
merged 29 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
da5339d
Migrate to per-connection state class
erikjohnston Aug 12, 2024
baac6c5
Record with new class
erikjohnston Aug 14, 2024
0561c86
Revamp
erikjohnston Aug 15, 2024
c15b8b3
WIP receipts reading
erikjohnston Aug 13, 2024
a1b75f7
WIP comments
erikjohnston Aug 14, 2024
6b9d244
Record state
erikjohnston Aug 15, 2024
55feaae
Add tests
erikjohnston Aug 15, 2024
614c0d7
Newsfile
erikjohnston Aug 15, 2024
100927d
Comments
erikjohnston Aug 15, 2024
70d32fb
Add proper DB function for getting receipts between things
erikjohnston Aug 15, 2024
ee6efa2
Track room configs in per-connection state
erikjohnston Aug 16, 2024
009af0e
Handle timeline_limit changes
erikjohnston Aug 16, 2024
b23231e
Newsfile
erikjohnston Aug 16, 2024
aea946b
Merge remote-tracking branch 'origin/develop' into erikj/ss_room_sub2
erikjohnston Aug 19, 2024
33ec15b
Restore comments
erikjohnston Aug 19, 2024
768d150
Add docstring
erikjohnston Aug 19, 2024
a63261d
Restore comments
erikjohnston Aug 19, 2024
891ce47
Rename previous_room_configs
erikjohnston Aug 19, 2024
a4ad443
Use test helpers
erikjohnston Aug 19, 2024
0e8feed
Remove spurious set_tag
erikjohnston Aug 19, 2024
49c4645
Remove double insertion
erikjohnston Aug 19, 2024
299ab1b
Use timelime_limit not len(timeline)
erikjohnston Aug 19, 2024
ba4e63b
Add comment explaining the odd behaviour
erikjohnston Aug 19, 2024
2bba63e
Replace initial=true with unstable_expanded_timeline=true
erikjohnston Aug 19, 2024
52f4253
Improve comment
erikjohnston Aug 19, 2024
09538c2
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
733555b
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
76f882a
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
bcaf4e6
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17579.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle changes in `timeline_limit` in experimental sliding sync.
Copy link
Contributor

@MadLittleMods MadLittleMods Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This continues to feel horrible especially given new edge cases like this comment. Highly recommend we just update the client to use an initial sync request with timeline_limit: 20 and required_state: [] (which allows us to avoid the extra bytes) to accomplish the exact same thing without introducing any of this bizarre behavior.

Previous conversation for context

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked with @erikjohnston a bit more and was trying to figure out why the initial sync request doesn't solve this completely. I basically asked the opposite question for how/why timeline trickling/unstable_expanded_timeline makes this easier. The following question unlocked a better understanding for one complication when trying to use the initial sync route.

How is the timeline stitching done for the timeline trickling? How is the problem easier when using that?

[...] its all coming down one connection, so you know you've got a consistent "current" timeline chunk that you'll get updates for (which you can then optionally stitch together with whatever timeline chunks the client currently has)

This is a valid point!

I'm still leaning towards the side of initial sync being possible to use (and better) and just requires some basic timeline stitching logic. ElementX might already have some stitching and event de-duplication logic to handle what the proxy was doing before that would also cover this case.

Since ElementX doesn't have offline support and throws away events, I think we could just do this:

For the timeline stitching logic, the client can store the latest event in timeline before our initial sync, then find that event in the initial sync timeline events and spread backwards from that point. That way, the ongoing sync loop can still append to the end of the timeline and continue seamlessly.

So if we have a timeline [103] already on the client, we store latest_event_id = 103, do our initial sync which returns [100, 101, 102, 103, 104] and we splice/spread in only [100, 101, 102, 103] accordingly (drop any new events after the latest_event_id from the initial sync response). This makes it so that even if the ongoing sync loop sends 104 before or after our initial sync does, it still appends like normal and everything is in seamless order.

If there are so many new messages sent in the time between us storing the latest_event_id and the initial sync responding that we now have a gap, we can just throw away our initial sync events because we have enough events to fill up the timeline just from our normal ongoing sync loop.


To be clear, the client doesn't need to be fancy about stitching:

If the client had more timeline like [98, 99, 100, 101, 102, 103], we store latest_event_id = 103, we start the initial sync, our ongoing sync loop races us and returns 104 which makes our timeline look like [98, 99, 100, 101, 102, 103, 104]. Then our initial sync responds with [100, 101, 102, 103, 104], we find the 103 spot in the response to slice at and place it at the 103 spot in the client timeline leaving us with [100, 101, 102, 103, 104]

Pseudo code (maybe off-by-one errors):

latest_event_id = 103

# do initial sync request

initial_sync_timeline = [100, 101, 102, 103, 104]
event_index_in_response = initial_sync_timeline.index(latest_event_id)
# Skip if we can't find the `latest_event_id` in the response.
# This means there have been so many messages sent between the time we initially
# made the initial sync and the response that this is no longer relevant.
# We already have enough events to fill up the timeline from the normal
# ongoing sync loop
if event_index_in_response is None:
	return

event_index_in_client_timeline = client_timeline.index(latest_event_id)
# Update the timeline
client_timeline = initial_sync_timeline[0:event_index_in_response] + client_timeline[event_index_in_client_timeline:-1]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need this same room sync config tracking for required_state (and probably filter/extension) changes so overall the concept isn't lost.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible edge case: ElementX triggers unstable_expanded_timeline by increasing the timeline_limit and gets a chunk of timeline and we record that higher timeline_limit. I go to bed and then I wake up (or just some period of time that new messages were sent in but I didn't have the app open). How does ElementX get timeline for all of the events in the gap? If it tries to trigger unstable_expanded_timeline again, it won't work because the last recorded timeline_limit is already just as high.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine. There are two cases depending on if the timeline limit in the morning is small or high:

For the small case:

  • When the client comes online it'll get a small bit of history with limited=true.
  • The client can then either backpaginate in the room, or do a room sub with a larger timeline

For the high case:

  • When the client comes online it'll get a large chunk of history with limited=true. This is (hopefully) enough to show a screens worth of data, which is what we want.
  • If the client wants more it can just backpaginate.

What EX at least wants is to quickly be able to get enough chunks of history in rooms (in the background) to be able to show a screens worth of data. That way the UX is open the app, see a fast sync, (in the background it preloads the top N rooms with more timeline), the user clicks on one of the rooms and sees a page of timeline, and then the app can paginate in more timeline as usual (via /messages).

152 changes: 139 additions & 13 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,20 @@ async def current_sync_for_user(
# subscription and have updates we need to send (i.e. either because
# we haven't sent the room down, or we have but there are missing
# updates).
for room_id in relevant_room_map:
for room_id, room_config in relevant_room_map.items():
prev_room_sync_config = previous_connection_state.room_configs.get(
room_id
)
if prev_room_sync_config is not None:
# Always include rooms whose timeline limit has increased.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# (see the "XXX: Odd behavior" described below)
if (
prev_room_sync_config.timeline_limit
< room_config.timeline_limit
):
rooms_should_send.add(room_id)
continue

status = previous_connection_state.rooms.have_sent_room(room_id)
if (
# The room was never sent down before so the client needs to know
Expand Down Expand Up @@ -819,12 +832,15 @@ async def current_sync_for_user(
if room_id in rooms_should_send
}

new_connection_state = previous_connection_state.get_mutable()

@trace
@tag_args
async def handle_room(room_id: str) -> None:
room_sync_result = await self.get_room_sync_data(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
room_id=room_id,
room_sync_config=relevant_rooms_to_send_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[
Expand All @@ -842,8 +858,6 @@ async def handle_room(room_id: str) -> None:
with start_active_span("sliding_sync.generate_room_entries"):
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)

new_connection_state = previous_connection_state.get_mutable()

extensions = await self.get_extensions_response(
sync_config=sync_config,
actual_lists=lists,
Expand Down Expand Up @@ -1955,6 +1969,7 @@ async def get_room_sync_data(
self,
sync_config: SlidingSyncConfig,
previous_connection_state: "PerConnectionState",
new_connection_state: "MutablePerConnectionState",
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
Expand Down Expand Up @@ -1998,9 +2013,27 @@ async def get_room_sync_data(
# - For an incremental sync where we haven't sent it down this
# connection before
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
# Relevant spec issue:
# https://github.com/matrix-org/matrix-spec/issues/1917
#
# XXX: Odd behavior - We also check if the `timeline_limit` has increased, if so
# we ignore the from bound for the timeline to send down a larger chunk of
# history and set `unstable_expanded_timeline` to true. This is only being added
# to match the behavior of the Sliding Sync proxy as we expect the ElementX
# client to feel a certain way and be able to trickle in a full page of timeline
# messages to fill up the screen. This is a bit different to the behavior of the
# Sliding Sync proxy (which sets initial=true, but then doesn't send down the
# full state again), but existing apps, e.g. ElementX, just need `limited` set.
# We don't explicitly set `limited` but this will be the case for any room that
# has more history than we're trying to pull out. Using
# `unstable_expanded_timeline` allows us to avoid contaminating what `initial`
# or `limited` mean for clients that interpret them correctly. In future this
# behavior is almost certainly going to change.
#
# TODO: Also handle changes to `required_state`
from_bound = None
initial = True
ignore_timeline_bound = False
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
Expand All @@ -2018,7 +2051,26 @@ async def get_room_sync_data(

log_kv({"sliding_sync.room_status": room_status})

log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
if prev_room_sync_config is not None:
# Check if the timeline limit has increased, if so ignore the
# timeline bound and record the change (see "XXX: Odd behavior"
# above).
if (
prev_room_sync_config.timeline_limit
< room_sync_config.timeline_limit
):
ignore_timeline_bound = True

# TODO: Check for changes in `required_state``

log_kv(
{
"sliding_sync.from_bound": from_bound,
"sliding_sync.initial": initial,
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
}
)

# Assemble the list of timeline events
#
Expand Down Expand Up @@ -2055,6 +2107,10 @@ async def get_room_sync_data(
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)

timeline_from_bound = from_bound
if ignore_timeline_bound:
timeline_from_bound = None

# For initial `/sync` (and other historical scenarios mentioned above), we
# want to view a historical section of the timeline; to fetch events by
# `topological_ordering` (best representation of the room DAG as others were
Expand All @@ -2080,7 +2136,7 @@ async def get_room_sync_data(
pagination_method: PaginateFunction = (
# Use `topographical_ordering` for historical events
paginate_room_events_by_topological_ordering
if from_bound is None
if timeline_from_bound is None
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
Expand All @@ -2090,7 +2146,7 @@ async def get_room_sync_data(
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
to_key=timeline_from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
Expand Down Expand Up @@ -2448,6 +2504,55 @@ async def get_room_sync_data(
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream

unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
# Record the `room_sync_config` if we're `ignore_timeline_bound` (which means
# that the `timeline_limit` has increased)
if ignore_timeline_bound:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# FIXME: We signal the fact that we're sending down more events to
# the client by setting `unstable_expanded_timeline` to true (see
# "XXX: Odd behavior" above).
unstable_expanded_timeline = True

new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_config.required_state_map,
)
elif prev_room_sync_config is not None:
# If the result is `limited` then we need to record that the
# `timeline_limit` has been reduced, as when/if the client later requests
# more timeline then we have more data to send.
#
# Otherwise (when not `limited`) we don't need to record that the
# `timeline_limit` has been reduced, as the *effective* `timeline_limit`
# (i.e. the amount of timeline we have previously sent to the client) is at
# least the previous `timeline_limit`.
#
# This is to handle the case where the `timeline_limit` e.g. goes from 10 to
# 5 to 10 again (without any timeline gaps), where there's no point sending
# down the initial historical chunk events when the `timeline_limit` is
# increased as the client already has the 10 previous events. However, if
# client has a gap in the timeline (i.e. `limited` is True), then we *do*
# need to record the reduced timeline.
#
# TODO: Handle timeline gaps (`get_timeline_gaps()`) - This is separate from
# the gaps we might see on the client because a response was `limited` we're
# talking about above.
if (
limited
and prev_room_sync_config.timeline_limit
> room_sync_config.timeline_limit
):
new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_config.required_state_map,
)

# TODO: Record changes in required_state.

else:
new_connection_state.room_configs[room_id] = room_sync_config

set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)

return SlidingSyncResult.RoomResult(
Expand All @@ -2462,6 +2567,7 @@ async def get_room_sync_data(
stripped_state=stripped_state,
prev_batch=prev_batch_token,
limited=limited,
unstable_expanded_timeline=unstable_expanded_timeline,
num_live=num_live,
bump_stamp=bump_stamp,
joined_count=room_membership_summary.get(
Expand Down Expand Up @@ -3262,16 +3368,30 @@ class PerConnectionState:
Attributes:
rooms: The status of each room for the events stream.
receipts: The status of each room for the receipts stream.
room_configs: Map from room_id to the `RoomSyncConfig` of all
rooms that we have previously sent down.
"""

rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)

room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)

def get_mutable(self) -> "MutablePerConnectionState":
"""Get a mutable copy of this state."""
room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs)

return MutablePerConnectionState(
rooms=self.rooms.get_mutable(),
receipts=self.receipts.get_mutable(),
room_configs=ChainMap({}, room_configs),
)

def copy(self) -> "PerConnectionState":
return PerConnectionState(
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
room_configs=dict(self.room_configs),
)


Expand All @@ -3282,8 +3402,18 @@ class MutablePerConnectionState(PerConnectionState):
rooms: MutableRoomStatusMap[RoomStreamToken]
receipts: MutableRoomStatusMap[MultiWriterStreamToken]

room_configs: typing.ChainMap[str, RoomSyncConfig]

def has_updates(self) -> bool:
return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates())
return (
bool(self.rooms.get_updates())
or bool(self.receipts.get_updates())
or bool(self.get_room_config_updates())
)

def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
"""Get updates to the room sync config"""
return self.room_configs.maps[0]


@attr.s(auto_attribs=True)
Expand Down Expand Up @@ -3367,7 +3497,6 @@ async def record_new_state(
) -> int:
"""Record updated per-connection state, returning the connection
position associated with the new state.

If there are no changes to the state this may return the same token as
the existing per-connection state.
"""
Expand All @@ -3388,10 +3517,7 @@ async def record_new_state(

# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
# don't grow forever.
sync_statuses[new_store_token] = PerConnectionState(
rooms=new_connection_state.rooms.copy(),
receipts=new_connection_state.receipts.copy(),
)
sync_statuses[new_store_token] = new_connection_state.copy()

return new_store_token

Expand Down
5 changes: 5 additions & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,11 @@ async def encode_rooms(
if room_result.initial:
serialized_rooms[room_id]["initial"] = room_result.initial

if room_result.unstable_expanded_timeline:
serialized_rooms[room_id][
"unstable_expanded_timeline"
] = room_result.unstable_expanded_timeline
Comment on lines +1047 to +1050
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better if clients provided the unstable_expanded_timeline option to opt-in to the odd behavior.

But providing unstable_expanded_timeline on the response is better than nothing; clients can at-least detect when they've triggered this behavior. And we also at-least have a chance of migrating away from this behavior.


# This will be omitted for invite/knock rooms with `stripped_state`
if (
room_result.required_state is not None
Expand Down
4 changes: 4 additions & 0 deletions synapse/types/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ class RoomResult:
their local state. When there is an update, servers MUST omit this flag
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
absence of this flag means 'false'.
unstable_expanded_timeline: Flag which is set if we're returning more historic
events due to the timeline limit having increased. See "XXX: Odd behavior"
comment ing `synapse.handlers.sliding_sync`.
required_state: The current state of the room
timeline: Latest events in the room. The last event is the most recent.
bundled_aggregations: A mapping of event ID to the bundled aggregations for
Expand Down Expand Up @@ -219,6 +222,7 @@ class StrippedHero:
heroes: Optional[List[StrippedHero]]
is_dm: bool
initial: bool
unstable_expanded_timeline: bool
# Should be empty for invite/knock rooms with `stripped_state`
required_state: List[EventBase]
# Should be empty for invite/knock rooms with `stripped_state`
Expand Down
Loading
Loading