-
Notifications
You must be signed in to change notification settings - Fork 212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sliding Sync: Handle timeline limit changes (take 2) #17579
Changes from 14 commits
da5339d
baac6c5
0561c86
c15b8b3
a1b75f7
6b9d244
55feaae
614c0d7
100927d
70d32fb
ee6efa2
009af0e
b23231e
aea946b
33ec15b
768d150
a63261d
891ce47
a4ad443
0e8feed
49c4645
299ab1b
ba4e63b
2bba63e
52f4253
09538c2
733555b
76f882a
bcaf4e6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Handle changes in `timeline_limit` in experimental sliding sync. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We will need this same room sync config tracking for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possible edge case: ElementX triggers There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is fine. There are two cases depending on if the timeline limit in the morning is small or high: For the small case:
For the high case:
What EX at least wants is to quickly be able to get enough chunks of history in rooms (in the background) to be able to show a screens worth of data. That way the UX is open the app, see a fast sync, (in the background it preloads the top N rooms with more timeline), the user clicks on one of the rooms and sees a page of timeline, and then the app can paginate in more timeline as usual (via |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -787,7 +787,19 @@ async def current_sync_for_user( | |
# subscription and have updates we need to send (i.e. either because | ||
# we haven't sent the room down, or we have but there are missing | ||
# updates). | ||
for room_id in relevant_room_map: | ||
for room_id, room_config in relevant_room_map.items(): | ||
prev_room_sync_config = ( | ||
previous_connection_state.previous_room_configs.get(room_id) | ||
) | ||
if prev_room_sync_config is not None: | ||
# Always include rooms whose timeline limit has increased. | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if ( | ||
prev_room_sync_config.timeline_limit | ||
< room_config.timeline_limit | ||
): | ||
rooms_should_send.add(room_id) | ||
continue | ||
|
||
status = previous_connection_state.rooms.have_sent_room(room_id) | ||
if ( | ||
# The room was never sent down before so the client needs to know | ||
|
@@ -819,12 +831,17 @@ async def current_sync_for_user( | |
if room_id in rooms_should_send | ||
} | ||
|
||
new_connection_state = previous_connection_state.get_mutable() | ||
|
||
@trace | ||
@tag_args | ||
async def handle_room(room_id: str) -> None: | ||
set_tag("room_id", room_id) | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
room_sync_result = await self.get_room_sync_data( | ||
sync_config=sync_config, | ||
previous_connection_state=previous_connection_state, | ||
new_connection_state=new_connection_state, | ||
room_id=room_id, | ||
room_sync_config=relevant_rooms_to_send_map[room_id], | ||
room_membership_for_user_at_to_token=room_membership_for_user_map[ | ||
|
@@ -842,8 +859,6 @@ async def handle_room(room_id: str) -> None: | |
with start_active_span("sliding_sync.generate_room_entries"): | ||
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10) | ||
|
||
new_connection_state = previous_connection_state.get_mutable() | ||
|
||
extensions = await self.get_extensions_response( | ||
sync_config=sync_config, | ||
actual_lists=lists, | ||
|
@@ -1955,6 +1970,7 @@ async def get_room_sync_data( | |
self, | ||
sync_config: SlidingSyncConfig, | ||
previous_connection_state: "PerConnectionState", | ||
new_connection_state: "MutablePerConnectionState", | ||
room_id: str, | ||
room_sync_config: RoomSyncConfig, | ||
room_membership_for_user_at_to_token: _RoomMembershipForUser, | ||
|
@@ -1999,8 +2015,15 @@ async def get_room_sync_data( | |
# connection before | ||
# | ||
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 | ||
# | ||
# We also need to check if the timeline limit has increased, if so we ignore | ||
# the from bound for the timeline to send down a larger chunk of | ||
# history. | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# | ||
# TODO: Also handle changes to `required_state` | ||
from_bound = None | ||
initial = True | ||
ignore_timeline_bound = False | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if from_token and not room_membership_for_user_at_to_token.newly_joined: | ||
room_status = previous_connection_state.rooms.have_sent_room(room_id) | ||
if room_status.status == HaveSentRoomFlag.LIVE: | ||
|
@@ -2018,7 +2041,39 @@ async def get_room_sync_data( | |
|
||
log_kv({"sliding_sync.room_status": room_status}) | ||
|
||
log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial}) | ||
prev_room_sync_config = previous_connection_state.previous_room_configs.get( | ||
room_id | ||
) | ||
if prev_room_sync_config is not None: | ||
# Check if the timeline limit has increased, if so ignore the | ||
# timeline bound and record the change. | ||
if ( | ||
prev_room_sync_config.timeline_limit | ||
< room_sync_config.timeline_limit | ||
): | ||
ignore_timeline_bound = True | ||
new_connection_state.previous_room_configs[room_id] = ( | ||
room_sync_config | ||
) | ||
|
||
if ( | ||
room_status.status != HaveSentRoomFlag.LIVE | ||
and prev_room_sync_config.timeline_limit | ||
> room_sync_config.timeline_limit | ||
): | ||
new_connection_state.previous_room_configs[room_id] = ( | ||
room_sync_config | ||
) | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# TODO: Record changes in required_state. | ||
|
||
log_kv( | ||
{ | ||
"sliding_sync.from_bound": from_bound, | ||
"sliding_sync.initial": initial, | ||
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound, | ||
} | ||
) | ||
|
||
# Assemble the list of timeline events | ||
# | ||
|
@@ -2055,6 +2110,10 @@ async def get_room_sync_data( | |
room_membership_for_user_at_to_token.event_pos.to_room_stream_token() | ||
) | ||
|
||
timeline_from_bound = from_bound | ||
if ignore_timeline_bound: | ||
timeline_from_bound = None | ||
|
||
# For initial `/sync` (and other historical scenarios mentioned above), we | ||
# want to view a historical section of the timeline; to fetch events by | ||
# `topological_ordering` (best representation of the room DAG as others were | ||
|
@@ -2080,7 +2139,7 @@ async def get_room_sync_data( | |
pagination_method: PaginateFunction = ( | ||
# Use `topographical_ordering` for historical events | ||
paginate_room_events_by_topological_ordering | ||
if from_bound is None | ||
if timeline_from_bound is None | ||
# Use `stream_ordering` for updates | ||
else paginate_room_events_by_stream_ordering | ||
) | ||
|
@@ -2090,7 +2149,7 @@ async def get_room_sync_data( | |
# (from newer to older events) starting at to_bound. | ||
# This ensures we fill the `limit` with the newest events first, | ||
from_key=to_bound, | ||
to_key=from_bound, | ||
to_key=timeline_from_bound, | ||
direction=Direction.BACKWARDS, | ||
# We add one so we can determine if there are enough events to saturate | ||
# the limit or not (see `limited`) | ||
|
@@ -2448,6 +2507,47 @@ async def get_room_sync_data( | |
if new_bump_event_pos.stream > 0: | ||
bump_stamp = new_bump_event_pos.stream | ||
|
||
prev_room_sync_config = previous_connection_state.previous_room_configs.get( | ||
room_id | ||
) | ||
if ignore_timeline_bound: | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# FIXME: We signal the fact that we're sending down more events to | ||
# the client by setting `initial=true` *without* sending down all | ||
# the state/metadata again, which is what the proxy does. We should | ||
# update the protocol to do something less silly. | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
initial = True | ||
|
||
new_connection_state.previous_room_configs[room_id] = RoomSyncConfig( | ||
timeline_limit=len(timeline_events), | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
required_state_map=room_sync_config.required_state_map, | ||
) | ||
elif prev_room_sync_config is not None: | ||
# If the result isn't limited then we don't need to record that the | ||
# timeline_limit has been reduced, as the *effective* timeline limit | ||
# (i.e. the amount of timeline we have previously sent) is at least | ||
# the previous timeline limit. | ||
# | ||
# This is to handle the case where the timeline limit e.g. goes from | ||
# 10 to 5 to 10 again (without any timeline gaps), where there's no | ||
# point sending down extra events when the timeline limit is | ||
# increased as the client already has the 10 previous events. | ||
# However, if is a gap (i.e. limited is True), then we *do* need to | ||
# record the reduced timeline. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Overall, I just don't get it. Related to the comment above
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, looks like I moved the code and then just didn't remove it from the old place. I think the example explains the motivating case we're trying to handle? Another way of putting it is that if the response isn't limited, then we know we have sent down at least the previous There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After talking with @erikjohnston and some more thinking, I think I understand the optimization here. The goal of timeline trickling/ If the response is I'm not sure this explanation would have made it more clear from the beginning. |
||
if ( | ||
limited | ||
and prev_room_sync_config.timeline_limit | ||
> room_sync_config.timeline_limit | ||
): | ||
new_connection_state.previous_room_configs[room_id] = RoomSyncConfig( | ||
timeline_limit=len(timeline_events), | ||
required_state_map=room_sync_config.required_state_map, | ||
) | ||
|
||
# TODO: Record changes in required_state. | ||
|
||
else: | ||
new_connection_state.previous_room_configs[room_id] = room_sync_config | ||
|
||
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) | ||
|
||
return SlidingSyncResult.RoomResult( | ||
|
@@ -3267,11 +3367,25 @@ class PerConnectionState: | |
rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap) | ||
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap) | ||
|
||
previous_room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict) | ||
|
||
def get_mutable(self) -> "MutablePerConnectionState": | ||
"""Get a mutable copy of this state.""" | ||
previous_room_configs = cast( | ||
MutableMapping[str, RoomSyncConfig], self.previous_room_configs | ||
) | ||
|
||
return MutablePerConnectionState( | ||
rooms=self.rooms.get_mutable(), | ||
receipts=self.receipts.get_mutable(), | ||
previous_room_configs=ChainMap({}, previous_room_configs), | ||
) | ||
|
||
def copy(self) -> "PerConnectionState": | ||
return PerConnectionState( | ||
rooms=self.rooms.copy(), | ||
receipts=self.receipts.copy(), | ||
previous_room_configs=dict(self.previous_room_configs), | ||
) | ||
|
||
|
||
|
@@ -3282,8 +3396,18 @@ class MutablePerConnectionState(PerConnectionState): | |
rooms: MutableRoomStatusMap[RoomStreamToken] | ||
receipts: MutableRoomStatusMap[MultiWriterStreamToken] | ||
|
||
previous_room_configs: typing.ChainMap[str, RoomSyncConfig] | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def has_updates(self) -> bool: | ||
return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates()) | ||
return ( | ||
bool(self.rooms.get_updates()) | ||
or bool(self.receipts.get_updates()) | ||
or bool(self.get_room_config_updates()) | ||
) | ||
|
||
def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]: | ||
"""Get updates to the room sync config""" | ||
return self.previous_room_configs.maps[0] | ||
|
||
|
||
@attr.s(auto_attribs=True) | ||
|
@@ -3315,7 +3439,7 @@ class SlidingSyncConnectionStore: | |
to mapping of room ID to `HaveSentRoom`. | ||
""" | ||
|
||
# `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState` | ||
# `(user_id, conn_id)` -> `token` -> `PerConnectionState` | ||
_connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory( | ||
dict | ||
) | ||
|
@@ -3345,8 +3469,8 @@ async def get_per_connection_state( | |
|
||
connection_position = from_token.connection_position | ||
if connection_position == 0: | ||
# Initial sync (request without a `from_token`) starts at `0` so | ||
# there is no existing per-connection state | ||
# The '0' values is a special value to indicate there is no | ||
# per-connection state. | ||
return PerConnectionState() | ||
|
||
conn_key = self._get_connection_key(sync_config) | ||
|
@@ -3365,12 +3489,6 @@ async def record_new_state( | |
from_token: Optional[SlidingSyncStreamToken], | ||
new_connection_state: MutablePerConnectionState, | ||
) -> int: | ||
"""Record updated per-connection state, returning the connection | ||
position associated with the new state. | ||
|
||
If there are no changes to the state this may return the same token as | ||
the existing per-connection state. | ||
""" | ||
prev_connection_token = 0 | ||
if from_token is not None: | ||
prev_connection_token = from_token.connection_position | ||
|
@@ -3386,12 +3504,7 @@ async def record_new_state( | |
new_store_token = prev_connection_token + 1 | ||
sync_statuses.pop(new_store_token, None) | ||
|
||
# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s | ||
# don't grow forever. | ||
sync_statuses[new_store_token] = PerConnectionState( | ||
rooms=new_connection_state.rooms.copy(), | ||
receipts=new_connection_state.receipts.copy(), | ||
) | ||
sync_statuses[new_store_token] = new_connection_state.copy() | ||
|
||
return new_store_token | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This continues to feel horrible especially given new edge cases like this comment. Highly recommend we just update the client to use an initial sync request with
timeline_limit: 20
andrequired_state: []
(which allows us to avoid the extra bytes) to accomplish the exact same thing without introducing any of this bizarre behavior.Previous conversation for context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Talked with @erikjohnston a bit more and was trying to figure out why the initial sync request doesn't solve this completely. I basically asked the opposite question for how/why timeline trickling/
unstable_expanded_timeline
makes this easier. The following question unlocked a better understanding for one complication when trying to use the initial sync route.This is a valid point!
I'm still leaning towards the side of initial sync being possible to use (and better) and just requires some basic timeline stitching logic. ElementX might already have some stitching and event de-duplication logic to handle what the proxy was doing before that would also cover this case.
Since ElementX doesn't have offline support and throws away events, I think we could just do this:
For the timeline stitching logic, the client can store the latest event in timeline before our initial sync, then find that event in the initial sync
timeline
events and spread backwards from that point. That way, the ongoing sync loop can still append to the end of the timeline and continue seamlessly.So if we have a timeline
[103]
already on the client, we storelatest_event_id = 103
, do our initial sync which returns[100, 101, 102, 103, 104]
and we splice/spread in only[100, 101, 102, 103]
accordingly (drop any new events after thelatest_event_id
from the initial sync response). This makes it so that even if the ongoing sync loop sends104
before or after our initial sync does, it still appends like normal and everything is in seamless order.If there are so many new messages sent in the time between us storing the
latest_event_id
and the initial sync responding that we now have a gap, we can just throw away our initial sync events because we have enough events to fill up the timeline just from our normal ongoing sync loop.To be clear, the client doesn't need to be fancy about stitching:
If the client had more timeline like
[98, 99, 100, 101, 102, 103]
, we storelatest_event_id = 103
, we start the initial sync, our ongoing sync loop races us and returns104
which makes our timeline look like[98, 99, 100, 101, 102, 103, 104]
. Then our initial sync responds with[100, 101, 102, 103, 104]
, we find the103
spot in the response to slice at and place it at the103
spot in the client timeline leaving us with[100, 101, 102, 103, 104]
Pseudo code (maybe off-by-one errors):