-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove expensive shuffle of read data in KafkaIO when using sdf and commit offsets #31682
Remove expensive shuffle of read data in KafkaIO when using sdf and commit offsets #31682
Conversation
@kennknowles I'd appreciate you to take a look and let me know if this makes sense with the beam model in general. I don't think that the Reshuffle adds any more guarantees about barriers for other runners than the combine itself would but could use some confirmation. |
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
I think the problem is that no runner actually implements RequiresStableInput. If Dataflow implemented it, the implemention would be a shuffle. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we assume RequiresStableInput does not work (which I'm almost certain it does not) then this is incorrect, no? It is easy to see that KafkaCommitOffset might have written offets to Kafka so it will never vend the messages again, but the main message processing could fail and need to retrieve them
There is still a shuffle of the offsets due to the combine-per-key before the offset commit fn. If there was not that shuffle, having RequiresStableInput insert a shuffle would be ok because this shuffle would just be of the commit offsets not the read data and thus cheap. We need to guarantee that offsets are committed to kafka only once the records have been processed by the system such that no records are lost if the pipeline is drained. For other runners such as Flink, is Reshuffle more powerful than any other groupby key, i.e. does it insert some sort of checkpoint barrier?
With flink, if data is just flowing through before checkpoint barrier, there is nothing to prevent offsets from being committed to Kafka before the checkpoint passes, so it seems this reshuffle doesn't provide any further guarantees. |
Yes, for Flink this really needs to be something that happens after the checkpoint is known to be durable. To do this right, we probably could use a semantic construct for that, which would also solve it for Dataflow. Given that everything about this implementation depends on Dataflow's idiosyncracies, I'm OK with modifying it to further depend on them but have better performance. |
cc5e2b0
to
f997145
Compare
This change causes an error when updating the Dataflow pipeline from previous version, but the update can be allowed by passing:
@kennknowles Is that sufficient or should I add some option to maintain the previous expensive behavior? Are there any other concerns with this change? Thanks! |
Thanks for raising this. I hate to bring it in, because it is a pain, but we do have a mechanism where the user can pass For you, the work is to keep the old code as-is in a deprecated fork of This allows us to make update-incompatible changes to the default codepath without breaking users with long-running pipeline that want to upgrade the SDK for some other reason. It is fine for users who want this new improvement to have to pass the parameter you mentioned, or even to have to drain. |
f997145
to
5b1c418
Compare
@kennknowles PTAL, I added the legacy support back via the option you mentioned. |
5b1c418
to
4f81e6c
Compare
Assigning reviewers. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
if (Comparators.lexicographical(Comparator.<String>naturalOrder()) | ||
.compare(requestedVersion, targetVersion) | ||
< 0) { | ||
useLegacyImplementation = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Readability: I prefer the approach of a hard fork, where the first thing you do with expand is to take a different path depending on the version. This keeps the cyclomatic complexity of the main logic to a minimum. It is also more precise than "legacy" which has some connotations but isn't as good as expand_2_59_0
. I can see how here you are only changing a tiny bit of logic based on this, but I would still prefer distinct methods called where each method is a straight-line implementation for a particular version range.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm OK to merge and get it in the release ASAP but I really care about this follow-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified PTAL. Fixed the test serialization issue. Running some manual update compatability checks.
8716f29
to
10d8cd9
Compare
Manually verified against dataflow service that:
|
|
||
@RequiresStableInput | ||
@ProcessElement | ||
@SuppressWarnings("nullness") // startBundle guaranteed to initialize |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: don't suppress. Even though startbundle is guaranteed to initialize, it is not guaranteed that whoever is calling this class obeys the contract of calling methods in a particular order. In fact, it is incredibly common to get it wrong. (this style of class is bad, but it is too late now)
if (Comparators.lexicographical(Comparator.<String>naturalOrder()) | ||
.compare(requestedVersion, targetVersion) | ||
< 0) { | ||
return expand259Commits( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: would be preferable to have the parallel constructs appear as parallel in the code, e.g.
if (...259) {
expand259commits
} else {
expandcommits
}
whereas now we have some inline and some factored even though they are analogous
The reshuffle adds a barrier so that the read records were committed before possibly committing offsets. However this is unnecessary as the DoFn is annotated with RequiresStableInput and the fusion barrier can be introduced with just shuffling the offsets.
In both cases it is possible for a commit offset to be committed back to kafka before the data has been entirely processed by the pipeline. However with support to drain data from the pipeline (such as in Cloud Dataflow) this allows for exactly-once semantics for KafkaIO via the committed offsets to the topics across pipeline drain and restart.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.