Fix Kafka with Redistribute #32344
Reviewers: @scwhittle
One other benefit of committed offsets is that they give customers visibility into the progress of partitions, since the offsets can be queried and displayed outside of Dataflow. Do we need to disable the committing of offsets? I can see the argument that it might not make sense from an exactly-once perspective, but given that there are other reasons to commit and that the customer is configuring it explicitly, can we perhaps just log a warning that the offsets may not reflect all processed data and still perform the commits?
That is a good point, so it is an argument to still allow it. What I don't understand, though, is how reshuffling/redistributing works with committing offsets. Here is the current graph with offsets enabled (pulled from KafkaIO.java in case the formatting of the graph below isn't clear):
PCollection --> ParDo(ReadFromKafkaDoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord>>) --> Reshuffle() --> Map(output KafkaRecord)
At that point, if redistribute is enabled, does it make more sense to substitute the Redistribute transform for the Reshuffle here, instead of inserting a reshuffle after the Map? (The latter would introduce another shuffle, depending on the runner implementation.) If we go with the former approach, commits will still occur, but the commits themselves can contain duplicates. I need to investigate what that can cause, or whether it is just a no-op if we attempt to commit the same offset twice internally in Beam.
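To make the duplicate-commit question concrete, here is a minimal sketch of one way repeated commits could be made harmless: only ever advance the stored offset, so committing the same (or an older) offset a second time is a no-op. This is an illustration only, not how Beam or Kafka tracks offsets; the class `MonotonicOffsetTracker` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam or Kafka: keeps the highest
// committed offset per partition so duplicate commits are no-ops.
final class MonotonicOffsetTracker {
  private final Map<Integer, Long> committed = new HashMap<>();

  /** Returns true if the commit advanced the stored offset, false if it was a no-op. */
  boolean commit(int partition, long offset) {
    Long previous = committed.get(partition);
    if (previous != null && previous >= offset) {
      // Same or older offset: ignore it so progress never moves backwards.
      return false;
    }
    committed.put(partition, offset);
    return true;
  }

  /** Returns the highest committed offset, or -1 if nothing was committed yet. */
  long committedOffset(int partition) {
    return committed.getOrDefault(partition, -1L);
  }
}
```

With a tracker like this, a duplicate record arriving after a redistribute would not move the committed position backwards; whether the real commit path behaves this way is exactly what still needs to be investigated.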
My PR to remove the Reshuffle for the commit offsets was just merged, so I think the question of whether it should be a redistribute when configured might be moot now. But we could disallow combining commit offsets and redistribute in the expand259 path, since it is still an issue there.
Perfect, that's great!
Reminder, please take a look at this PR: @kennknowles @ahmedabu98
R: @scwhittle
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
    LOG.warn(
        "commitOffsetsInFinalize() will not capture all work processed if set with withRedistribute()");
  }
  if (Boolean.TRUE.equals(
I haven't seen this before. Is this to handle null as a one-liner?
Yeah, it occurs a few more times in this file as well.
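For readers unfamiliar with the idiom, here is a small sketch of why `Boolean.TRUE.equals(...)` is null-safe when reading a flag out of an untyped config map. The class and method names are hypothetical; the key string is the value of Kafka's `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`, inlined here so the snippet has no dependencies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; only Boolean.TRUE.equals(...) mirrors the PR.
final class NullSafeFlagCheck {
  // Same string value as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

  static boolean isAutoCommitEnabled(Map<String, Object> consumerConfig) {
    // equals() returns false for null and for non-Boolean values, so a
    // missing key needs no explicit null check and cannot throw an NPE.
    return Boolean.TRUE.equals(consumerConfig.get(ENABLE_AUTO_COMMIT));
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    System.out.println(isAutoCommitEnabled(config)); // key absent
    config.put(ENABLE_AUTO_COMMIT, true);
    System.out.println(isAutoCommitEnabled(config)); // Boolean true
  }
}
```

One caveat of this idiom: a config value stored as the string `"true"` rather than a `Boolean` does not match, because `Boolean.equals` only returns true for a `Boolean` argument.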
  if (Boolean.TRUE.equals(
      kafkaRead.getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) {
    LOG.warn(
        "config.ENABLE_AUTO_COMMIT_CONFIG doesn't need to be set with withRedistribute()");
It doesn't need to, but it seems like it could still be desirable if they are using the offsets for monitoring.
Should we just remove this log? It seems the correctness is the same as if we were shuffling persistently, though the window is perhaps larger if a drain is being performed.
Ack, removed it.
Run Java_Kafka_IO_Direct PreCommit
Can you change the PR description to be a little clearer too? Maybe:
"Change KafkaIO read withDuplicates to warn but still commit offsets if configured"
@@ -648,6 +669,29 @@ public void testCommitOffsetsInFinalizeAndRedistributeWarnings() {
         "Offsets committed due to usage of commitOffsetsInFinalize() may not capture all work processed due to use of withRedistribute()");
   }

   @Test
   public void testCommitOffsetsInFinalizeAndRedistributeNoWarningsWithAllowDuplicates() {
seems like it should be NoWarningsWithNoAllowDuplicates
I do want to mention that commits are enabled, since if we do not commit offsets, enabling allow duplicates also produces no warning.
Updated the test name to NoWarningsWithNoAllowDuplicatesAndCommitOffsets, and updated the other test name so it's clearer that the two are testing behaviour with and without allowDuplicates=true.
@@ -1696,7 +1696,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       }

       if (kafkaRead.isRedistributed()) {
-        if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
+        if (kafkaRead.isCommitOffsetsInFinalizeEnabled() && kafkaRead.isAllowDuplicates()) {
           LOG.warn(
               "Offsets committed due to usage of commitOffsetsInFinalize() may not capture all work processed due to use of withRedistribute()");
update the log to reflect allow duplicates.
Done.
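As a rough, dependency-free sketch of the condition discussed in this thread: the warning fires only when the read is redistributed, commitOffsetsInFinalize is enabled, and duplicates are allowed. The class `RedistributeWarnings`, the method `warningFor`, and the message text are hypothetical stand-ins; the actual check lives in `KafkaIO.expand` and the final log wording may differ.

```java
import java.util.Optional;

// Hypothetical stand-in for the check in KafkaIO.expand(); it takes plain
// booleans instead of a KafkaIO.Read so it compiles without Beam.
final class RedistributeWarnings {
  // Assumed wording; the message in the merged PR may differ.
  static final String WARNING =
      "Offsets committed due to usage of commitOffsetsInFinalize() may not capture "
          + "all work processed when withRedistribute() allows duplicates";

  static Optional<String> warningFor(
      boolean redistributed, boolean commitOffsetsInFinalize, boolean allowDuplicates) {
    // Warn only when all three options are combined; the commit itself
    // still happens, the warning just flags that offsets may lag or repeat.
    if (redistributed && commitOffsetsInFinalize && allowDuplicates) {
      return Optional.of(WARNING);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(warningFor(true, true, true).isPresent());
    System.out.println(warningFor(true, true, false).isPresent());
  }
}
```

The two renamed tests correspond to the `(true, true, true)` and `(true, true, false)` rows of this truth table.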
Change KafkaIO read withDuplicates to warn but still commit offsets if configured
There are two ways to enable commits:
If the first is enabled, redistribute can't be enabled.
If the second is enabled, the pipeline is still accepted but is not optimal: if the runner is allowed to produce duplicates, there is no point in adding the overhead of checkpointing messages within Kafka itself, though doing so is not semantically incorrect. The first option uses internal Beam state to track which messages have been processed in order to prevent duplicates, which doesn't make sense if we also have a runner hint that duplicates are allowed.
Before this fix, customers weren't able to set withRedistribute on the transform at all, even if commits weren't configured.
Updated tests to catch this.