Skip to content

Commit

Permalink
Fix bug in calculating state for non-gappy syncs
Browse files Browse the repository at this point in the history
Unfortunately, the optimisation we applied here for non-gappy syncs is not
actually valid.
  • Loading branch information
richvdh committed Feb 21, 2024
1 parent a50a7f7 commit 96fed6b
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 54 deletions.
1 change: 1 addition & 0 deletions changelog.d/16942.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
91 changes: 37 additions & 54 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,8 @@ async def _compute_state_delta_for_full_sync(
)

if batch:
# Strictly speaking, this returns the state *after* the first event in the
# timeline, but that is good enough here.
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
Expand Down Expand Up @@ -1247,25 +1249,25 @@ async def _compute_state_delta_for_incremental_sync(
await_full_state = True
lazy_load_members = False

if batch.limited:
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id,
stream_position=end_token,
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

if batch.limited:
# for now, we disable LL for gappy syncs - see
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
# N.B. this slows down incr syncs as we are now processing way
Expand All @@ -1280,47 +1282,28 @@ async def _compute_state_delta_for_incremental_sync(
# about them).
state_filter = StateFilter.all()

state_at_previous_sync = await self.get_state_at(
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_at_previous_sync = await self.get_state_at(
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
timeline_end=state_at_timeline_end,
previous_timeline_end=state_at_previous_sync,
lazy_load_members=lazy_load_members,
)

state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
timeline_end=state_at_timeline_end,
previous_timeline_end=state_at_previous_sync,
lazy_load_members=lazy_load_members,
)
else:
state_ids = {}
if lazy_load_members:
if members_to_fetch and batch.events:
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
# But we're lazy-loading, so the client might need some more
# member events to understand the events in this timeline.
# So we fish out all the member events corresponding to the
# timeline here. The caller will then dedupe any redundant ones.

state_ids = await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
(EventTypes.Member, member) for member in members_to_fetch
),
await_full_state=False,
)
return state_ids

async def _find_missing_partial_state_memberships(
Expand Down
105 changes: 105 additions & 0 deletions tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,111 @@ def test_state_includes_changes_on_forks_when_events_excluded(self) -> None:
[s2_event],
)

def test_state_includes_changes_on_ungappy_syncs(self) -> None:
"""Test `state` where the sync is not gappy.
We start with a DAG like this:
E1
↗ ↖
| S2
|
--|---
|
E3
... and initialsync with `limit=1`, represented by the horizontal dashed line.
At this point, we do not expect S2 to appear in the response at all (since
it is excluded from the timeline by the `limit`, and the state is based on the
state after the most recent event before the sync token (E3), which doesn't
include S2.
Now more events arrive, and we do an incremental sync:
E1
↗ ↖
| S2
| ↑
E3 |
↑ |
--|------|----
| |
E4 |
↖ /
E5
This is the last chance for us to tell the client about S2, so it *must* be
included in the response.
"""
alice = self.register_user("alice", "password")
alice_tok = self.login(alice, "password")
alice_requester = create_requester(alice)
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)

# Do an initial sync to get a known starting point.
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester, generate_sync_config(alice)
)
)
last_room_creation_event_id = (
initial_sync_result.joined[0].timeline.events[-1].event_id
)

# Send a state event, and a regular event, both using the same prev ID
with self._patch_get_latest_events([last_room_creation_event_id]):
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
"event_id"
]
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]

# Another initial sync, with limit=1
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs, {"room": {"timeline": {"limit": 1}}}
),
),
)
)
room_sync = initial_sync_result.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e3_event],
)
self.assertNotIn(s2_event, [e.event_id for e in room_sync.state.values()])

# More events, E4 and E5
with self._patch_get_latest_events([e3_event]):
e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
e5_event = self.helper.send(room_id, "e5", tok=alice_tok)["event_id"]

# Now incremental sync
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice),
since_token=initial_sync_result.next_batch,
)
)

# The state event should appear in the 'state' section of the response.
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertFalse(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e4_event, e5_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[s2_event],
)

@parameterized.expand(
[
(False, False),
Expand Down

0 comments on commit 96fed6b

Please sign in to comment.