Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client throughput control: Deferring store invocation #22144

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchOper
return ((ItemBulkOperation<?>) itemOperation).getRetryPolicy().shouldRetry(operationResult).flatMap(
result -> {
if (result.shouldRetry) {
groupSink.next(itemOperation);
return Mono.empty();
return this.enqueueForRetry(result.backOffTime, groupSink, itemOperation);
} else {
return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(
itemOperation, cosmosBulkItemResponse, this.batchContext));
Expand Down Expand Up @@ -281,24 +280,38 @@ private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchExec
return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, this.batchContext));
}

private Mono<CosmosBulkOperationResponse<TContext>> enqueueForRetry(
Duration backOffTime,
FluxSink<CosmosItemOperation> groupSink,
CosmosItemOperation itemOperation) {

if (backOffTime == null || backOffTime.isZero()) {
groupSink.next(itemOperation);
return Mono.empty();
} else {
return Mono
.delay(backOffTime)
.flatMap((dumm) -> {
groupSink.next(itemOperation);
return Mono.empty();
});
}
}

private Mono<CosmosBulkOperationResponse<TContext>> retryOtherExceptions(
CosmosItemOperation itemOperation, Exception exception, FluxSink<CosmosItemOperation> groupSink,
CosmosException cosmosException, ItemBulkOperation<?> itemBulkOperation) {
return itemBulkOperation.getRetryPolicy().shouldRetry(cosmosException).flatMap(result -> {
if (result.shouldRetry) {

groupSink.next(itemOperation);
return Mono.empty();
return this.enqueueForRetry(result.backOffTime, groupSink, itemBulkOperation);
} else {

return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(
itemOperation, exception, this.batchContext));
}
});
}

private Mono<TransactionalBatchResponse> executeBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest) {

return this.docClientWrapper.executeBatchRequest(
BridgeInternal.getLink(this.container), serverRequest, null, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public Mono<StoreResponse> invokeResourceOperationAsync(Uri physicalAddress, RxD
request.requestContext.resourcePhysicalAddress = physicalAddress.toString();
}
if (this.throughputControlStore != null) {
return this.throughputControlStore.processRequest(request, this.invokeStoreAsync(physicalAddress, request));
return this.throughputControlStore.processRequest(
request,
Mono.defer(() -> this.invokeStoreAsync(physicalAddress, request)));
}

return this.invokeStoreAsync(physicalAddress, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package com.azure.cosmos;

import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -14,14 +16,12 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

public class CosmosBulkAsyncTest extends BatchTestBase {

Expand All @@ -38,7 +38,10 @@ public CosmosBulkAsyncTest(CosmosClientBuilder clientBuilder) {
@BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT)
public void before_CosmosBulkAsyncTest() {
assertThat(this.bulkClient).isNull();
this.bulkClient = getClientBuilder().buildAsyncClient();
ThrottlingRetryOptions throttlingOptions = new ThrottlingRetryOptions()
.setMaxRetryAttemptsOnThrottledRequests(1000000)
.setMaxRetryWaitTime(Duration.ofDays(1));
this.bulkClient = getClientBuilder().throttlingRetryOptions(throttlingOptions).buildAsyncClient();
bulkAsyncContainer = getSharedMultiPartitionCosmosContainer(this.bulkClient);
}

Expand All @@ -47,6 +50,75 @@ public void afterClass() {
safeCloseAsync(this.bulkClient);
}

@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void createItem_withBulkAndThroughputControl() throws InterruptedException {
int totalRequest = getTotalRequest(180, 200);

PartitionKeyDefinition pkDefinition = new PartitionKeyDefinition();
pkDefinition.setPaths(Collections.singletonList("/mypk"));
CosmosAsyncContainer bulkAsyncContainerWithThroughputControl = createCollection(
this.bulkClient,
bulkAsyncContainer.getDatabase().getId(),
new CosmosContainerProperties(UUID.randomUUID().toString(), pkDefinition));

ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder()
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
.setGroupName("test-group")
.setTargetThroughputThreshold(0.2)
.setDefault(true)
.build();
bulkAsyncContainerWithThroughputControl.enableLocalThroughputControlGroup(groupConfig);

Flux<CosmosItemOperation> cosmosItemOperationFlux = Flux.merge(
Flux.range(0, totalRequest).map(i -> {
String partitionKey = UUID.randomUUID().toString();
TestDoc testDoc = this.populateTestDoc(partitionKey);

return BulkOperations.getUpsertItemOperation(testDoc, new PartitionKey(partitionKey));
}),
Flux.range(0, totalRequest).map(i -> {
String partitionKey = UUID.randomUUID().toString();
EventDoc eventDoc = new EventDoc(UUID.randomUUID().toString(), 2, 4, "type1", partitionKey);

return BulkOperations.getUpsertItemOperation(eventDoc, new PartitionKey(partitionKey));
}));

BulkProcessingOptions<CosmosBulkAsyncTest> bulkProcessingOptions = new BulkProcessingOptions<>();
bulkProcessingOptions.setMaxMicroBatchSize(100);
bulkProcessingOptions.setMaxMicroBatchConcurrency(5);

try {
Flux<CosmosBulkOperationResponse<CosmosBulkAsyncTest>> responseFlux = bulkAsyncContainerWithThroughputControl
.processBulkOperations(cosmosItemOperationFlux, bulkProcessingOptions);

Thread.sleep(1000);

AtomicInteger processedDoc = new AtomicInteger(0);
responseFlux
.flatMap((CosmosBulkOperationResponse<CosmosBulkAsyncTest> cosmosBulkOperationResponse) -> {

processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull();
assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull();
assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull();

return Mono.just(cosmosBulkItemResponse);
}).blockLast();

assertThat(processedDoc.get()).isEqualTo(totalRequest * 2);
} finally {
bulkAsyncContainerWithThroughputControl.delete().block();
}
}

@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void createItem_withBulk() {
int totalRequest = getTotalRequest();
Expand Down Expand Up @@ -79,6 +151,11 @@ public void createItem_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}

assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -127,6 +204,10 @@ public void createItemMultipleTimesWithOperationOnFly_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand All @@ -150,6 +231,10 @@ public void createItemMultipleTimesWithOperationOnFly_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -199,6 +284,10 @@ public void runCreateItemMultipleTimesWithFixedOperations_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand All @@ -224,6 +313,10 @@ public void runCreateItemMultipleTimesWithFixedOperations_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CONFLICT.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -323,6 +416,10 @@ public void upsertItem_withbulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand All @@ -344,7 +441,7 @@ public void upsertItem_withbulk() {

@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void deleteItem_withBulk() {
int totalRequest = getTotalRequest();
int totalRequest = Math.min(getTotalRequest(), 20);

List<CosmosItemOperation> cosmosItemOperations = new ArrayList<>();

Expand Down Expand Up @@ -376,6 +473,10 @@ public void deleteItem_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.NO_CONTENT.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -424,6 +525,10 @@ public void readItem_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -477,6 +582,10 @@ public void readItemMultipleTimes_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -539,6 +648,10 @@ public void replaceItem_withBulk() {
processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand Down Expand Up @@ -572,6 +685,10 @@ private void createItemsAndVerify(List<CosmosItemOperation> cosmosItemOperations

processedDoc.incrementAndGet();
CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
if (cosmosBulkOperationResponse.getException() != null) {
logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
fail(cosmosBulkOperationResponse.getException().toString());
}
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
Expand All @@ -595,10 +712,14 @@ private void createItemsAndVerify(List<CosmosItemOperation> cosmosItemOperations
assertThat(distinctIndex.size()).isEqualTo(cosmosItemOperations.size());
}

private int getTotalRequest() {
int countRequest = new Random().nextInt(100) + 200;
private int getTotalRequest(int min, int max) {
int countRequest = new Random().nextInt(max - min) + min;
logger.info("Total count of request for this test case: " + countRequest);

return countRequest;
}

private int getTotalRequest() {
return getTotalRequest(200, 300);
}
}
Loading