fix(sink): align init epoch for coordinated sinks #20598
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Resolve #19337
In coordinated sinks, different parallelisms coordinate with each other by their epoch progress. More specifically, they should write the data of the same epoch simultaneously, and then commit together on the epoch, and then write the same epoch truncate offset to storage. Since different parallelisms should work on the same epoch, we need to ensure that after recovery they are initialized from the same epoch. However, in sink log store, since we only need to ensure at-least-once delivery, the truncation offset set by log reader is not guaranteed to be applied and written to persistent storage. This works for normal sinks in which different parallelisms work independently. But for coordinated sink, it may happen that, for example, the truncate offset on epoch1 of parallelism1 is applied to storage, but on parallelism2, the offset is not applied. After recovery, parallelism1 will be initialized from the next epoch of epoch1, but parallelism2 will still be initialized from epoch1, and then the two parallelisms won't be initialized at the same epoch, and then cause error in #19337.
In this PR, we introduce a flag to KvLogStoreFactory named
align_epoch_on_init
. The flag will be set to true for coordinated sinks. When this feature is on, in initialization, we won't use left unbounded to read the historical stream. Instead, we will load the truncate offset globally from all vnodes from the committed snapshot, and use the latest truncate offset as the left bound to read historical stream. In the example above, parallelism2 can see that some other parallelisms have written truncate offset on epoch1, and then it will skip epoch1 and initialize from the next epoch, though itself never writes the truncate offset yet.Checklist
Documentation
Release note