Improve BatchElements documentation #32082

Merged 7 commits on Aug 30, 2024
Changes from 1 commit
sdks/python/apache_beam/transforms/util.py: 7 additions, 0 deletions

@@ -802,6 +802,13 @@ class BatchElements(PTransform):
corresponding to its contents. Each batch is emitted with a timestamp at
the end of its window.

When the max_batch_duration_secs arg is provided, a stateful implementation
of BatchElements is used to batch elements across bundles. This is most
impactful in streaming applications where many bundles only contain one
element. Larger max_batch_duration_secs values will reduce the throughput of
the transform, while smaller values will improve the throughput but make it
more likely that batches are smaller than the target batch size.

Args:
  min_batch_size: (optional) the smallest size of a batch
  max_batch_size: (optional) the largest size of a batch
Conversations

Contributor:
"Larger max_batch_duration_secs values will reduce the throughput"

Is throughput the right term here? I feel like a longer duration should increase throughput, because we reduce the per-element overhead, at least if we measure throughput over a sufficiently long duration, say in elements per hour.

However, the added latency might result in an increased data freshness reading for downstream stages. https://cloud.google.com/dataflow/docs/guides/using-monitoring-intf#data_freshness_streaming.

WDYT about the following:

Larger max_batch_duration_secs values might increase the overall throughput of the transform, but might negatively impact the data freshness on downstream transforms due to added latency. Smaller values will have less impact on data freshness, but might make batches smaller than the target batch size.

Contributor Author (jrmccluskey):

Throughput is the right term; the batching can create a bottleneck.

The documentation at https://beam.apache.org/documentation/patterns/batch-elements/ should outline the tuning considerations more clearly. I think routing users there, along with the new docstring content, will help a lot.

Contributor:

So batching can cause the pipeline to emit fewer elements per sufficiently large unit of time?

jrmccluskey (Contributor Author), Aug 21, 2024:

Potentially, yes. The slowdown is most pronounced in the incomplete-bundle case. Without the stateful batching, elements are emitted downstream as they arrive at BatchElements, since each one is its own single-element bundle: if nine elements arrive within a span of five seconds, those nine elements are emitted within that span as well (abstracting away any overhead from the code emitting the bundles of one). Meanwhile, with stateful batching, if the target batch size is greater than nine and the maximum buffer time is greater than five seconds, those nine elements are emitted at a later time, but together. We're just artificially increasing the denominator of the elements-per-unit-of-time fraction. The hope is that this tradeoff has some benefit for the downstream operation that is worth the potential bottleneck, but the documentation around that tradeoff was lacking prior to this change.

jrmccluskey marked this conversation as resolved.
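For illustration, a minimal sketch of the two configurations being contrasted in this thread, assuming the BatchElements parameters described in the docstring; the batch sizes and buffer duration are arbitrary example values:

```python
from apache_beam.transforms.util import BatchElements

# Per-bundle batching (default): in a streaming pipeline where each element
# arrives in its own bundle, buffered elements are flushed when the bundle
# finishes, so they flow downstream almost immediately, mostly as batches of
# one.
per_bundle = BatchElements(min_batch_size=1, max_batch_size=10)

# Stateful batching: elements are buffered across bundles until either
# max_batch_size is reached or max_batch_duration_secs elapses, so the nine
# elements from the example above would be emitted later, but together.
cross_bundle = BatchElements(
    min_batch_size=1, max_batch_size=10, max_batch_duration_secs=10)
```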
