This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix missing sync events during historical batch imports #12319

Merged
Changes from 5 commits
1 change: 1 addition & 0 deletions changelog.d/12319.bugfix
@@ -0,0 +1 @@
+Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper.
9 changes: 4 additions & 5 deletions synapse/handlers/message.py
@@ -175,12 +175,11 @@ async def get_state_events(
         state_filter = state_filter or StateFilter.all()

My main concern is how this changes clients back paginating using a prev_batch token from a /sync timeline section. Those tokens are currently stream tokens, but /messages will return them in topological order with a topological next token. This change means that the first /messages will return events in stream order but will still return a topological next token, meaning that the next /messages will return events in a topological order.

-- @erikjohnston, #12319 (comment)

/messages could be updated to always return in topological_ordering regardless of what type of pagination token is given. That's what it was doing before this PR anyway.

I don't think that is a change we really want to make. My understanding is that that behaviour works correctly currently? Or does that also have issues with backfilled/historical events?

I think we have two assumptions that we should probably enforce:

  • /messages should always sort by topological_ordering regardless of pagination token given
  • /sync should always sort by stream_ordering regardless of pagination token given

This also aligns with what we have documented:

  • /sync returns things in the order they arrive at the server (stream_ordering).
  • /messages (and /backfill in the federation API) return them in the order determined by the event graph (topological_ordering, stream_ordering).

-- Room DAG concepts -> Depth and stream ordering
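
To make the two orderings concrete, here is a minimal sketch. The `Event` tuple, the helper names, and the sample values are hypothetical and are not Synapse's data model; the only thing taken from the discussion above is which sort key each endpoint is expected to use.

```python
from typing import List, NamedTuple


class Event(NamedTuple):
    event_id: str
    topological_ordering: int  # position (depth) in the room DAG
    stream_ordering: int  # order of arrival at this server


def sync_order(events: List[Event]) -> List[str]:
    """Order expected from /sync: arrival order only."""
    return [e.event_id for e in sorted(events, key=lambda e: e.stream_ordering)]


def messages_order(events: List[Event]) -> List[str]:
    """Order expected from /messages: DAG position, then arrival order as a tie-break."""
    ordered = sorted(events, key=lambda e: (e.topological_ordering, e.stream_ordering))
    return [e.event_id for e in ordered]


# Hypothetical room: "$late" sits early in the DAG (depth 2) but reached this
# server last (stream 4), for example because it was fetched after a rejoin.
EVENTS = [
    Event("$a", 1, 1),
    Event("$b", 3, 2),
    Event("$c", 4, 3),
    Event("$late", 2, 4),
]

print(sync_order(EVENTS))      # ['$a', '$b', '$c', '$late']
print(messages_order(EVENTS))  # ['$a', '$late', '$b', '$c']
```

The two endpoints agree until an event arrives out of DAG order, which is the rejoin/backfill situation this PR is about.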

Currently, /messages and /sync return events as expected, but not state.

There is a bug in how incremental /sync returns state which this PR aims to fix. The best way to understand the problem is probably reading #12281 (comment) - but basically when calculating what state to return, /sync is ordering events by topological_ordering even though it should be stream_ordering.


I initially assumed we would want flexibility in how these endpoints sorted events according to the type of pagination token given but it's sounding like we actually want to enforce a given sort according to the endpoint. In which case, we can revert to @Fizzadar's initial approach of plumbing an order_by argument which we can set.

@erikjohnston, Apr 8, 2022


(Sorry for not getting to this yesterday, it sat at about the second thing on my todo list all day :/)

Thanks for the clarification. There's the added bonus that /sync will return events in pagination order for the first batch of events for the room, i.e. in an initial sync or when a user joins the room. (The rationale being that it's equivalent to getting no events then immediately doing a back pagination). This is relevant as we use get_recent_events_for_room for that case as well.

I initially assumed we would want flexibility in how these endpoints sorted events according to the type of pagination token given but it's sounding like we actually want to enforce a given sort according to the endpoint. In which case, we can revert to @Fizzadar's initial approach of plumbing an order_by argument which we can set.

I wonder if we're just making this harder for ourselves by re-using the same query to do both /messages and when looking up the state for /sync? I think the query we want to run is simply:

SELECT event_id FROM events
WHERE room_id = ? AND stream_ordering <= ?
ORDER BY stream_ordering DESC
LIMIT 1

So it might be simpler to just have a separate get_last_event_in_room_before function that we call when getting the state instead? Rather than extending the already slightly horrific _paginate_room_events_txn function?
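
A rough sketch of what such a helper could look like, using an in-memory SQLite database. The function name comes from the suggestion above; the toy `events` schema, the sample rows, and the use of the `sqlite3` module are assumptions made for illustration and do not reflect Synapse's storage layer.

```python
import sqlite3
from typing import Optional


def get_last_event_in_room_before(
    conn: sqlite3.Connection, room_id: str, stream_ordering: int
) -> Optional[str]:
    """Return the ID of the latest event, by arrival order, at or before `stream_ordering`."""
    row = conn.execute(
        """
        SELECT event_id FROM events
        WHERE room_id = ? AND stream_ordering <= ?
        ORDER BY stream_ordering DESC
        LIMIT 1
        """,
        (room_id, stream_ordering),
    ).fetchone()
    return row[0] if row else None


# Toy schema with only the columns the query needs (an assumption for this sketch).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events ("
    "event_id TEXT, room_id TEXT, topological_ordering INTEGER, stream_ordering INTEGER)"
)
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?, ?)",
    [
        ("$a", "!room", 1, 1),
        ("$c", "!room", 4, 2),
        ("$late", "!room", 2, 3),  # early in the DAG, but arrived last
    ],
)

print(get_last_event_in_room_before(conn, "!room", 3))  # $late
print(get_last_event_in_room_before(conn, "!room", 2))  # $c
print(get_last_event_in_room_before(conn, "!room", 0))  # None
```

With the same bound of 3, ordering by topological_ordering DESC would pick `$c` instead of `$late`, which is the mismatch described in this thread.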


Entirely separately, and I'm not suggesting we necessarily do this now, but it occurs to me that we have a get_forward_extremities_for_room_at_stream_ordering store function that maps from room_id to the list of forward extremities in the room at the time. This can then be used to calculate the "current" state of the room at the time more accurately than the current method. The downside of this approach is that get_forward_extremities_for_room_at_stream_ordering will fail if the stream ordering is too old.

Another approach that has also literally just now occurred to me is that we could use the current_state_delta_stream to work out the changes in state between two stream orderings in the room, as the stream_id column corresponds to the stream orderings of the changes (I think?).
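
The delta idea can be sketched over plain in-memory rows. Everything here is hypothetical: the `StateDelta` shape, the convention that `event_id=None` means a state entry was removed, and the helper name are assumptions for illustration, and the sketch relies on the same unconfirmed premise as the comment, namely that `stream_id` lines up with stream orderings.

```python
from typing import Dict, List, NamedTuple, Optional, Tuple


class StateDelta(NamedTuple):
    stream_id: int
    room_id: str
    type: str
    state_key: str
    event_id: Optional[str]  # None is used here to mean "state entry removed"


def state_changes_between(
    deltas: List[StateDelta], room_id: str, from_stream: int, to_stream: int
) -> Dict[Tuple[str, str], Optional[str]]:
    """Collapse the deltas in (from_stream, to_stream] into the net change per state key."""
    changes: Dict[Tuple[str, str], Optional[str]] = {}
    for delta in sorted(deltas, key=lambda d: d.stream_id):
        if delta.room_id == room_id and from_stream < delta.stream_id <= to_stream:
            # Later deltas overwrite earlier ones for the same (type, state_key).
            changes[(delta.type, delta.state_key)] = delta.event_id
    return changes


DELTAS = [
    StateDelta(5, "!room", "m.room.name", "", "$name1"),
    StateDelta(7, "!room", "m.room.member", "@alice:example.org", "$join"),
    StateDelta(9, "!room", "m.room.name", "", "$name2"),
    StateDelta(12, "!room", "m.room.topic", "", "$topic"),
]

# Net state changes after stream position 5, up to and including position 10.
print(state_changes_between(DELTAS, "!room", 5, 10))
```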

Anyway, just wanted to record my thoughts here before I forget them.


@Fizzadar sorry for going around the houses really slowly on this 😞


No worries at all @erikjohnston - totally agree a separate function makes sense here; I've pushed that up in 9a78d14. I just wrapped an existing function get_room_event_before_stream_ordering (which doesn't actually return an event) and pulled out the event.

Have undone the pagination ordering changes too!


Thank you! Will have a look after I've finished doing the v1.57.0rc1 release ❤️


         if at_token:
-            # FIXME this claims to get the state at a stream position, but
-            # get_recent_events_for_room operates by topo ordering. This therefore
-            # does not reliably give you the state at the given stream position.
-            # (https://github.com/matrix-org/synapse/issues/3305)
             last_events, _ = await self.store.get_recent_events_for_room(
-                room_id, end_token=at_token.room_key, limit=1
+                room_id,
+                end_token=at_token.room_key,
+                limit=1,
+                order_by="stream",
             )

             if not last_events:
9 changes: 4 additions & 5 deletions synapse/handlers/sync.py
@@ -683,12 +683,11 @@ async def get_state_at(
             stream_position: point at which to get state
             state_filter: The state filter used to fetch state from the database.
         """
-        # FIXME this claims to get the state at a stream position, but
-        # get_recent_events_for_room operates by topo ordering. This therefore
-        # does not reliably give you the state at the given stream position.
-        # (https://github.com/matrix-org/synapse/issues/3305)
         last_events, _ = await self.store.get_recent_events_for_room(
-            room_id, end_token=stream_position.room_key, limit=1
+            room_id,
+            end_token=stream_position.room_key,
+            limit=1,
+            order_by="stream",
         )

We should add a test for #3305 so it doesn't regress.

I don't have clear reproduction steps for this one and maybe we can just consider the test we write for #12281 as a subset of this problem and only need one test 🤷


         if last_events:
42 changes: 36 additions & 6 deletions synapse/storage/databases/main/stream.py
@@ -652,22 +652,31 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
         return ret

     async def get_recent_events_for_room(
-        self, room_id: str, limit: int, end_token: RoomStreamToken
+        self,
+        room_id: str,
+        limit: int,
+        end_token: RoomStreamToken,
+        order_by: str = "topological",
     ) -> Tuple[List[EventBase], RoomStreamToken]:
         """Get the most recent events in the room in topological ordering.

         Args:
             room_id
             limit
             end_token: The stream token representing now.
+            order_by: Either 'topological' or 'stream' to indicate the order in
+                which results should be returned.

         Returns:
             A list of events and a token pointing to the start of the returned
             events. The events returned are in ascending topological order.
         """

         rows, token = await self.get_recent_event_ids_for_room(
-            room_id, limit, end_token
+            room_id,
+            limit,
+            end_token,
+            order_by,
         )

         events = await self.get_events_as_list(
@@ -679,14 +688,20 @@ async def get_recent_events_for_room(
         return events, token

     async def get_recent_event_ids_for_room(
-        self, room_id: str, limit: int, end_token: RoomStreamToken
+        self,
+        room_id: str,
+        limit: int,
+        end_token: RoomStreamToken,
+        order_by: str = "topological",
     ) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
         """Get the most recent events in the room in topological ordering.

         Args:
             room_id
             limit
             end_token: The stream token representing now.
+            order_by: Either 'topological' or 'stream' to indicate the order in
+                which results should be returned.

         Returns:
             A list of _EventDictReturn and a token pointing to the start of the
@@ -701,6 +716,7 @@ async def get_recent_event_ids_for_room(
             self._paginate_room_events_txn,
             room_id,
             from_token=end_token,
+            order_by=order_by,
             limit=limit,
         )

@@ -1099,6 +1115,7 @@ def _paginate_room_events_txn(
         from_token: RoomStreamToken,
         to_token: Optional[RoomStreamToken] = None,
         direction: str = "b",
+        order_by: str = "topological",
         limit: int = -1,
         event_filter: Optional[Filter] = None,
     ) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
@@ -1111,6 +1128,8 @@ def _paginate_room_events_txn(
             to_token: A token which if given limits the results to only those before
             direction: Either 'b' or 'f' to indicate whether we are paginating
                 forwards or backwards from `from_key`.
+            order_by: Either 'topological' or 'stream' to indicate the order in
+                which results should be returned.
             limit: The maximum number of events to return.
             event_filter: If provided filters the events to
                 those that match the filter.
@@ -1123,6 +1142,7 @@ def _paginate_room_events_txn(
         """

         assert int(limit) >= 0
+        assert order_by in ("topological", "stream")

         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
@@ -1133,6 +1153,12 @@ def _paginate_room_events_txn(
         else:
             order = "ASC"

+        order_clause = """ORDER BY event.topological_ordering %(order)s, event.stream_ordering %(order)s"""
+        if order_by == "stream":
+            order_clause = """ORDER BY event.stream_ordering %(order)s, event.topological_ordering %(order)s"""
+
+        order_clause = order_clause % {"order": order}
+
         # The bounds for the stream tokens are complicated by the fact
         # that we need to handle the instance_map part of the tokens. We do this
         # by fetching all events between the min stream token and the maximum
@@ -1228,13 +1254,13 @@ def _paginate_room_events_txn(
             FROM events AS event
             %(join_clause)s
             WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s
-            ORDER BY event.topological_ordering %(order)s,
-            event.stream_ordering %(order)s LIMIT ?
+            %(order_clause)s
+            LIMIT ?
         """ % {
             "select_keywords": select_keywords,
             "join_clause": join_clause,
             "bounds": bounds,
-            "order": order,
+            "order_clause": order_clause,
         }

         txn.execute(sql, args)
@@ -1275,6 +1301,7 @@ async def paginate_room_events(
         from_key: RoomStreamToken,
         to_key: Optional[RoomStreamToken] = None,
         direction: str = "b",
+        order_by: str = "topological",
         limit: int = -1,
         event_filter: Optional[Filter] = None,
     ) -> Tuple[List[EventBase], RoomStreamToken]:
@@ -1286,6 +1313,8 @@ async def paginate_room_events(
             to_key: A token which if given limits the results to only those before
             direction: Either 'b' or 'f' to indicate whether we are paginating
                 forwards or backwards from `from_key`.
+            order_by: Either 'topological' or 'stream' to indicate the order in
+                which results should be returned.
             limit: The maximum number of events to return.
             event_filter: If provided filters the events to those that match the filter.

@@ -1303,6 +1332,7 @@ async def paginate_room_events(
             from_key,
             to_key,
             direction,
+            order_by,
             limit,
             event_filter,
         )
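
For readers skimming the diff, the ORDER BY selection in this revision of `_paginate_room_events_txn` can be summarised with a standalone sketch. The helper name `build_order_clause` is hypothetical and not part of the PR, and mapping direction `"b"` to `DESC` is an assumption based on the `else` branch in the diff setting `"ASC"`. Only the two column orderings and the allowed `order_by` values come from the diff.

```python
def build_order_clause(order_by: str, direction: str = "b") -> str:
    """Build the ORDER BY clause for a given sort mode and pagination direction."""
    # Only whitelisted values are interpolated, so no user input reaches the SQL.
    if order_by not in ("topological", "stream"):
        raise ValueError(f"unsupported order_by: {order_by!r}")
    if direction not in ("b", "f"):
        raise ValueError(f"unsupported direction: {direction!r}")

    # Assumption: paginating backwards ("b") reads newest first.
    order = "DESC" if direction == "b" else "ASC"

    if order_by == "stream":
        columns = ("stream_ordering", "topological_ordering")
    else:
        columns = ("topological_ordering", "stream_ordering")

    return "ORDER BY " + ", ".join(f"event.{column} {order}" for column in columns)


print(build_order_clause("stream", "b"))
# ORDER BY event.stream_ordering DESC, event.topological_ordering DESC
print(build_order_clause("topological", "f"))
# ORDER BY event.topological_ordering ASC, event.stream_ordering ASC
```

Per the thread above, the pagination ordering changes were later undone in favour of a separate lookup function, so this only describes the intermediate state shown in this diff.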