From 20d8416c9308f2ea9d7f4a1688c043c2f3af0893 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Mon, 7 Jun 2021 22:35:50 +0000 Subject: [PATCH 1/6] Dummy --- .../implementation/DiagnosticsClientContext.java | 4 ++++ .../cosmos/implementation/RxDocumentClientImpl.java | 10 ++++++++++ .../implementation/RxDocumentServiceRequest.java | 2 +- .../cosmos/implementation/batch/BulkExecutor.java | 13 +++++++++++-- .../ThroughputRequestThrottler.java | 13 ++++++++++++- .../java/com/azure/cosmos/CosmosBulkAsyncTest.java | 12 +++++++++--- 6 files changed, 47 insertions(+), 7 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java index e734c3a5e1c8a..2e6baebf9b4b7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java @@ -29,6 +29,10 @@ public interface DiagnosticsClientContext { CosmosDiagnostics createDiagnostics(); + public void incrementRetryCount() ; + + public int getRetryCount(); + static final class ClientContextSerializer extends StdSerializer { private final static Logger logger = LoggerFactory.getLogger(ClientContextSerializer.class); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 6d65e1cc98a31..a5b532d81fe88 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -177,6 +177,16 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization private final AtomicBoolean throughputControlEnabled; private ThroughputControlStore throughputControlStore; + private Integer retryCount = 0; + + public void incrementRetryCount() { + this.retryCount += 1; + } + + public int getRetryCount() { + return this.retryCount; + } + public RxDocumentClientImpl(URI serviceEndpoint, String masterKeyOrResourceToken, List permissionFeed, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index ce66909dde7f7..f09fcafa87498 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -30,7 +30,7 @@ */ public class RxDocumentServiceRequest implements Cloneable { - private final DiagnosticsClientContext clientContext; + public final DiagnosticsClientContext clientContext; public volatile boolean forcePartitionKeyRangeRefresh; public volatile boolean forceCollectionRoutingMapRefresh; private String resourceId; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java index 4b2545e007e0f..62c46c9b4fa44 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java @@ -234,8 +234,17 @@ private Mono> handleTransactionalBatchOper return ((ItemBulkOperation) itemOperation).getRetryPolicy().shouldRetry(operationResult).flatMap( result -> { if (result.shouldRetry) { - groupSink.next(itemOperation); - return Mono.empty(); + if (result.backOffTime == null || result.backOffTime.isZero()) { + groupSink.next(itemOperation); + return Mono.empty(); + } else { + return Mono + .delay(result.backOffTime) + .flatMap((dumm) -> { + groupSink.next(itemOperation); + return Mono.empty(); + }); + } } else { return Mono.just(BridgeInternal.createCosmosBulkOperationResponse( itemOperation, cosmosBulkItemResponse, this.batchContext)); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputRequestThrottler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputRequestThrottler.java index 3a1197bc55e2d..1f0f8ecc52f76 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputRequestThrottler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputRequestThrottler.java @@ -8,6 +8,7 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.RequestRateTooLargeException; +import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.RxDocumentServiceResponse; import com.azure.cosmos.implementation.Utils; @@ -90,7 +91,17 @@ public Mono processRequest(RxDocumentServiceRequest request, Mono orig return value; })); - if (this.availableThroughput.get() > 0) { + boolean shouldAccept; + if (request.getOperationType() == OperationType.Batch && + request.getResourceType() == ResourceType.Document ) { + request.clientContext.incrementRetryCount(); + shouldAccept = request.clientContext.getRetryCount() >= 2; + } else { + shouldAccept = this.availableThroughput.get() > 0; + } + + + if (shouldAccept) { if (StringUtils.isEmpty(request.requestContext.throughputControlCycleId)) { request.requestContext.throughputControlCycleId = this.cycleId; } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java index 68f2f404260fa..5cce67dfee35f 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java @@ -49,20 +49,26 @@ public void afterClass() { @Test(groups = {"simple"}, timeOut = TIMEOUT) public void createItem_withBulk() { - int totalRequest = getTotalRequest(); + int totalRequest = 1000; + ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() + .setGroupName("test-group") + .setTargetThroughputThreshold(0.001) + .setDefault(true) + .build(); + bulkAsyncContainer.enableLocalThroughputControlGroup(groupConfig); Flux cosmosItemOperationFlux = Flux.merge( Flux.range(0, totalRequest).map(i -> { String partitionKey = UUID.randomUUID().toString(); TestDoc testDoc = this.populateTestDoc(partitionKey); - return BulkOperations.getCreateItemOperation(testDoc, new PartitionKey(partitionKey)); + return BulkOperations.getUpsertItemOperation(testDoc, new PartitionKey(partitionKey)); }), Flux.range(0, totalRequest).map(i -> { String partitionKey = UUID.randomUUID().toString(); EventDoc eventDoc = new EventDoc(UUID.randomUUID().toString(), 2, 4, "type1", partitionKey); - return BulkOperations.getCreateItemOperation(eventDoc, new PartitionKey(partitionKey)); + return BulkOperations.getUpsertItemOperation(eventDoc, new PartitionKey(partitionKey)); })); BulkProcessingOptions bulkProcessingOptions = new BulkProcessingOptions<>(); From efa86e896355b27d629ef1b89c33d31c568ef2d8 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 8 Jun 2021 10:09:14 +0000 Subject: [PATCH 2/6] Deferring teh request when ClinetThrouhgputControl is enabled --- .../implementation/directconnectivity/TransportClient.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java index c077b2adc29fe..5a5d40998fc8c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java @@ -27,7 +27,9 @@ public Mono invokeResourceOperationAsync(Uri physicalAddress, RxD request.requestContext.resourcePhysicalAddress = physicalAddress.toString(); } if (this.throughputControlStore != null) { - return this.throughputControlStore.processRequest(request, this.invokeStoreAsync(physicalAddress, request)); + return this.throughputControlStore.processRequest( + request, + Mono.defer(() -> this.invokeStoreAsync(physicalAddress, request))); } return this.invokeStoreAsync(physicalAddress, request); From ea15141d19a06204c5bee7d8af587d36a26d0ecf Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 8 Jun 2021 15:45:46 +0000 Subject: [PATCH 3/6] Client throughput control: defer store invocation --- .../DiagnosticsClientContext.java | 4 - .../implementation/RxDocumentClientImpl.java | 10 -- .../RxDocumentServiceRequest.java | 2 +- .../ThroughputRequestThrottler.java | 13 +- .../com/azure/cosmos/CosmosBulkAsyncTest.java | 123 +++++++++++++----- 5 files changed, 96 insertions(+), 56 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java index 2e6baebf9b4b7..e734c3a5e1c8a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java @@ -29,10 +29,6 @@ public interface DiagnosticsClientContext { CosmosDiagnostics createDiagnostics(); - public void incrementRetryCount() ; - - public int getRetryCount(); - static final class ClientContextSerializer extends StdSerializer { private final static Logger logger = LoggerFactory.getLogger(ClientContextSerializer.class); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index a5b532d81fe88..6d65e1cc98a31 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -177,16 +177,6 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization private final AtomicBoolean throughputControlEnabled; private ThroughputControlStore throughputControlStore; - private Integer retryCount = 0; - - public void incrementRetryCount() { - this.retryCount += 1; - } - - public int getRetryCount() { - return this.retryCount; - } - public RxDocumentClientImpl(URI serviceEndpoint, String masterKeyOrResourceToken, List permissionFeed, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index f09fcafa87498..ce66909dde7f7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -30,7 +30,7 @@ */ public class RxDocumentServiceRequest implements Cloneable { - public final DiagnosticsClientContext clientContext; + private final DiagnosticsClientContext clientContext; public volatile boolean forcePartitionKeyRangeRefresh; public volatile boolean forceCollectionRoutingMapRefresh; private String resourceId; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputRequestThrottler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputRequestThrottler.java index 1f0f8ecc52f76..3a1197bc55e2d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputRequestThrottler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputRequestThrottler.java @@ -8,7 +8,6 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.RequestRateTooLargeException; -import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.RxDocumentServiceResponse; import com.azure.cosmos.implementation.Utils; @@ -91,17 +90,7 @@ public Mono processRequest(RxDocumentServiceRequest request, Mono orig return value; })); - boolean shouldAccept; - if (request.getOperationType() == OperationType.Batch && - request.getResourceType() == ResourceType.Document ) { - request.clientContext.incrementRetryCount(); - shouldAccept = request.clientContext.getRetryCount() >= 2; - } else { - shouldAccept = this.availableThroughput.get() > 0; - } - - - if (shouldAccept) { + if (this.availableThroughput.get() > 0) { if (StringUtils.isEmpty(request.requestContext.throughputControlCycleId)) { request.requestContext.throughputControlCycleId = this.cycleId; } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java index 5cce67dfee35f..cab62bb57c029 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java @@ -3,7 +3,9 @@ package com.azure.cosmos; +import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.PartitionKeyDefinition; import io.netty.handler.codec.http.HttpResponseStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,14 +16,12 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Random; -import java.util.UUID; +import java.time.Duration; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; public class CosmosBulkAsyncTest extends BatchTestBase { @@ -38,7 +38,10 @@ public CosmosBulkAsyncTest(CosmosClientBuilder clientBuilder) { @BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT) public void before_CosmosBulkAsyncTest() { assertThat(this.bulkClient).isNull(); - this.bulkClient = getClientBuilder().buildAsyncClient(); + ThrottlingRetryOptions throttlingOptions = new ThrottlingRetryOptions() + .setMaxRetryAttemptsOnThrottledRequests(1000000) + .setMaxRetryWaitTime(Duration.ofDays(1)); + this.bulkClient = getClientBuilder().throttlingRetryOptions(throttlingOptions).buildAsyncClient(); bulkAsyncContainer = getSharedMultiPartitionCosmosContainer(this.bulkClient); } @@ -48,14 +51,22 @@ public void afterClass() { } @Test(groups = {"simple"}, timeOut = TIMEOUT) - public void createItem_withBulk() { - int totalRequest = 1000; + public void createItem_withBulk() throws InterruptedException { + int totalRequest = getTotalRequest(180, 200); + + PartitionKeyDefinition pkDefinition = new PartitionKeyDefinition(); + pkDefinition.setPaths(Collections.singletonList("/mypk")); + CosmosAsyncContainer bulkAsyncContainerWithThroughputControl = createCollection( + this.bulkClient, + bulkAsyncContainer.getDatabase().getId(), + new CosmosContainerProperties(UUID.randomUUID().toString(), pkDefinition)); + ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("test-group") - .setTargetThroughputThreshold(0.001) + .setTargetThroughputThreshold(0.2) .setDefault(true) .build(); - bulkAsyncContainer.enableLocalThroughputControlGroup(groupConfig); + bulkAsyncContainerWithThroughputControl.enableLocalThroughputControlGroup(groupConfig); Flux cosmosItemOperationFlux = Flux.merge( Flux.range(0, totalRequest).map(i -> { @@ -75,27 +86,37 @@ public void createItem_withBulk() { bulkProcessingOptions.setMaxMicroBatchSize(100); bulkProcessingOptions.setMaxMicroBatchConcurrency(5); - Flux> responseFlux = bulkAsyncContainer - .processBulkOperations(cosmosItemOperationFlux, bulkProcessingOptions); + try { + Flux> responseFlux = bulkAsyncContainerWithThroughputControl + .processBulkOperations(cosmosItemOperationFlux, bulkProcessingOptions); - AtomicInteger processedDoc = new AtomicInteger(0); - responseFlux - .flatMap((CosmosBulkOperationResponse cosmosBulkOperationResponse) -> { + Thread.sleep(1000); - processedDoc.incrementAndGet(); + AtomicInteger processedDoc = new AtomicInteger(0); + responseFlux + .flatMap((CosmosBulkOperationResponse cosmosBulkOperationResponse) -> { - CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); - assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); - assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); - assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); - assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull(); - assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); - assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + processedDoc.incrementAndGet(); - return Mono.just(cosmosBulkItemResponse); - }).blockLast(); + CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } + assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); + assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); + assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull(); + assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); - assertThat(processedDoc.get()).isEqualTo(totalRequest * 2); + return Mono.just(cosmosBulkItemResponse); + }).blockLast(); + + assertThat(processedDoc.get()).isEqualTo(totalRequest * 2); + } finally { + bulkAsyncContainerWithThroughputControl.delete().block(); + } } @Test(groups = {"simple"}, timeOut = TIMEOUT) @@ -133,6 +154,10 @@ public void createItemMultipleTimesWithOperationOnFly_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -156,6 +181,10 @@ public void createItemMultipleTimesWithOperationOnFly_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -205,6 +234,10 @@ public void runCreateItemMultipleTimesWithFixedOperations_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -230,6 +263,10 @@ public void runCreateItemMultipleTimesWithFixedOperations_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CONFLICT.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -329,6 +366,10 @@ public void upsertItem_withbulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -350,7 +391,7 @@ public void upsertItem_withbulk() { @Test(groups = {"simple"}, timeOut = TIMEOUT) public void deleteItem_withBulk() { - int totalRequest = getTotalRequest(); + int totalRequest = Math.min(getTotalRequest(), 20); List cosmosItemOperations = new ArrayList<>(); @@ -382,6 +423,10 @@ public void deleteItem_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.NO_CONTENT.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -430,6 +475,10 @@ public void readItem_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -483,6 +532,10 @@ public void readItemMultipleTimes_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -545,6 +598,10 @@ public void replaceItem_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -578,6 +635,10 @@ private void createItemsAndVerify(List cosmosItemOperations processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -601,10 +662,14 @@ private void createItemsAndVerify(List cosmosItemOperations assertThat(distinctIndex.size()).isEqualTo(cosmosItemOperations.size()); } - private int getTotalRequest() { - int countRequest = new Random().nextInt(100) + 200; + private int getTotalRequest(int min, int max) { + int countRequest = new Random().nextInt(max - min) + min; logger.info("Total count of request for this test case: " + countRequest); return countRequest; } + + private int getTotalRequest() { + return getTotalRequest(200, 300); + } } From f293e4af92bd3451d9b17f4a23ae666626824493 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 8 Jun 2021 15:58:42 +0000 Subject: [PATCH 4/6] Adding additional test-coverage in throughput tests --- .../ThroughputControlTests.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java index 11299512b8e33..679f6f3d021e7 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java @@ -11,9 +11,10 @@ import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosDiagnostics; import com.azure.cosmos.CosmosException; +import com.azure.cosmos.GlobalThroughputControlConfig; import com.azure.cosmos.ThroughputControlGroupConfig; import com.azure.cosmos.ThroughputControlGroupConfigBuilder; -import com.azure.cosmos.GlobalThroughputControlConfig; +import com.azure.cosmos.implementation.FailureValidator; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; @@ -25,6 +26,7 @@ import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.rx.CosmosItemResponseValidator; import com.azure.cosmos.rx.TestSuiteBase; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -184,6 +186,41 @@ public void throughputLocalControlForContainerCreateDeleteWithSameName(Operation BridgeInternal.getContextClient(client).getConnectionPolicy().getConnectionMode()); } + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void throughputLocalControl_createItem() throws InterruptedException { + // The create document in this test usually takes around 6.29RU, pick a RU here relatively close, so to test throttled scenario + ThroughputControlGroupConfig groupConfig = + new ThroughputControlGroupConfigBuilder() + .setGroupName("group-" + UUID.randomUUID()) + .setTargetThroughput(6) + .build(); + + container.enableLocalThroughputControlGroup(groupConfig); + + CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); + requestOptions.setContentResponseOnWriteEnabled(true); + requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); + + CosmosItemResponse createItemResponse = container.createItem(getDocumentDefinition(), requestOptions).block(); + this.validateRequestNotThrottled( + createItemResponse.getDiagnostics().toString(), + BridgeInternal.getContextClient(client).getConnectionPolicy().getConnectionMode()); + + // second request to same group. which will get throttled + TestItem itemGetThrottled = getDocumentDefinition(createItemResponse.getItem().getMypk()); + FailureValidator failureValidator = new FailureValidator.Builder().statusCode(429).build(); + validateItemFailure(container.createItem(itemGetThrottled, requestOptions), failureValidator); + + Thread.sleep(500); + + // third request to create same item in step2 + // Make sure the request really get blocked in step2 + CosmosItemResponseValidator successValidator = new CosmosItemResponseValidator.Builder>() + .withId(itemGetThrottled.getId()) + .build(); + validateItemSuccess(container.createItem(itemGetThrottled), successValidator); + } + @BeforeClass(groups = { "emulator" }, timeOut = 4 * SETUP_TIMEOUT) public void before_ThroughputBudgetControllerTest() { client = getClientBuilder().buildAsyncClient(); From c2a11775f3c6bcf5939ff9c572874c3a022d8949 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 8 Jun 2021 16:58:59 +0000 Subject: [PATCH 5/6] Reacting to code review feedback. --- .../implementation/batch/BulkExecutor.java | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java index 62c46c9b4fa44..2d553dc524c92 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java @@ -234,17 +234,7 @@ private Mono> handleTransactionalBatchOper return ((ItemBulkOperation) itemOperation).getRetryPolicy().shouldRetry(operationResult).flatMap( result -> { if (result.shouldRetry) { - if (result.backOffTime == null || result.backOffTime.isZero()) { - groupSink.next(itemOperation); - return Mono.empty(); - } else { - return Mono - .delay(result.backOffTime) - .flatMap((dumm) -> { - groupSink.next(itemOperation); - return Mono.empty(); - }); - } + return this.enqueueForRetry(result.backOffTime, groupSink, itemOperation); } else { return Mono.just(BridgeInternal.createCosmosBulkOperationResponse( itemOperation, cosmosBulkItemResponse, this.batchContext)); @@ -290,16 +280,31 @@ private Mono> handleTransactionalBatchExec return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, this.batchContext)); } + private Mono> enqueueForRetry( + Duration backOffTime, + FluxSink groupSink, + CosmosItemOperation itemOperation) { + + if (backOffTime == null || backOffTime.isZero()) { + groupSink.next(itemOperation); + return Mono.empty(); + } else { + return Mono + .delay(backOffTime) + .flatMap((dumm) -> { + groupSink.next(itemOperation); + return Mono.empty(); + }); + } + } + private Mono> retryOtherExceptions( CosmosItemOperation itemOperation, Exception exception, FluxSink groupSink, CosmosException cosmosException, ItemBulkOperation itemBulkOperation) { return itemBulkOperation.getRetryPolicy().shouldRetry(cosmosException).flatMap(result -> { if (result.shouldRetry) { - - groupSink.next(itemOperation); - return Mono.empty(); + return this.enqueueForRetry(result.backOffTime, groupSink, itemBulkOperation); } else { - return Mono.just(BridgeInternal.createCosmosBulkOperationResponse( itemOperation, exception, this.batchContext)); } @@ -307,7 +312,6 @@ private Mono> retryOtherExceptions( } private Mono executeBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest) { - return this.docClientWrapper.executeBatchRequest( BridgeInternal.getLink(this.container), serverRequest, null, false); } From 1af579d824e228058faa0e646c862b61f2e58d30 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 8 Jun 2021 17:03:45 +0000 Subject: [PATCH 6/6] Adding back createItem_withBulk test --- .../com/azure/cosmos/CosmosBulkAsyncTest.java | 52 ++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java index cab62bb57c029..2e4525bc813ff 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java @@ -51,7 +51,7 @@ public void afterClass() { } @Test(groups = {"simple"}, timeOut = TIMEOUT) - public void createItem_withBulk() throws InterruptedException { + public void createItem_withBulkAndThroughputControl() throws InterruptedException { int totalRequest = getTotalRequest(180, 200); PartitionKeyDefinition pkDefinition = new PartitionKeyDefinition(); @@ -119,6 +119,56 @@ public void createItem_withBulk() throws InterruptedException { } } + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void createItem_withBulk() { + int totalRequest = getTotalRequest(); + + Flux cosmosItemOperationFlux = Flux.merge( + Flux.range(0, totalRequest).map(i -> { + String partitionKey = UUID.randomUUID().toString(); + TestDoc testDoc = this.populateTestDoc(partitionKey); + + return BulkOperations.getCreateItemOperation(testDoc, new PartitionKey(partitionKey)); + }), + Flux.range(0, totalRequest).map(i -> { + String partitionKey = UUID.randomUUID().toString(); + EventDoc eventDoc = new EventDoc(UUID.randomUUID().toString(), 2, 4, "type1", partitionKey); + + return BulkOperations.getCreateItemOperation(eventDoc, new PartitionKey(partitionKey)); + })); + + BulkProcessingOptions bulkProcessingOptions = new BulkProcessingOptions<>(); + bulkProcessingOptions.setMaxMicroBatchSize(100); + bulkProcessingOptions.setMaxMicroBatchConcurrency(5); + + Flux> responseFlux = bulkAsyncContainer + .processBulkOperations(cosmosItemOperationFlux, bulkProcessingOptions); + + AtomicInteger processedDoc = new AtomicInteger(0); + responseFlux + .flatMap((CosmosBulkOperationResponse cosmosBulkOperationResponse) -> { + + processedDoc.incrementAndGet(); + + CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } + + assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); + assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); + assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull(); + assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + + return Mono.just(cosmosBulkItemResponse); + }).blockLast(); + + assertThat(processedDoc.get()).isEqualTo(totalRequest * 2); + } + @Test(groups = {"simple"}, timeOut = TIMEOUT) public void createItemMultipleTimesWithOperationOnFly_withBulk() { int totalRequest = getTotalRequest();