diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java index 4b2545e007e0f..2d553dc524c92 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java @@ -234,8 +234,7 @@ private Mono> handleTransactionalBatchOper return ((ItemBulkOperation) itemOperation).getRetryPolicy().shouldRetry(operationResult).flatMap( result -> { if (result.shouldRetry) { - groupSink.next(itemOperation); - return Mono.empty(); + return this.enqueueForRetry(result.backOffTime, groupSink, itemOperation); } else { return Mono.just(BridgeInternal.createCosmosBulkOperationResponse( itemOperation, cosmosBulkItemResponse, this.batchContext)); @@ -281,16 +280,31 @@ private Mono> handleTransactionalBatchExec return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, this.batchContext)); } + private Mono> enqueueForRetry( + Duration backOffTime, + FluxSink groupSink, + CosmosItemOperation itemOperation) { + + if (backOffTime == null || backOffTime.isZero()) { + groupSink.next(itemOperation); + return Mono.empty(); + } else { + return Mono + .delay(backOffTime) + .flatMap((dumm) -> { + groupSink.next(itemOperation); + return Mono.empty(); + }); + } + } + private Mono> retryOtherExceptions( CosmosItemOperation itemOperation, Exception exception, FluxSink groupSink, CosmosException cosmosException, ItemBulkOperation itemBulkOperation) { return itemBulkOperation.getRetryPolicy().shouldRetry(cosmosException).flatMap(result -> { if (result.shouldRetry) { - - groupSink.next(itemOperation); - return Mono.empty(); + return this.enqueueForRetry(result.backOffTime, groupSink, itemBulkOperation); } else { - return Mono.just(BridgeInternal.createCosmosBulkOperationResponse( itemOperation, exception, this.batchContext)); } @@ -298,7 +312,6 @@ private Mono> retryOtherExceptions( } private Mono executeBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest) { - return this.docClientWrapper.executeBatchRequest( BridgeInternal.getLink(this.container), serverRequest, null, false); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java index c077b2adc29fe..5a5d40998fc8c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java @@ -27,7 +27,9 @@ public Mono invokeResourceOperationAsync(Uri physicalAddress, RxD request.requestContext.resourcePhysicalAddress = physicalAddress.toString(); } if (this.throughputControlStore != null) { - return this.throughputControlStore.processRequest(request, this.invokeStoreAsync(physicalAddress, request)); + return this.throughputControlStore.processRequest( + request, + Mono.defer(() -> this.invokeStoreAsync(physicalAddress, request))); } return this.invokeStoreAsync(physicalAddress, request); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java index 68f2f404260fa..2e4525bc813ff 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java @@ -3,7 +3,9 @@ package com.azure.cosmos; +import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.PartitionKeyDefinition; import io.netty.handler.codec.http.HttpResponseStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,14 +16,12 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Random; -import java.util.UUID; +import java.time.Duration; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; public class CosmosBulkAsyncTest extends BatchTestBase { @@ -38,7 +38,10 @@ public CosmosBulkAsyncTest(CosmosClientBuilder clientBuilder) { @BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT) public void before_CosmosBulkAsyncTest() { assertThat(this.bulkClient).isNull(); - this.bulkClient = getClientBuilder().buildAsyncClient(); + ThrottlingRetryOptions throttlingOptions = new ThrottlingRetryOptions() + .setMaxRetryAttemptsOnThrottledRequests(1000000) + .setMaxRetryWaitTime(Duration.ofDays(1)); + this.bulkClient = getClientBuilder().throttlingRetryOptions(throttlingOptions).buildAsyncClient(); bulkAsyncContainer = getSharedMultiPartitionCosmosContainer(this.bulkClient); } @@ -47,6 +50,75 @@ public void afterClass() { safeCloseAsync(this.bulkClient); } + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void createItem_withBulkAndThroughputControl() throws InterruptedException { + int totalRequest = getTotalRequest(180, 200); + + PartitionKeyDefinition pkDefinition = new PartitionKeyDefinition(); + pkDefinition.setPaths(Collections.singletonList("/mypk")); + CosmosAsyncContainer bulkAsyncContainerWithThroughputControl = createCollection( + this.bulkClient, + bulkAsyncContainer.getDatabase().getId(), + new CosmosContainerProperties(UUID.randomUUID().toString(), pkDefinition)); + + ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() + .setGroupName("test-group") + .setTargetThroughputThreshold(0.2) + .setDefault(true) + .build(); + bulkAsyncContainerWithThroughputControl.enableLocalThroughputControlGroup(groupConfig); + + Flux cosmosItemOperationFlux = Flux.merge( + Flux.range(0, totalRequest).map(i -> { + String partitionKey = UUID.randomUUID().toString(); + TestDoc testDoc = this.populateTestDoc(partitionKey); + + return BulkOperations.getUpsertItemOperation(testDoc, new PartitionKey(partitionKey)); + }), + Flux.range(0, totalRequest).map(i -> { + String partitionKey = UUID.randomUUID().toString(); + EventDoc eventDoc = new EventDoc(UUID.randomUUID().toString(), 2, 4, "type1", partitionKey); + + return BulkOperations.getUpsertItemOperation(eventDoc, new PartitionKey(partitionKey)); + })); + + BulkProcessingOptions bulkProcessingOptions = new BulkProcessingOptions<>(); + bulkProcessingOptions.setMaxMicroBatchSize(100); + bulkProcessingOptions.setMaxMicroBatchConcurrency(5); + + try { + Flux> responseFlux = bulkAsyncContainerWithThroughputControl + .processBulkOperations(cosmosItemOperationFlux, bulkProcessingOptions); + + Thread.sleep(1000); + + AtomicInteger processedDoc = new AtomicInteger(0); + responseFlux + .flatMap((CosmosBulkOperationResponse cosmosBulkOperationResponse) -> { + + processedDoc.incrementAndGet(); + + CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } + assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); + assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); + assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull(); + assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + + return Mono.just(cosmosBulkItemResponse); + }).blockLast(); + + assertThat(processedDoc.get()).isEqualTo(totalRequest * 2); + } finally { + bulkAsyncContainerWithThroughputControl.delete().block(); + } + } + @Test(groups = {"simple"}, timeOut = TIMEOUT) public void createItem_withBulk() { int totalRequest = getTotalRequest(); @@ -79,6 +151,11 @@ public void createItem_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } + assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -127,6 +204,10 @@ public void createItemMultipleTimesWithOperationOnFly_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -150,6 +231,10 @@ public void createItemMultipleTimesWithOperationOnFly_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -199,6 +284,10 @@ public void runCreateItemMultipleTimesWithFixedOperations_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -224,6 +313,10 @@ public void runCreateItemMultipleTimesWithFixedOperations_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CONFLICT.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -323,6 +416,10 @@ public void upsertItem_withbulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -344,7 +441,7 @@ public void upsertItem_withbulk() { @Test(groups = {"simple"}, timeOut = TIMEOUT) public void deleteItem_withBulk() { - int totalRequest = getTotalRequest(); + int totalRequest = Math.min(getTotalRequest(), 20); List cosmosItemOperations = new ArrayList<>(); @@ -376,6 +473,10 @@ public void deleteItem_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.NO_CONTENT.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -424,6 +525,10 @@ public void readItem_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -477,6 +582,10 @@ public void readItemMultipleTimes_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -539,6 +648,10 @@ public void replaceItem_withBulk() { processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -572,6 +685,10 @@ private void createItemsAndVerify(List cosmosItemOperations processedDoc.incrementAndGet(); CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); @@ -595,10 +712,14 @@ private void createItemsAndVerify(List cosmosItemOperations assertThat(distinctIndex.size()).isEqualTo(cosmosItemOperations.size()); } - private int getTotalRequest() { - int countRequest = new Random().nextInt(100) + 200; + private int getTotalRequest(int min, int max) { + int countRequest = new Random().nextInt(max - min) + min; logger.info("Total count of request for this test case: " + countRequest); return countRequest; } + + private int getTotalRequest() { + return getTotalRequest(200, 300); + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java index 11299512b8e33..679f6f3d021e7 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java @@ -11,9 +11,10 @@ import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosDiagnostics; import com.azure.cosmos.CosmosException; +import com.azure.cosmos.GlobalThroughputControlConfig; import com.azure.cosmos.ThroughputControlGroupConfig; import com.azure.cosmos.ThroughputControlGroupConfigBuilder; -import com.azure.cosmos.GlobalThroughputControlConfig; +import com.azure.cosmos.implementation.FailureValidator; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; @@ -25,6 +26,7 @@ import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.rx.CosmosItemResponseValidator; import com.azure.cosmos.rx.TestSuiteBase; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -184,6 +186,41 @@ public void throughputLocalControlForContainerCreateDeleteWithSameName(Operation BridgeInternal.getContextClient(client).getConnectionPolicy().getConnectionMode()); } + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void throughputLocalControl_createItem() throws InterruptedException { + // The create document in this test usually takes around 6.29RU, pick a RU here relatively close, so to test throttled scenario + ThroughputControlGroupConfig groupConfig = + new ThroughputControlGroupConfigBuilder() + .setGroupName("group-" + UUID.randomUUID()) + .setTargetThroughput(6) + .build(); + + container.enableLocalThroughputControlGroup(groupConfig); + + CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); + requestOptions.setContentResponseOnWriteEnabled(true); + requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); + + CosmosItemResponse createItemResponse = container.createItem(getDocumentDefinition(), requestOptions).block(); + this.validateRequestNotThrottled( + createItemResponse.getDiagnostics().toString(), + BridgeInternal.getContextClient(client).getConnectionPolicy().getConnectionMode()); + + // second request to same group. which will get throttled + TestItem itemGetThrottled = getDocumentDefinition(createItemResponse.getItem().getMypk()); + FailureValidator failureValidator = new FailureValidator.Builder().statusCode(429).build(); + validateItemFailure(container.createItem(itemGetThrottled, requestOptions), failureValidator); + + Thread.sleep(500); + + // third request to create same item in step2 + // Make sure the request really get blocked in step2 + CosmosItemResponseValidator successValidator = new CosmosItemResponseValidator.Builder>() + .withId(itemGetThrottled.getId()) + .build(); + validateItemSuccess(container.createItem(itemGetThrottled), successValidator); + } + @BeforeClass(groups = { "emulator" }, timeOut = 4 * SETUP_TIMEOUT) public void before_ThroughputBudgetControllerTest() { client = getClientBuilder().buildAsyncClient();