fix: stuck queries when too many events with same timestamp #586
Conversation
SQL Server tests are not able to start the database to begin with. Looks like this path
Will try pinning to an earlier version instead.
Force-pushed from e0f3d01 to 6505f54
take(2) shouldBe Set("A1", "A2") | ||
take(3) shouldBe Set("A3", "B3", "C3") | ||
take(2) shouldBe Set("A4", "B4") | ||
take(2) shouldBe Set("A5", "A6") |
Postgres does return the events with the same seq number in insertion order, but we probably shouldn't rely on that in tests.
LGTM
...with a few suggestions
core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala (outdated, resolved)
core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala (outdated, resolved)
Nice!
Force-pushed from 90aeaac to f2ace61
Test failing with …
I've updated to track the timestamp from the previous query, and only apply the extra seq nr filter when it's already known to be the same timestamp for the next query: 09e05fc
looking good
// only filter by highest seen seq nr when the next query is the same timestamp (or when unknown for initial queries)
private def highestSeenSeqNr(previous: TimestampOffset, latest: TimestampOffset): Option[Long] =
  Option.when((previous == TimestampOffset.Zero || previous.timestamp == latest.timestamp) && latest.seen.nonEmpty) {
    latest.seen.values.max
  }
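For illustration, here is a minimal self-contained sketch of the same logic, using a stand-in `Offset` type rather than the real `akka.persistence.query.TimestampOffset` (the names and values here are illustrative only):

```scala
import java.time.Instant

// stand-in for TimestampOffset: latest timestamp plus seen seq nr per persistence id
final case class Offset(timestamp: Instant, seen: Map[String, Long])
object Offset { val Zero: Offset = Offset(Instant.EPOCH, Map.empty) }

def highestSeenSeqNr(previous: Offset, latest: Offset): Option[Long] =
  Option.when((previous == Offset.Zero || previous.timestamp == latest.timestamp) && latest.seen.nonEmpty) {
    latest.seen.values.max
  }

val t1 = Instant.parse("2023-01-01T00:00:00Z")
val t2 = t1.plusSeconds(1)

// initial query, previous offset unknown: the seq nr filter applies
assert(highestSeenSeqNr(Offset.Zero, Offset(t1, Map("pid1" -> 2L))).contains(2L))
// next query starts at the same timestamp: the seq nr filter applies
assert(highestSeenSeqNr(Offset(t1, Map("pid1" -> 1L)), Offset(t1, Map("pid1" -> 3L, "pid2" -> 2L))).contains(3L))
// timestamp advanced: no seq nr filter, the query is by timestamp as usual
assert(highestSeenSeqNr(Offset(t1, Map("pid1" -> 3L)), Offset(t2, Map("pid3" -> 1L))).isEmpty)
```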
Note that sequence numbers are per persistence id, so a later timestamp can have an earlier sequence number.
That is correct. How is that handled? I guess it is caught by backtracking, in the same way as when events for pid-A may become visible before events for pid-B, even though the timestamp of pid-B is before pid-A.
If the events are there, it handles it by only filtering by highest sequence number for the same timestamp as the query starting timestamp. See also where the query is adjusted, and the comment there.
So if there are events (ordered by timestamp, seq nr):

timestamp | seq nr | pid | event
--- | --- | --- | ---
t1 | 1 | pid1 | A1
t1 | 1 | pid2 | B1
t1 | 1 | pid3 | C1
t1 | 2 | pid1 | A2
t1 | 2 | pid2 | B2
t1 | 3 | pid1 | A3
t2 | 2 | pid3 | C2
t2 | 3 | pid2 | B3
t2 | 4 | pid1 | A4
And a buffer size of 4: the first query only gets to the 4th event (A2). Before, it would restart from `timestamp >= t1`, stuck repeating the same 4 events (A1, B1, C1, A2). Now it will use the query `timestamp >= t1 AND (timestamp != t1 OR seq_nr >= 2)` for the next query, starting from the 4th event, processing (A2 (deduplicated), B2, A3, C2). The seq number filter only applies to the starting timestamp, so the query is otherwise just by timestamp as usual (see the simulation sketch below).

This applies to both the regular queries and the backtracking queries. Backtracking catches any late-arriving events as usual.
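To make the paging concrete, here is a minimal simulation of the example above (hypothetical `Row` and `page` names, not the actual query code), applying the same filter shape `timestamp >= from AND (timestamp != from OR seq_nr >= fromSeqNr)` with a buffer size of 4:

```scala
// hypothetical in-memory simulation of the paging in the example above
final case class Row(timestamp: Long, seqNr: Long, pid: String, event: String)

val rows = Seq(
  Row(1, 1, "pid1", "A1"), Row(1, 1, "pid2", "B1"), Row(1, 1, "pid3", "C1"),
  Row(1, 2, "pid1", "A2"), Row(1, 2, "pid2", "B2"), Row(1, 3, "pid1", "A3"),
  Row(2, 2, "pid3", "C2"), Row(2, 3, "pid2", "B3"), Row(2, 4, "pid1", "A4"))

// same shape as: timestamp >= :from AND (timestamp != :from OR seq_nr >= :from_seq_nr)
def page(from: Long, fromSeqNr: Long, bufferSize: Int): Seq[Row] =
  rows
    .filter(r => r.timestamp >= from && (r.timestamp != from || r.seqNr >= fromSeqNr))
    .sortBy(r => (r.timestamp, r.seqNr))
    .take(bufferSize)

val first = page(from = 1, fromSeqNr = 1, bufferSize = 4)
// first:  A1, B1, C1, A2 -- ends at (t1, seq nr 2), timestamp alone can't make progress
val second = page(from = 1, fromSeqNr = 2, bufferSize = 4)
// second: A2 (deduplicated downstream), B2, A3, C2 -- progresses past t1
```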
Good, that should work fine. What I was thinking about would be classified as a late-arriving event.
Some(
  dao
    .rowsBySlices(
      entityType,
      minSlice,
      maxSlice,
      fromTimestamp,
      fromSeqNr,
      toTimestamp,
      behindCurrentTime,
      backtracking = newState.backtracking))
Do we need to adjust BySliceQuery.deserializeAndAddOffset? It has a check on the buffer size and throws IllegalStateException.
That check is whether the seen map exceeds the buffer size (more persistence ids on the same timestamp than the buffer size). We could adjust it to only throw if all the sequence numbers are the same, which would not be handled by this fix (it would be stuck on both timestamp and seq number). But that many persistence ids with events on the exact same timestamp already feels exceptional, so I thought it's fine to leave as is. A sketch of both variants is below.
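For reference, a hedged sketch of what that guard and the considered (not adopted) adjustment could look like; this is illustrative only, not the actual `deserializeAndAddOffset` code, and the names are assumptions:

```scala
// existing behavior (sketch): throw when more persistence ids share the
// latest timestamp than fit in the buffer
def checkSeenSize(seen: Map[String, Long], bufferSize: Int): Unit =
  if (seen.size > bufferSize)
    throw new IllegalStateException(
      s"Too many persistence ids [${seen.size}] with events at the same timestamp, buffer size [$bufferSize]")

// considered adjustment (sketch): only throw when the seq nr filter cannot help
// either, i.e. all seen seq nrs are equal (stuck on both timestamp and seq nr)
def checkSeenSizeAdjusted(seen: Map[String, Long], bufferSize: Int): Unit =
  if (seen.size > bufferSize && seen.values.toSet.size == 1)
    throw new IllegalStateException(
      s"Too many persistence ids [${seen.size}] with the same timestamp and seq nr, buffer size [$bufferSize]")
```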
I agree 👍
Also discussed on an internal issue: we may want to have a limit on the number of events that can be persisted together in the first place, as these are committed atomically in a transaction, and a very large write could indicate a bug in user code. Or if we want to support writing large numbers of events in the same operation, we could group events into batches with separate transactions (it wouldn't be atomic across all events though); see the sketch below.
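As a sketch of that batching idea (a hypothetical helper; `persistBatch` is an assumed callback, not an existing akka-persistence-r2dbc API):

```scala
// split one large write into per-batch transactions,
// trading atomicity across all events for a bounded batch size
def persistInBatches[E](events: Seq[E], maxBatchSize: Int)(persistBatch: Seq[E] => Unit): Unit =
  events.grouped(maxBatchSize).foreach { batch =>
    persistBatch(batch) // each batch committed in its own transaction
  }
```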
LGTM
We've seen a case where more than 1000 events were persisted together (so they have the same timestamp), more than the default query buffer size, and then queries get stuck on the same timestamp and never make further progress.

Since events are ordered by timestamp and sequence number, update the queries to also filter by the highest seen seq number for the latest timestamp (only for events with the same timestamp as this starting timestamp). This fixes the case where all the events are for the same persistence id. Queries will still duplicate events with both the same timestamp and seq number (different persistence ids), to handle events across the buffer limit. The very edge case of more events than the buffer size, all with the same timestamp and the same seq number (from different persistence ids), would not be handled.

So this always ends up adding an additional filter to queries. We could look at only adding this extra conditional check when the latest timestamp is the same as in the previous query (that returned results), to only handle this particular case. The backtracking filter adjustment (updated in this PR) would need to be aware of this too.
Another alternative is to go further and always do all the filtering on the database side, adding conditions for all the latest seen sequence numbers, per persistence id. We would then expect no duplicated events from queries. It complicates the queries even more though, with something like:

db_timestamp >= :from_timestamp AND (persistence_id NOT IN (pid1, pid2, ..., pidN) OR (persistence_id = pid1 AND seq_nr > pid1SeqNr) OR (persistence_id = pid2 AND seq_nr > pid2SeqNr) OR ... OR (persistence_id = pidN AND seq_nr > pidNSeqNr))

But this would also guarantee progress in all cases, and the additional conditions are only for the 'seen' persistence ids that share the same latest timestamp (see the sketch below).

Marking draft while this is under discussion.
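For illustration, a sketch of how such a per-persistence-id condition could be assembled (a hypothetical helper; real code would bind parameters rather than interpolate values into the SQL string):

```scala
// builds the database-side filter described above from the seen map
// (persistence id -> highest seen seq nr for the latest timestamp)
def perPidCondition(seenSeqNrs: Map[String, Long]): String = {
  val pids = seenSeqNrs.keys.map(pid => s"'$pid'").mkString(", ")
  val perPid = seenSeqNrs
    .map { case (pid, seqNr) => s"(persistence_id = '$pid' AND seq_nr > $seqNr)" }
    .mkString(" OR ")
  s"db_timestamp >= :from_timestamp AND (persistence_id NOT IN ($pids) OR $perPid)"
}

// perPidCondition(Map("pid1" -> 3L, "pid2" -> 3L))
// => db_timestamp >= :from_timestamp AND (persistence_id NOT IN ('pid1', 'pid2')
//    OR (persistence_id = 'pid1' AND seq_nr > 3) OR (persistence_id = 'pid2' AND seq_nr > 3))
```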