Adjust MicroBatchSize dynamically based on throttling rate in BulkExecutor #22290

Merged

Conversation

@FabianMeiswinkel (Member) commented Jun 15, 2021

This PR changes how we determine the micro-batch size when using bulk execution (CosmosAsyncContainer.processBulkOperations). Instead of relying on a user-provided static micro-batch size, the size is adjusted dynamically based on the percentage of throttled operations (either because the entire batch request is throttled or because the batch request is partially successful with some operations being throttled).
To be able to do this in the BulkExecutor I had to change the behavior of the ClientRetryPolicy's ResourceThrottlingRetryPolicy to allow 429s to bubble up to the BulkExecutor, so that the BulkExecutor's ResourceThrottlingRetryPolicy can trigger the retry and account for the throttled operations that way.
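As a rough illustration of the idea (not the actual BulkExecutor code in this PR; all class and field names below are hypothetical), a controller could shrink the micro-batch size whenever the throttled-operation ratio of the last batch response exceeds a threshold and grow it again while throttling stays low:

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch only - not the BulkExecutor implementation from this PR.
final class MicroBatchSizeController {
    private static final int MAX_BATCH_SIZE = 100; // MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST
    private static final int MIN_BATCH_SIZE = 1;
    private static final double THROTTLE_THRESHOLD = 0.1; // assumed target: at most 10% throttled ops

    private final AtomicInteger currentBatchSize = new AtomicInteger(MAX_BATCH_SIZE);

    int getCurrentBatchSize() {
        return currentBatchSize.get();
    }

    // Called after each batch response with the number of throttled (429) and total operations.
    void record(int throttledOperations, int totalOperations) {
        if (totalOperations == 0) {
            return;
        }
        double throttledRatio = (double) throttledOperations / totalOperations;
        currentBatchSize.updateAndGet(size -> {
            if (throttledRatio > THROTTLE_THRESHOLD) {
                // Back off aggressively while a meaningful share of operations is throttled.
                return Math.max(MIN_BATCH_SIZE, size / 2);
            }
            // Recover slowly while throttling stays below the threshold.
            return Math.min(MAX_BATCH_SIZE, size + 1);
        });
    }
}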
The riskiest change was that reactor's bufferTimeout operator doesn't allow specifying a dynamic maxBufferSize, so I had to switch to another operator (bufferUntil) and implement a custom timer-triggered mechanism to flush the buffers and drain the remaining operations from them. This timer-based mechanism is only triggered after the input Flux (the user-provided Flux of operations to be executed) has completed.
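A minimal sketch of how bufferUntil can take over from bufferTimeout, assuming the dynamic size is published through an AtomicInteger. This simplification only checks the elapsed interval when a new operation arrives; the PR's separate timer-triggered flush, which drains partially filled buffers after the input Flux completes, is not reproduced here:

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

// Hypothetical sketch only - not the BulkExecutor code from this PR.
final class DynamicBufferingSketch {

    static <T> Flux<List<T>> microBatches(
            Flux<T> operations,
            AtomicInteger dynamicBatchSize,   // adjusted elsewhere based on the throttling rate
            Duration maxMicroBatchInterval) {

        // Reactor serializes onNext signals, so this per-subscription state needs no extra locking.
        AtomicInteger itemsInBuffer = new AtomicInteger();
        AtomicLong bufferOpenedAtNanos = new AtomicLong(System.nanoTime());

        return operations.bufferUntil(operation -> {
            if (itemsInBuffer.getAndIncrement() == 0) {
                bufferOpenedAtNanos.set(System.nanoTime());
            }
            boolean sizeReached = itemsInBuffer.get() >= dynamicBatchSize.get();
            boolean intervalElapsed =
                System.nanoTime() - bufferOpenedAtNanos.get() >= maxMicroBatchInterval.toNanos();
            if (sizeReached || intervalElapsed) {
                itemsInBuffer.set(0);
                return true; // close the current buffer, including this operation
            }
            return false;
        });
    }
}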

From my initial tests with the Spark end-to-end samples this approach works very well. It reduces the percentage of throttled requests significantly when no client throughput control is enabled, and with client throughput control it helps reduce the micro-batch size so that the achievable throughput is as expected while the throughput can still be limited reasonably well.

@FabianMeiswinkel FabianMeiswinkel changed the title [DRAFT - do not review yet]Adjust MicroBatchSize dynamically based on throttling rate in BulkExecutor Adjust MicroBatchSize dynamically based on throttling rate in BulkExecutor Jun 16, 2021
@@ -14,7 +14,9 @@
public static final int MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST = 100;

public static final int DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS = 100;
- public static final int DEFAULT_MAX_MICRO_BATCH_CONCURRENCY = 2;
+ public static final int DEFAULT_MAX_MICRO_BATCH_CONCURRENCY = 1;
@xinlian12 (Member) commented Jun 16, 2021

Why change the concurrency here?

@FabianMeiswinkel (Member, Author) replied:

Because a concurrency of 2 is pretty much always wrong. Even a single client with 4 cores can easily saturate the 10,000 RU/s of a physical partition.
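For a rough sense of scale (assuming on the order of 5-10 RU per 1 KB document write; this estimate is illustrative and not from the PR): a full 100-operation micro-batch costs roughly 500-1,000 RU, so about 10-20 such batch requests per second already exhaust a physical partition's 10,000 RU/s budget, a rate a single client thread can easily sustain.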

@FabianMeiswinkel (Member, Author) added:

So this change was intentional - but leaving the comment open for others to chime in as well.

Member:

If the concurrency should always be 1, then should we remove the setMaxMicroBatchConcurrency public API from bulkOption?

@FabianMeiswinkel (Member, Author) replied:

The only scenario where I think a higher maxConcurrency makes sense is if you build a web service accepting requests containing info that results in multiple documents you want to ingest, so where your call to processBulkOperations would only contain, say, a couple dozen or a few hundred documents. I can imagine that latency might be better with higher concurrency, but this is an edge case. Customers can still modify the concurrency, and 1 as the default seems to meet most scenarios better.
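For that edge case, a caller could raise the setting explicitly. A hedged usage sketch follows; the bulk options type was in beta around the time of this PR and has since been renamed, so the exact package, class, and method names below (BulkProcessingOptions, processBulkOperations returning a Flux to drain) are assumptions rather than a definitive reference:

import com.azure.cosmos.BulkProcessingOptions;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.models.CosmosItemOperation;
import reactor.core.publisher.Flux;

// Usage sketch only - class and method names are assumed from the beta-era bulk API
// discussed in this thread and may differ in the SDK version you are using.
final class BulkConcurrencyExample {

    static void ingestSmallBatch(CosmosAsyncContainer container, Flux<CosmosItemOperation> operations) {
        BulkProcessingOptions<Object> options = new BulkProcessingOptions<>();
        // Only worth raising above the default of 1 for small, latency-sensitive bulk calls.
        options.setMaxMicroBatchConcurrency(2);
        container.processBulkOperations(operations, options).blockLast();
    }
}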

@FabianMeiswinkel FabianMeiswinkel merged commit ddafb5f into Azure:main Jun 23, 2021
azure-sdk pushed a commit to azure-sdk/azure-sdk-for-java that referenced this pull request Feb 28, 2023
[Hub Generated] Review request for Microsoft.DBforPostgreSQL to add version stable/2022-11-08 (Azure#22124)
