Skip to content

Commit

Permalink
Sliding Sync: Use stream_ordering based timeline pagination for inc…
Browse files Browse the repository at this point in the history
…remental sync (#17510)

Use `stream_ordering` based `timeline` pagination for incremental
`/sync` in Sliding Sync. Previously, we were always using a
`topological_ordering` but we should only be using that for historical
scenarios (initial `/sync`, newly joined, or haven't sent the room down
the connection before).

This is slightly different than what the [spec
suggests](https://spec.matrix.org/v1.10/client-server-api/#syncing)

> Events are ordered in this API according to the arrival time of the
event on the homeserver. This can conflict with other APIs which order
events based on their partial ordering in the event graph. This can
result in duplicate events being received (once per distinct API
called). Clients SHOULD de-duplicate events based on the event ID when
this happens.

But we've had a [discussion below in this
PR](#17510 (comment))
and this matches what Sync v2 already does and seems like it makes
sense. Created a spec issue
matrix-org/matrix-spec#1917 to clarify this.

Related issues:

 - matrix-org/matrix-spec#1917
 - matrix-org/matrix-spec#852
 - matrix-org/matrix-spec-proposals#4033
  • Loading branch information
MadLittleMods authored Aug 7, 2024
1 parent 30e9f6e commit 11db575
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 126 deletions.
1 change: 1 addition & 0 deletions changelog.d/17510.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix timeline ordering (using `stream_ordering` instead of topological ordering) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
6 changes: 4 additions & 2 deletions docs/development/room-dag-concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ incrementing integer, but backfilled events start with `stream_ordering=-1` and

---

- `/sync` returns things in the order they arrive at the server (`stream_ordering`).
- `/messages` (and `/backfill` in the federation API) return them in the order determined by the event graph `(topological_ordering, stream_ordering)`.
- Incremental `/sync?since=xxx` returns things in the order they arrive at the server
(`stream_ordering`).
- Initial `/sync`, `/messages` (and `/backfill` in the federation API) return them in
the order determined by the event graph `(topological_ordering, stream_ordering)`.

The general idea is that, if you're following a room in real-time (i.e.
`/sync`), you probably want to see the messages as they arrive at your server,
Expand Down
10 changes: 8 additions & 2 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,14 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
events, _ = await self._store.paginate_room_events(
room_id, from_key, to_key, limit=100, direction=Direction.FORWARDS
events, _ = (
await self._store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_key,
to_key=to_key,
limit=100,
direction=Direction.FORWARDS,
)
)
if not events:
break
Expand Down
32 changes: 18 additions & 14 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,13 +507,15 @@ async def get_messages(

# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
events, next_key = (
await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
)

if pagin_config.direction == Direction.BACKWARDS:
Expand Down Expand Up @@ -582,13 +584,15 @@ async def get_messages(
# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
events, next_key = (
await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
)
else:
# Otherwise, we can backfill in the background for eventual
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1750,7 +1750,7 @@ async def get_new_events(
from_key=from_key,
to_key=to_key,
limit=limit or 10,
order="ASC",
direction=Direction.FORWARDS,
)

events = list(room_events)
Expand Down
40 changes: 37 additions & 3 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@
ROOM_UNKNOWN_SENTINEL,
Sentinel as StateSentinel,
)
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.storage.databases.main.stream import (
CurrentStateDeltaMembership,
PaginateFunction,
)
from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
Expand Down Expand Up @@ -1863,10 +1866,13 @@ async def get_room_sync_data(
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
#
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - For an incremental sync where we haven't sent it down this
# connection before
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
from_bound = None
initial = True
if from_token and not room_membership_for_user_at_to_token.newly_joined:
Expand Down Expand Up @@ -1927,7 +1933,36 @@ async def get_room_sync_data(
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)

timeline_events, new_room_key = await self.store.paginate_room_events(
# For initial `/sync` (and other historical scenarios mentioned above), we
# want to view a historical section of the timeline; to fetch events by
# `topological_ordering` (best representation of the room DAG as others were
# seeing it at the time). This also aligns with the order that `/messages`
# returns events in.
#
# For incremental `/sync`, we want to get all updates for rooms since
# the last `/sync` (regardless if those updates arrived late or happened
# a while ago in the past); to fetch events by `stream_ordering` (in the
# order they were received by the server).
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
#
# FIXME: Using workaround for mypy,
# https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
# https://github.com/python/mypy/issues/17479
paginate_room_events_by_topological_ordering: PaginateFunction = (
self.store.paginate_room_events_by_topological_ordering
)
paginate_room_events_by_stream_ordering: PaginateFunction = (
self.store.paginate_room_events_by_stream_ordering
)
pagination_method: PaginateFunction = (
# Use `topographical_ordering` for historical events
paginate_room_events_by_topological_ordering
if from_bound is None
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
timeline_events, new_room_key = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
Expand All @@ -1938,7 +1973,6 @@ async def get_room_sync_data(
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
event_filter=None,
)

# We want to return the events in ascending order (the last event is the
Expand Down
69 changes: 51 additions & 18 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

from synapse.api.constants import (
AccountDataTypes,
Direction,
EventContentFields,
EventTypes,
JoinRules,
Expand All @@ -64,6 +65,7 @@
)
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import PaginateFunction
from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
Expand Down Expand Up @@ -879,22 +881,49 @@ async def _load_filtered_recents(
since_key = since_token.room_key

while limited and len(recents) < timeline_limit and max_repeat:
# If we have a since_key then we are trying to get any events
# that have happened since `since_key` up to `end_key`, so we
# can just use `get_room_events_stream_for_room`.
# Otherwise, we want to return the last N events in the room
# in topological ordering.
if since_key:
events, end_key = await self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
)
else:
events, end_key = await self.store.get_recent_events_for_room(
room_id, limit=load_limit + 1, end_token=end_key
)
# For initial `/sync`, we want to view a historical section of the
# timeline; to fetch events by `topological_ordering` (best
# representation of the room DAG as others were seeing it at the time).
# This also aligns with the order that `/messages` returns events in.
#
# For incremental `/sync`, we want to get all updates for rooms since
# the last `/sync` (regardless if those updates arrived late or happened
# a while ago in the past); to fetch events by `stream_ordering` (in the
# order they were received by the server).
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
#
# FIXME: Using workaround for mypy,
# https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
# https://github.com/python/mypy/issues/17479
paginate_room_events_by_topological_ordering: PaginateFunction = (
self.store.paginate_room_events_by_topological_ordering
)
paginate_room_events_by_stream_ordering: PaginateFunction = (
self.store.paginate_room_events_by_stream_ordering
)
pagination_method: PaginateFunction = (
# Use `topographical_ordering` for historical events
paginate_room_events_by_topological_ordering
if since_key is None
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
events, end_key = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=end_key,
to_key=since_key,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=load_limit + 1,
)
# We want to return the events in ascending order (the last event is the
# most recent).
events.reverse()

log_kv({"loaded_recents": len(events)})

Expand Down Expand Up @@ -2641,9 +2670,10 @@ async def _get_room_changes_for_incremental_sync(
# a "gap" in the timeline, as described by the spec for /sync.
room_to_events = await self.store.get_room_events_stream_for_rooms(
room_ids=sync_result_builder.joined_room_ids,
from_key=since_token.room_key,
to_key=now_token.room_key,
from_key=now_token.room_key,
to_key=since_token.room_key,
limit=timeline_limit + 1,
direction=Direction.BACKWARDS,
)

# We loop through all room ids, even if there are no new events, in case
Expand All @@ -2654,6 +2684,9 @@ async def _get_room_changes_for_incremental_sync(
newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
# We want to return the events in ascending order (the last event is the
# most recent).
events.reverse()

prev_batch_token = now_token.copy_and_replace(
StreamKeyType.ROOM, start_key
Expand Down
Loading

0 comments on commit 11db575

Please sign in to comment.