From ecfe8fa701ce6196dfc2e18089c1a8f014f26d0f Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Thu, 22 Jun 2023 11:10:40 -0400 Subject: [PATCH] Read many fix (#35513) * Preliminary fix for readMany. * Refactorings. * Added tests. * Refactorings. * Refactorings. * Preliminary changes. * Added log to CHANGELOG.md. * Addressed review comments. * Addressed review comments. * Addressed review comments. --- .../java/com/azure/cosmos/CosmosItemTest.java | 139 ++++++++++-------- sdk/cosmos/azure-cosmos/CHANGELOG.md | 2 + .../implementation/RxDocumentClientImpl.java | 87 +++++------ 3 files changed, 128 insertions(+), 100 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java index f6b022e0d82b4..ce3d170d31d12 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java @@ -11,6 +11,8 @@ import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair; +import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosItemIdentity; import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; @@ -20,6 +22,7 @@ import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.SqlQuerySpec; +import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.rx.TestSuiteBase; import com.azure.cosmos.util.CosmosPagedIterable; import com.fasterxml.jackson.core.JsonProcessingException; @@ -41,12 +44,14 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.apache.commons.io.FileUtils.ONE_MB; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; public class CosmosItemTest extends TestSuiteBase { @@ -192,7 +197,7 @@ public void readMany() throws Exception { assertThat(feedResponse.getResults()).isNotNull(); assertThat(feedResponse.getResults().size()).isEqualTo(numDocuments); assertThat(diagnosticsAccessor.getClientSideRequestStatistics(feedResponse.getCosmosDiagnostics())).isNotNull(); - assertThat(diagnosticsAccessor.getClientSideRequestStatistics(feedResponse.getCosmosDiagnostics()).size()).isGreaterThan(1); + assertThat(diagnosticsAccessor.getClientSideRequestStatistics(feedResponse.getCosmosDiagnostics()).size()).isGreaterThanOrEqualTo(1); for (int i = 0; i < feedResponse.getResults().size(); i++) { InternalObjectNode fetchedResult = feedResponse.getResults().get(i); @@ -354,84 +359,100 @@ public void readManyWithManyNonExistentItemIds() throws Exception { assertThat(feedResponse.getResults().size()).isEqualTo(numDocuments); } - @Test(groups = { "simple" }, timeOut = TIMEOUT) - public void readManyWithFaultyPointRead() throws JsonProcessingException { - int numDocuments = 25; + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void readManyWithMultiplePartitionsAndSome404s() throws JsonProcessingException { - List feedRanges = container.getFeedRanges(); + CosmosDatabase readManyDatabase = null; + CosmosContainer readManyContainer = null; - // the container should have at least 2 physical partitions - assertThat(feedRanges.size()).isGreaterThanOrEqualTo(2); + int itemCount = 100; - for (int i = 0; i < numDocuments; i++) { - String partitionKeyValue = UUID.randomUUID().toString(); - String documentId = UUID.randomUUID().toString(); - ObjectNode document = getDocumentDefinition(documentId, partitionKeyValue); - container.createItem(document); - } + try { - // query 1 item - SqlQuerySpec sqlQuerySpec = new SqlQuerySpec(); - StringBuilder stringBuilder = new StringBuilder(); + readManyDatabase = client + .getDatabase(container.asyncContainer.getDatabase().getId()); - stringBuilder.append("SELECT * from c"); - stringBuilder.append(" OFFSET 0"); - stringBuilder.append(" LIMIT 1"); + String readManyContainerId = "container-with-multiple-partitions"; - sqlQuerySpec.setQueryText(stringBuilder.toString()); + CosmosContainerProperties containerProperties = new CosmosContainerProperties(readManyContainerId, "/mypk"); + ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(30_000); - // extract 1 item id and partition key val from 1st physical partition - AtomicReference itemId1 = new AtomicReference<>(""); - AtomicReference pkValItem1 = new AtomicReference<>(""); + readManyDatabase.createContainer(containerProperties, throughputProperties); - CosmosQueryRequestOptions cosmosQueryRequestOptions1 = new CosmosQueryRequestOptions(); - cosmosQueryRequestOptions1.setFeedRange(feedRanges.get(0)); + readManyContainer = readManyDatabase.getContainer(readManyContainerId); - container - .queryItems(sqlQuerySpec, cosmosQueryRequestOptions1, InternalObjectNode.class) - .iterableByPage() - .forEach(response -> { - List results = response.getResults(); + for (int i = 0; i < itemCount; i++) { + String id = UUID.randomUUID().toString(); + String myPk = UUID.randomUUID().toString(); - assertThat(results).isNotNull(); - assertThat(results).isNotEmpty(); - assertThat(results.size()).isEqualTo(1); + ObjectNode objectNode = getDocumentDefinition(id, myPk); - itemId1.set(results.get(0).getId()); - pkValItem1.set(results.get(0).getString("mypk")); - }); + readManyContainer.createItem(objectNode); + } - // extract 1 partition key val from 2nd physical partition - // to create non-existent CosmosItemIdentity instance - AtomicReference pkValItem2 = new AtomicReference<>(""); + List feedRanges = readManyContainer.getFeedRanges(); - CosmosQueryRequestOptions cosmosQueryRequestOptions2 = new CosmosQueryRequestOptions(); - cosmosQueryRequestOptions2.setFeedRange(feedRanges.get(1)); + assertThat(feedRanges).isNotNull(); + assertThat(feedRanges.size()).isGreaterThan(1); - container - .queryItems(sqlQuerySpec, cosmosQueryRequestOptions2, InternalObjectNode.class) - .iterableByPage() - .forEach(response -> { - List results = response.getResults(); + int feedRangeCount = feedRanges.size(); - assertThat(results).isNotNull(); - assertThat(results).isNotEmpty(); - assertThat(results.size()).isEqualTo(1); + // select 1 document per feed range + // increase the no. of documents with faulty ids + // see if documents fetched is (feed range count) - (faulty documents) + for (int faultyIdCount = 0; faultyIdCount <= feedRangeCount; faultyIdCount++) { + final Set faultyIds = new HashSet<>(); - pkValItem2.set(results.get(0).getString("mypk")); - }); + while (faultyIds.size() != faultyIdCount) { + faultyIds.add(ThreadLocalRandom.current().nextInt(feedRangeCount)); + } - CosmosItemIdentity cosmosItemIdentity = new CosmosItemIdentity(new PartitionKey(pkValItem1.get()), itemId1.get()); - CosmosItemIdentity nonExistentCosmosItemIdentity = new CosmosItemIdentity(new PartitionKey(pkValItem2.get()), UUID.randomUUID().toString()); + SqlQuerySpec sqlQuerySpec = new SqlQuerySpec(); + sqlQuerySpec.setQueryText("SELECT * FROM c OFFSET 0 LIMIT 1"); - List cosmosItemIdentities = Arrays.asList(cosmosItemIdentity, nonExistentCosmosItemIdentity); + List> idToPkPairs = new ArrayList<>(); - FeedResponse feedResponse = container.readMany(cosmosItemIdentities, InternalObjectNode.class); + for (int k = 0; k < feedRangeCount; k++) { + CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); + cosmosQueryRequestOptions.setFeedRange(feedRanges.get(k)); - assertThat(feedResponse).isNotNull(); - assertThat(feedResponse.getResults()).isNotNull(); - // there could be a case where 0 items were created in physical partition 1 - assertThat(feedResponse.getResults().size()).isLessThanOrEqualTo(1); + int finalK = k; + + readManyContainer + .queryItems(sqlQuerySpec, cosmosQueryRequestOptions, InternalObjectNode.class) + .iterableByPage() + .forEach(response -> { + InternalObjectNode queriedItem = response.getResults().get(0); + + if (faultyIds.contains(finalK)) { + idToPkPairs.add(new ImmutablePair<>(queriedItem.getId(), UUID.randomUUID().toString())); + } else { + idToPkPairs.add(new ImmutablePair<>(queriedItem.getId(), queriedItem.getString("mypk"))); + } + }); + } + + if (idToPkPairs.size() == feedRangeCount) { + + List cosmosItemIdentities = idToPkPairs + .stream() + .map(pkToIdPair -> new CosmosItemIdentity(new PartitionKey(pkToIdPair.getRight()), pkToIdPair.getLeft())) + .collect(Collectors.toList()); + + FeedResponse readManyResult = readManyContainer + .readMany(cosmosItemIdentities, InternalObjectNode.class); + + assertThat(readManyResult).isNotNull(); + assertThat(readManyResult.getResults()).isNotNull(); + assertThat(readManyResult.getResults().size()).isEqualTo(feedRangeCount - faultyIdCount); + } else { + fail("Not all physical partitions have data!"); + } + } + + } finally { + readManyContainer.delete(); + } } @Test(groups = { "simple" }, timeOut = TIMEOUT) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index e7b87c7623783..7849ef842984f 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -7,6 +7,8 @@ #### Breaking Changes #### Bugs Fixed +* Fixes the `readMany` API to not drop existing documents from the response in point-read scenarios when +there are non-existent document IDs also passed through the API - See [PR 35513](https://github.com/Azure/azure-sdk-for-java/pull/35513) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 2938cf4753b04..3581fbba4a2ec 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -15,6 +15,7 @@ import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair; import com.azure.cosmos.implementation.batch.BatchResponseParser; import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest; import com.azure.cosmos.implementation.batch.ServerBatchRequest; @@ -2671,8 +2672,8 @@ private Flux> pointReadsForReadMany( Map> singleItemPartitionRequestMap, String resourceLink, CosmosQueryRequestOptions queryRequestOptions, - Class klass - ) { + Class klass) { + return Flux.fromIterable(singleItemPartitionRequestMap.values()) .flatMap(cosmosItemIdentityList -> { if (cosmosItemIdentityList.size() == 1) { @@ -2682,52 +2683,56 @@ private Flux> pointReadsForReadMany( .getCosmosQueryRequestOptionsAccessor() .toRequestOptions(queryRequestOptions); requestOptions.setPartitionKey(firstIdentity.getPartitionKey()); - return this.readDocument((resourceLink + firstIdentity.getId()), requestOptions); - } - return Mono.empty(); - }) - .flatMap(resourceResponse -> { - CosmosItemResponse cosmosItemResponse = - ModelBridgeInternal.createCosmosAsyncItemResponse(resourceResponse, klass, getItemDeserializer()); - FeedResponse feedResponse = ModelBridgeInternal.createFeedResponse( - Arrays.asList(InternalObjectNode.fromObject(cosmosItemResponse.getItem())), - cosmosItemResponse.getResponseHeaders()); - - diagnosticsAccessor.addClientSideDiagnosticsToFeed( - feedResponse.getCosmosDiagnostics(), - Collections.singleton( - BridgeInternal.getClientSideRequestStatics(cosmosItemResponse.getDiagnostics()))); - - return Mono.just(feedResponse); - }) - .onErrorResume(throwable -> { - - Throwable unwrappedThrowable = Exceptions.unwrap(throwable); + return this.readDocument((resourceLink + firstIdentity.getId()), requestOptions) + .flatMap(resourceResponse -> Mono.just( + new ImmutablePair, CosmosException>(resourceResponse, null) + )) + .onErrorResume(throwable -> { + Throwable unwrappedThrowable = Exceptions.unwrap(throwable); - if (unwrappedThrowable instanceof CosmosException) { + if (unwrappedThrowable instanceof CosmosException) { - CosmosException cosmosException = (CosmosException) unwrappedThrowable; + CosmosException cosmosException = (CosmosException) unwrappedThrowable; - int statusCode = cosmosException.getStatusCode(); - int subStatusCode = cosmosException.getSubStatusCode(); + int statusCode = cosmosException.getStatusCode(); + int subStatusCode = cosmosException.getSubStatusCode(); - CosmosDiagnostics diagnostics = cosmosException.getDiagnostics(); - - if (statusCode == HttpConstants.StatusCodes.NOTFOUND && subStatusCode == HttpConstants.SubStatusCodes.UNKNOWN) { - FeedResponse feedResponse = ModelBridgeInternal.createFeedResponse(new ArrayList<>(), cosmosException.getResponseHeaders()); - - diagnosticsAccessor.addClientSideDiagnosticsToFeed( - feedResponse.getCosmosDiagnostics(), - Collections.singleton( - BridgeInternal.getClientSideRequestStatics(diagnostics) - ) - ); + if (statusCode == HttpConstants.StatusCodes.NOTFOUND && subStatusCode == HttpConstants.SubStatusCodes.UNKNOWN) { + return Mono.just(new ImmutablePair, CosmosException>(null, cosmosException)); + } + } - return Mono.just(feedResponse); - } + return Mono.error(unwrappedThrowable); + }); + } + return Mono.empty(); + }) + .flatMap(resourceResponseToExceptionPair -> { + + ResourceResponse resourceResponse = resourceResponseToExceptionPair.getLeft(); + CosmosException cosmosException = resourceResponseToExceptionPair.getRight(); + FeedResponse feedResponse; + + if (cosmosException != null) { + feedResponse = ModelBridgeInternal.createFeedResponse(new ArrayList<>(), cosmosException.getResponseHeaders()); + diagnosticsAccessor.addClientSideDiagnosticsToFeed( + feedResponse.getCosmosDiagnostics(), + Collections.singleton( + BridgeInternal.getClientSideRequestStatics(cosmosException.getDiagnostics()))); + } else { + CosmosItemResponse cosmosItemResponse = + ModelBridgeInternal.createCosmosAsyncItemResponse(resourceResponse, klass, getItemDeserializer()); + feedResponse = ModelBridgeInternal.createFeedResponse( + Arrays.asList(InternalObjectNode.fromObject(cosmosItemResponse.getItem())), + cosmosItemResponse.getResponseHeaders()); + + diagnosticsAccessor.addClientSideDiagnosticsToFeed( + feedResponse.getCosmosDiagnostics(), + Collections.singleton( + BridgeInternal.getClientSideRequestStatics(cosmosItemResponse.getDiagnostics()))); } - return Mono.error(unwrappedThrowable); + return Mono.just(feedResponse); }); }