-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reduce the maximum size of input splits in Flink to better distribute work #28045
Conversation
90d56d7
to
1b25ca2
Compare
Assigning reviewers. If you would like to opt out of this review, comment R: @robertwb added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
1b25ca2
to
40d2093
Compare
If I understand correctly, you're setting this cap to reduce the ratio between the largest and smallest shard? This seems a reasonable workaround to handle pathologically bad situations. (BTW, the way that Dataflow handles this is that it is not limited by the initial splitting, but rather creates further splits dynamically when some workers have finished their chunk and others have substantial work left to do. This is especially handy when sources are not byte based (or the processing is not evenly spread out among the bytes).) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do think we should make a note about this in the top-level CHANGES.md file.
...link/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
Outdated
Show resolved
Hide resolved
Thanks for the review @robertwb. I made the suggested changes and also changed the option to be in Mb directly for easier config. |
6e169d0
to
9579fe0
Compare
6114826
to
19f9423
Compare
Thanks. Looks good. |
In the current implementation of file sources, the Flink runner will try to fix the size of input splits using the formula
source estimated size / parallelism
. Most of the time, the resulting desired split size will be much larger than the size of files read by this source. In that case each individual file becomes full split, and the number of splits is larger than parallelism.This creates an issue when the size of individual files varies a lot. At Spotify we commonly read datasets stored on GCS where individual files range from a few Mb to 1 or 2 Gb. Because of this, work distribution is sub-optimal because some workers will be consuming larges splits while other worker consume smaller splits.
This PR limits the maximum size of each split to 128Mb (configurable), which is a decent tradeoff between balancing work and limiting the overhead of consuming splits. I believe Dataflow uses the same strategy.
In our test on real workflows, we measured an improvement in execution time (and therefore resource consumption) of up to 20%. Of course the overall improvement varies a lot from one workflow to another, but we never encountered a case where performances where noticeably degraded.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.