I have a streaming job inserting records into an Elasticsearch cluster. I set a suitably large batch size, but it has no effect at all: all elements are inserted in batches of 1 or 2 elements.
The reason seems to be that this is a streaming pipeline, which may result in tiny bundles. Since ElasticsearchIO uses @FinishBundle to flush a batch, this will result in equally small batches.
This results in a huge number of bulk requests with just one element each, grinding the Elasticsearch cluster to a halt.
I have now been able to work around this by using a GroupIntoBatches operation before the insert, but this results in 3 steps (mapping to a key, applying GroupIntoBatches, stripping key and outputting all collected elements), making the process quite awkward.
A much better approach would be to internalize this into the ElasticsearchIO write transform: use a timer that flushes the batch when it reaches the batch size or at the end of the window, not at the end of a bundle.
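To make the difference between the two flush policies concrete, here is a minimal plain-Java simulation. It does not use Beam at all: the class name, method names, and bundle sizes are made up for illustration, and "end of window" is modelled simply as the end of the input. It only counts how many elements each bulk request would carry.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushPolicySketch {

    /** Flush at maxBatchSize, and also at the end of every bundle (the behaviour described above). */
    public static List<Integer> flushPerBundle(int[] bundleSizes, int maxBatchSize) {
        List<Integer> requestSizes = new ArrayList<>();
        for (int bundleSize : bundleSizes) {
            int buffered = 0;
            for (int i = 0; i < bundleSize; i++) {
                buffered++;
                if (buffered == maxBatchSize) {
                    requestSizes.add(buffered);
                    buffered = 0;
                }
            }
            // End of bundle: whatever is buffered goes out, however small.
            if (buffered > 0) {
                requestSizes.add(buffered);
            }
        }
        return requestSizes;
    }

    /** Keep the buffer across bundles; flush at maxBatchSize and once at the end of the window. */
    public static List<Integer> flushAtBatchSize(int[] bundleSizes, int maxBatchSize) {
        List<Integer> requestSizes = new ArrayList<>();
        int buffered = 0;
        for (int bundleSize : bundleSizes) {
            for (int i = 0; i < bundleSize; i++) {
                buffered++;
                if (buffered == maxBatchSize) {
                    requestSizes.add(buffered);
                    buffered = 0;
                }
            }
            // No flush here: bundle boundaries are ignored.
        }
        // End of window (modelled as end of input): flush the remainder.
        if (buffered > 0) {
            requestSizes.add(buffered);
        }
        return requestSizes;
    }

    public static void main(String[] args) {
        int[] bundles = {1, 2, 1, 1, 2, 1, 2}; // 10 elements arriving in 7 tiny bundles
        System.out.println(flushPerBundle(bundles, 4));   // [1, 2, 1, 1, 2, 1, 2]
        System.out.println(flushAtBatchSize(bundles, 4)); // [4, 4, 2]
    }
}
```

With the same ten elements, flushing per bundle issues seven bulk requests, while buffering across bundles issues three.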
Imported from Jira BEAM-6886. Original Jira may contain additional context.
Reported by: MadEgg.
The original is a fairly old ticket, but this is now addressed as of #14347. TL;DR: one can now use .withUseStatefulBatches(true) to employ GroupIntoBatches internally, so that bulk API requests contain maxBatchSize elements (sometimes fewer when non-global windowing is also used).
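Why batches can come out smaller than maxBatchSize under non-global windowing can be illustrated with a small plain-Java model. This is only a sketch under assumptions, not the ElasticsearchIO or GroupIntoBatches implementation: the class and method names are hypothetical, each element is represented by the id of the window it falls into, and a batch is assumed never to span two windows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedBatchSketch {

    /**
     * windowIds[i] is the window of the i-th element. Returns the sizes of the
     * emitted batches: full batches as they fill up, then each window's remainder.
     */
    public static List<Integer> batchSizes(int[] windowIds, int maxBatchSize) {
        Map<Integer, Integer> bufferedPerWindow = new LinkedHashMap<>();
        List<Integer> sizes = new ArrayList<>();
        for (int window : windowIds) {
            int buffered = bufferedPerWindow.merge(window, 1, Integer::sum);
            if (buffered == maxBatchSize) {
                // Batch for this window is full: emit it and start a new one.
                sizes.add(buffered);
                bufferedPerWindow.put(window, 0);
            }
        }
        // End of each window: emit whatever is left, even if below maxBatchSize.
        for (int remainder : bufferedPerWindow.values()) {
            if (remainder > 0) {
                sizes.add(remainder);
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Eight elements in a single (global-like) window: two full batches.
        System.out.println(batchSizes(new int[] {0, 0, 0, 0, 0, 0, 0, 0}, 4)); // [4, 4]
        // The same eight elements split 5/3 across two windows: partial batches appear.
        System.out.println(batchSizes(new int[] {0, 0, 0, 0, 0, 1, 1, 1}, 4)); // [4, 1, 3]
    }
}
```

In this model the total element count is unchanged, but splitting it across windows turns two full batches into one full batch and two partial ones.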
IMO this ticket can be closed because the feature is implemented 🙂