
Optimize Datastream for batch #31950

Closed

wants to merge 38 commits
Conversation

@jto (Contributor) commented on Jul 23, 2024

This PR contains several optimizations for the Datastream API when used in batch mode. In its current state, using Datastream for batch is much slower than Dataset; this is an attempt to bring it to the same performance level.

The following optimizations are implemented:

Apply the same optimization as #28045 to Datastream.

It has the same benefits with Datastream as with Dataset (up to a 20% speedup).
I discovered the patch was also necessary for Datastream while migrating existing workflows from Dataset to Datastream by passing --useDataStreamForBatch.

Use a lazy enumerator for bounded IO reads

The existing enumerator eagerly distributes splits to workers. When splits are not all equal in size, this eager distribution causes a lot of skew. The new implementation mimics the behaviour of Flink's StaticFileSplitEnumerator, where splits are lazily distributed to workers as they are consumed, which results in better load balancing.
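
A sketch of the lazy-assignment idea, modeled on Flink's SplitEnumerator interface (the class name and the one-split-per-request policy are illustrative, not necessarily this PR's exact code):

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

class LazySplitEnumerator<SplitT extends SourceSplit>
    implements SplitEnumerator<SplitT, Void> {

  private final SplitEnumeratorContext<SplitT> context;
  private final Queue<SplitT> pendingSplits;

  LazySplitEnumerator(SplitEnumeratorContext<SplitT> context, List<SplitT> splits) {
    this.context = context;
    this.pendingSplits = new ArrayDeque<>(splits);
  }

  @Override
  public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    // One split per request: fast workers come back sooner and get more
    // splits, so uneven split sizes no longer translate into skew.
    SplitT split = pendingSplits.poll();
    if (split != null) {
      context.assignSplit(split, subtaskId);
    } else {
      context.signalNoMoreSplits(subtaskId);
    }
  }

  @Override
  public void addSplitsBack(List<SplitT> splits, int subtaskId) {
    // Splits of a failed reader go back into the pool.
    pendingSplits.addAll(splits);
  }

  @Override
  public void start() {}

  @Override
  public void addReader(int subtaskId) {}

  @Override
  public Void snapshotState(long checkpointId) {
    return null;
  }

  @Override
  public void close() {}
}
```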

Set the serializer on Bounded reads.

For some reason, the serializer was not set on Bounded reads.
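
A sketch of the shape of the fix, assuming the Flink runner's CoderTypeInformation bridge and a FLIP-27 style source (the helper method and parameter names are illustrative, not the PR's exact diff):

```java
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class BoundedReadTranslation {
  static <T> DataStream<WindowedValue<T>> translateBoundedRead(
      StreamExecutionEnvironment env,
      Source<WindowedValue<T>, ?, ?> flinkSource,
      Coder<WindowedValue<T>> windowedValueCoder,
      PipelineOptions options,
      String name) {
    return env
        .fromSource(flinkSource, WatermarkStrategy.noWatermarks(), name)
        // Without this, the stream's TypeInformation does not carry the
        // Beam coder and Flink falls back to slower generic serialization.
        .returns(new CoderTypeInformation<>(windowedValueCoder, options));
  }
}
```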

TODO

Fix BQ IO issue

BQ writes do not behave the same with Datastream, and garbage collection is much slower. With Dataset, the IO creates one temp file per worker; with Datastream it creates many more files (roughly 20x).

Fix double encoding of window in GBK and CombinePerKey

Before the shuffle, KVs are converted to KeyedWorkItem; however, the actual stream type is:
DataStream<WindowedValue<KeyedWorkItem<K, byte[]>>>
Both KeyedWorkItem and WindowedValue serialize the window. Since the conversion happens before keyBy (the shuffle), the duplication directly results in network overhead.
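
For illustration, this is roughly how the coders nest (reconstructed from the runner's public coder APIs, not this PR's diff): the outer FullWindowedValueCoder writes the windows once, and KeyedWorkItemCoder takes the window coder again to encode the windowed elements it carries, so each shuffled record encodes its window twice.

```java
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;

class DoubleWindowEncoding {
  static <K> Coder<WindowedValue<KeyedWorkItem<K, byte[]>>> streamCoder(
      Coder<K> keyCoder, Coder<? extends BoundedWindow> windowCoder) {
    KeyedWorkItemCoder<K, byte[]> workItemCoder =
        KeyedWorkItemCoder.of(keyCoder, ByteArrayCoder.of(), windowCoder); // windows, per element
    return WindowedValue.getFullCoder(workItemCoder, windowCoder); // windows, again
  }
}
```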

I tried the simplest fix: move the conversion after keyBy. But WindowDoFnOperator needs the stream it transforms to be keyed, so turning ToBinaryKeyedWorkItem -> keyBy -> transform(doFnOperator) into keyBy -> ToBinaryKeyedWorkItem -> transform(doFnOperator) is not possible.

I also tried a similar fix using reinterpretAsKeyedStream to avoid this problem. The chain becomes ToBinaryKV -> keyBy -> ToKeyedWorkItem -> reinterpretAsKeyedStream -> transform(doFnOperator), but reinterpretAsKeyedStream breaks operator chaining between ToKeyedWorkItem and the following operator, which degrades performance even more.
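
For reference, a sketch of that attempted chain using Flink's DataStreamUtils.reinterpretAsKeyedStream (the types and helper parameters are illustrative stand-ins for the runner's actual operators):

```java
import java.nio.ByteBuffer;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

class ReinterpretAttempt {
  static <K> KeyedStream<WindowedValue<KeyedWorkItem<K, byte[]>>, ByteBuffer> toKeyedWorkItems(
      DataStream<WindowedValue<KV<K, byte[]>>> binaryKvs,
      KeySelector<WindowedValue<KV<K, byte[]>>, ByteBuffer> kvKeySelector,
      MapFunction<WindowedValue<KV<K, byte[]>>, WindowedValue<KeyedWorkItem<K, byte[]>>>
          toKeyedWorkItem,
      KeySelector<WindowedValue<KeyedWorkItem<K, byte[]>>, ByteBuffer> workItemKeySelector) {
    DataStream<WindowedValue<KeyedWorkItem<K, byte[]>>> workItems =
        binaryKvs
            .keyBy(kvKeySelector) // the single real shuffle
            .map(toKeyedWorkItem); // conversion now happens after the shuffle
    // Re-declare the stream as keyed without a second shuffle. Functionally
    // fine, but it breaks operator chaining between the map above and the
    // downstream WindowDoFnOperator, costing more than the saved encoding.
    return DataStreamUtils.reinterpretAsKeyedStream(workItems, workItemKeySelector);
  }
}
```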

The best fix would be to not need KeyedWorkItem at all, but that would be a large change in Beam.

Missing pre-shuffle combine on the reduce operator

The Dataset translation turns a reduce into partial reduce -> shuffle -> reduce. The Datastream translator is missing this optimization, which makes reduce operations much slower.
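
A sketch of the two phases of this combiner lifting, expressed against Beam's CombineFn contract (illustrative helpers, not the Dataset translator's actual operators): each worker folds its bundle into per-key accumulators before the shuffle, so only accumulators cross the network.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.values.KV;

class CombinerLifting {
  // Pre-shuffle phase, run on each worker: one accumulator per key per
  // bundle instead of one record per input element.
  static <K, InputT, AccumT, OutputT> Map<K, AccumT> partialCombine(
      Iterable<KV<K, InputT>> bundle, CombineFn<InputT, AccumT, OutputT> fn) {
    Map<K, AccumT> accumulators = new HashMap<>();
    for (KV<K, InputT> element : bundle) {
      AccumT acc = accumulators.computeIfAbsent(element.getKey(), k -> fn.createAccumulator());
      accumulators.put(element.getKey(), fn.addInput(acc, element.getValue()));
    }
    return accumulators;
  }

  // Post-shuffle phase: merge the per-worker accumulators for a key and
  // extract the final value.
  static <K, InputT, AccumT, OutputT> OutputT finalCombine(
      Iterable<AccumT> shuffledAccumulators, CombineFn<InputT, AccumT, OutputT> fn) {
    return fn.extractOutput(fn.mergeAccumulators(shuffledAccumulators));
  }
}
```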



@jto force-pushed the julient/patched-2.56 branch from 568e44c to 9641b2a on July 24, 2024
@jto changed the title from "Reduce the maximum size of input splits in Flink to better distribute work in Datastream API" to "Optimize Datastream API for batch" on Jul 26, 2024
@jto changed the title to "Optimize Datastream for batch" on Jul 26, 2024
@jto changed the base branch from master to release-2.56.0 on July 26, 2024
@github-actions bot added the core label on Aug 20, 2024
@jto closed this on Aug 30, 2024