Read many fix (#35513)
* Preliminary fix for readMany.

* Refactorings.

* Added tests.

* Refactorings.

* Refactorings.

* Preliminary changes.

* Added log to CHANGELOG.md.

* Addressed review comments.

* Addressed review comments.

* Addressed review comments.
jeet1995 authored Jun 22, 2023
1 parent e5060d6 commit ecfe8fa
Showing 3 changed files with 128 additions and 100 deletions.
@@ -11,6 +11,8 @@
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
@@ -20,6 +22,7 @@
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.util.CosmosPagedIterable;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -41,12 +44,14 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.apache.commons.io.FileUtils.ONE_MB;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

public class CosmosItemTest extends TestSuiteBase {

@@ -192,7 +197,7 @@ public void readMany() throws Exception {
assertThat(feedResponse.getResults()).isNotNull();
assertThat(feedResponse.getResults().size()).isEqualTo(numDocuments);
assertThat(diagnosticsAccessor.getClientSideRequestStatistics(feedResponse.getCosmosDiagnostics())).isNotNull();
assertThat(diagnosticsAccessor.getClientSideRequestStatistics(feedResponse.getCosmosDiagnostics()).size()).isGreaterThan(1);
assertThat(diagnosticsAccessor.getClientSideRequestStatistics(feedResponse.getCosmosDiagnostics()).size()).isGreaterThanOrEqualTo(1);

for (int i = 0; i < feedResponse.getResults().size(); i++) {
InternalObjectNode fetchedResult = feedResponse.getResults().get(i);
@@ -354,84 +359,100 @@ public void readManyWithManyNonExistentItemIds() throws Exception {
assertThat(feedResponse.getResults().size()).isEqualTo(numDocuments);
}

@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void readManyWithFaultyPointRead() throws JsonProcessingException {
int numDocuments = 25;
@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void readManyWithMultiplePartitionsAndSome404s() throws JsonProcessingException {

List<FeedRange> feedRanges = container.getFeedRanges();
CosmosDatabase readManyDatabase = null;
CosmosContainer readManyContainer = null;

// the container should have at least 2 physical partitions
assertThat(feedRanges.size()).isGreaterThanOrEqualTo(2);
int itemCount = 100;

for (int i = 0; i < numDocuments; i++) {
String partitionKeyValue = UUID.randomUUID().toString();
String documentId = UUID.randomUUID().toString();
ObjectNode document = getDocumentDefinition(documentId, partitionKeyValue);
container.createItem(document);
}
try {

// query 1 item
SqlQuerySpec sqlQuerySpec = new SqlQuerySpec();
StringBuilder stringBuilder = new StringBuilder();
readManyDatabase = client
.getDatabase(container.asyncContainer.getDatabase().getId());

stringBuilder.append("SELECT * from c");
stringBuilder.append(" OFFSET 0");
stringBuilder.append(" LIMIT 1");
String readManyContainerId = "container-with-multiple-partitions";

sqlQuerySpec.setQueryText(stringBuilder.toString());
CosmosContainerProperties containerProperties = new CosmosContainerProperties(readManyContainerId, "/mypk");
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(30_000);

// extract 1 item id and partition key val from 1st physical partition
AtomicReference<String> itemId1 = new AtomicReference<>("");
AtomicReference<String> pkValItem1 = new AtomicReference<>("");
readManyDatabase.createContainer(containerProperties, throughputProperties);

CosmosQueryRequestOptions cosmosQueryRequestOptions1 = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions1.setFeedRange(feedRanges.get(0));
readManyContainer = readManyDatabase.getContainer(readManyContainerId);

container
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions1, InternalObjectNode.class)
.iterableByPage()
.forEach(response -> {
List<InternalObjectNode> results = response.getResults();
for (int i = 0; i < itemCount; i++) {
String id = UUID.randomUUID().toString();
String myPk = UUID.randomUUID().toString();

assertThat(results).isNotNull();
assertThat(results).isNotEmpty();
assertThat(results.size()).isEqualTo(1);
ObjectNode objectNode = getDocumentDefinition(id, myPk);

itemId1.set(results.get(0).getId());
pkValItem1.set(results.get(0).getString("mypk"));
});
readManyContainer.createItem(objectNode);
}

// extract 1 partition key val from 2nd physical partition
// to create non-existent CosmosItemIdentity instance
AtomicReference<String> pkValItem2 = new AtomicReference<>("");
List<FeedRange> feedRanges = readManyContainer.getFeedRanges();

CosmosQueryRequestOptions cosmosQueryRequestOptions2 = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions2.setFeedRange(feedRanges.get(1));
assertThat(feedRanges).isNotNull();
assertThat(feedRanges.size()).isGreaterThan(1);

container
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions2, InternalObjectNode.class)
.iterableByPage()
.forEach(response -> {
List<InternalObjectNode> results = response.getResults();
int feedRangeCount = feedRanges.size();

assertThat(results).isNotNull();
assertThat(results).isNotEmpty();
assertThat(results.size()).isEqualTo(1);
// select 1 document per feed range
// increase the no. of documents with faulty ids
// see if documents fetched is (feed range count) - (faulty documents)
for (int faultyIdCount = 0; faultyIdCount <= feedRangeCount; faultyIdCount++) {
final Set<Integer> faultyIds = new HashSet<>();

pkValItem2.set(results.get(0).getString("mypk"));
});
while (faultyIds.size() != faultyIdCount) {
faultyIds.add(ThreadLocalRandom.current().nextInt(feedRangeCount));
}

CosmosItemIdentity cosmosItemIdentity = new CosmosItemIdentity(new PartitionKey(pkValItem1.get()), itemId1.get());
CosmosItemIdentity nonExistentCosmosItemIdentity = new CosmosItemIdentity(new PartitionKey(pkValItem2.get()), UUID.randomUUID().toString());
SqlQuerySpec sqlQuerySpec = new SqlQuerySpec();
sqlQuerySpec.setQueryText("SELECT * FROM c OFFSET 0 LIMIT 1");

List<CosmosItemIdentity> cosmosItemIdentities = Arrays.asList(cosmosItemIdentity, nonExistentCosmosItemIdentity);
List<ImmutablePair<String, String>> idToPkPairs = new ArrayList<>();

FeedResponse<InternalObjectNode> feedResponse = container.readMany(cosmosItemIdentities, InternalObjectNode.class);
for (int k = 0; k < feedRangeCount; k++) {
CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setFeedRange(feedRanges.get(k));

assertThat(feedResponse).isNotNull();
assertThat(feedResponse.getResults()).isNotNull();
// there could be a case where 0 items were created in physical partition 1
assertThat(feedResponse.getResults().size()).isLessThanOrEqualTo(1);
int finalK = k;

readManyContainer
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions, InternalObjectNode.class)
.iterableByPage()
.forEach(response -> {
InternalObjectNode queriedItem = response.getResults().get(0);

if (faultyIds.contains(finalK)) {
idToPkPairs.add(new ImmutablePair<>(queriedItem.getId(), UUID.randomUUID().toString()));
} else {
idToPkPairs.add(new ImmutablePair<>(queriedItem.getId(), queriedItem.getString("mypk")));
}
});
}

if (idToPkPairs.size() == feedRangeCount) {

List<CosmosItemIdentity> cosmosItemIdentities = idToPkPairs
.stream()
.map(pkToIdPair -> new CosmosItemIdentity(new PartitionKey(pkToIdPair.getRight()), pkToIdPair.getLeft()))
.collect(Collectors.toList());

FeedResponse<InternalObjectNode> readManyResult = readManyContainer
.readMany(cosmosItemIdentities, InternalObjectNode.class);

assertThat(readManyResult).isNotNull();
assertThat(readManyResult.getResults()).isNotNull();
assertThat(readManyResult.getResults().size()).isEqualTo(feedRangeCount - faultyIdCount);
} else {
fail("Not all physical partitions have data!");
}
}

} finally {
readManyContainer.delete();
}
}

@Test(groups = { "simple" }, timeOut = TIMEOUT)
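In outline, the new test provisions a multi-partition container, samples one document per physical partition (feed range), and then sweeps over how many of those identities get a deliberately wrong partition key, asserting that `readMany` still returns every untouched document. A condensed sketch of that pattern follows; it reuses identifiers such as `container` and the `mypk` partition key path from the test class, so it is illustrative rather than copy-paste runnable:

```java
// One identity per physical partition; corrupt the partition key of the first
// one so readMany sees a mix of existing and non-existent items.
List<FeedRange> feedRanges = container.getFeedRanges();
List<CosmosItemIdentity> identities = new ArrayList<>();

for (int k = 0; k < feedRanges.size(); k++) {
    CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
    options.setFeedRange(feedRanges.get(k));

    InternalObjectNode sample = container
        .queryItems(new SqlQuerySpec("SELECT * FROM c OFFSET 0 LIMIT 1"), options, InternalObjectNode.class)
        .iterableByPage()
        .iterator()
        .next()
        .getResults()
        .get(0);

    String pk = (k == 0)
        ? UUID.randomUUID().toString()   // non-existent partition key value
        : sample.getString("mypk");      // real partition key value
    identities.add(new CosmosItemIdentity(new PartitionKey(pk), sample.getId()));
}

FeedResponse<InternalObjectNode> result = container.readMany(identities, InternalObjectNode.class);

// Exactly one identity was corrupted, so exactly one document should be missing.
assertThat(result.getResults().size()).isEqualTo(feedRanges.size() - 1);
```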
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -7,6 +7,8 @@
#### Breaking Changes

#### Bugs Fixed
* Fixes the `readMany` API to not drop existing documents from the response in point-read scenarios when
there are non-existent document IDs also passed through the API - See [PR 35513](https://github.com/Azure/azure-sdk-for-java/pull/35513)

#### Other Changes

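As a caller-facing illustration of the `readMany` entry above (the ids, partition key values, and the `container` instance are assumptions made up for the example, not taken from the SDK or the tests):

```java
// One identity that exists and one that does not, in different logical partitions.
List<CosmosItemIdentity> identities = Arrays.asList(
    new CosmosItemIdentity(new PartitionKey("pk-1"), "existing-id"),
    new CosmosItemIdentity(new PartitionKey("pk-2"), "missing-id"));

FeedResponse<ObjectNode> response = container.readMany(identities, ObjectNode.class);

// After this fix the existing document is still returned; the 404 for the
// missing id only contributes an empty page (and its diagnostics) to the
// aggregated response instead of suppressing the other point-read results.
assertThat(response.getResults().size()).isEqualTo(1);
```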
@@ -15,6 +15,7 @@
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.implementation.batch.BatchResponseParser;
import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest;
import com.azure.cosmos.implementation.batch.ServerBatchRequest;
@@ -2671,8 +2672,8 @@ private <T> Flux<FeedResponse<Document>> pointReadsForReadMany(
Map<PartitionKeyRange, List<CosmosItemIdentity>> singleItemPartitionRequestMap,
String resourceLink,
CosmosQueryRequestOptions queryRequestOptions,
Class<T> klass
) {
Class<T> klass) {

return Flux.fromIterable(singleItemPartitionRequestMap.values())
.flatMap(cosmosItemIdentityList -> {
if (cosmosItemIdentityList.size() == 1) {
@@ -2682,52 +2683,56 @@
.getCosmosQueryRequestOptionsAccessor()
.toRequestOptions(queryRequestOptions);
requestOptions.setPartitionKey(firstIdentity.getPartitionKey());
return this.readDocument((resourceLink + firstIdentity.getId()), requestOptions);
}
return Mono.empty();
})
.flatMap(resourceResponse -> {
CosmosItemResponse<T> cosmosItemResponse =
ModelBridgeInternal.createCosmosAsyncItemResponse(resourceResponse, klass, getItemDeserializer());
FeedResponse<Document> feedResponse = ModelBridgeInternal.createFeedResponse(
Arrays.asList(InternalObjectNode.fromObject(cosmosItemResponse.getItem())),
cosmosItemResponse.getResponseHeaders());

diagnosticsAccessor.addClientSideDiagnosticsToFeed(
feedResponse.getCosmosDiagnostics(),
Collections.singleton(
BridgeInternal.getClientSideRequestStatics(cosmosItemResponse.getDiagnostics())));

return Mono.just(feedResponse);
})
.onErrorResume(throwable -> {

Throwable unwrappedThrowable = Exceptions.unwrap(throwable);
return this.readDocument((resourceLink + firstIdentity.getId()), requestOptions)
.flatMap(resourceResponse -> Mono.just(
new ImmutablePair<ResourceResponse<Document>, CosmosException>(resourceResponse, null)
))
.onErrorResume(throwable -> {
Throwable unwrappedThrowable = Exceptions.unwrap(throwable);

if (unwrappedThrowable instanceof CosmosException) {
if (unwrappedThrowable instanceof CosmosException) {

CosmosException cosmosException = (CosmosException) unwrappedThrowable;
CosmosException cosmosException = (CosmosException) unwrappedThrowable;

int statusCode = cosmosException.getStatusCode();
int subStatusCode = cosmosException.getSubStatusCode();
int statusCode = cosmosException.getStatusCode();
int subStatusCode = cosmosException.getSubStatusCode();

CosmosDiagnostics diagnostics = cosmosException.getDiagnostics();

if (statusCode == HttpConstants.StatusCodes.NOTFOUND && subStatusCode == HttpConstants.SubStatusCodes.UNKNOWN) {
FeedResponse<Document> feedResponse = ModelBridgeInternal.createFeedResponse(new ArrayList<>(), cosmosException.getResponseHeaders());

diagnosticsAccessor.addClientSideDiagnosticsToFeed(
feedResponse.getCosmosDiagnostics(),
Collections.singleton(
BridgeInternal.getClientSideRequestStatics(diagnostics)
)
);
if (statusCode == HttpConstants.StatusCodes.NOTFOUND && subStatusCode == HttpConstants.SubStatusCodes.UNKNOWN) {
return Mono.just(new ImmutablePair<ResourceResponse<Document>, CosmosException>(null, cosmosException));
}
}

return Mono.just(feedResponse);
}
return Mono.error(unwrappedThrowable);
});
}
return Mono.empty();
})
.flatMap(resourceResponseToExceptionPair -> {

ResourceResponse<Document> resourceResponse = resourceResponseToExceptionPair.getLeft();
CosmosException cosmosException = resourceResponseToExceptionPair.getRight();
FeedResponse<Document> feedResponse;

if (cosmosException != null) {
feedResponse = ModelBridgeInternal.createFeedResponse(new ArrayList<>(), cosmosException.getResponseHeaders());
diagnosticsAccessor.addClientSideDiagnosticsToFeed(
feedResponse.getCosmosDiagnostics(),
Collections.singleton(
BridgeInternal.getClientSideRequestStatics(cosmosException.getDiagnostics())));
} else {
CosmosItemResponse<T> cosmosItemResponse =
ModelBridgeInternal.createCosmosAsyncItemResponse(resourceResponse, klass, getItemDeserializer());
feedResponse = ModelBridgeInternal.createFeedResponse(
Arrays.asList(InternalObjectNode.fromObject(cosmosItemResponse.getItem())),
cosmosItemResponse.getResponseHeaders());

diagnosticsAccessor.addClientSideDiagnosticsToFeed(
feedResponse.getCosmosDiagnostics(),
Collections.singleton(
BridgeInternal.getClientSideRequestStatics(cosmosItemResponse.getDiagnostics())));
}

return Mono.error(unwrappedThrowable);
return Mono.just(feedResponse);
});
}

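The core of the change in `pointReadsForReadMany` is to handle the 404 inside each per-item pipeline, converting it into a value (an `ImmutablePair` of nullable `ResourceResponse` and `CosmosException`) before the inner Monos are merged, instead of hanging `onErrorResume` off the merged Flux. Below is a simplified, self-contained Reactor sketch of why that matters; plain strings stand in for the SDK's response types, and this is not the SDK's actual code:

```java
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PerItemErrorHandlingSketch {

    // Stand-in for a point read; "missing" simulates a 404.
    static Mono<String> read(String id) {
        if ("missing".equals(id)) {
            return Mono.error(new RuntimeException("404: not found"));
        }
        return Mono.just("doc:" + id);
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("a", "missing", "b");

        // Error handled per item (the new shape): the failing read becomes a
        // placeholder value and the sibling reads still emit their documents.
        List<String> perItem = Flux.fromIterable(ids)
            .flatMap(id -> read(id).onErrorResume(t -> Mono.just("<empty page>")))
            .collectList()
            .block();
        System.out.println(perItem); // [doc:a, <empty page>, doc:b]

        // Error handled on the merged Flux (the old shape): once the 404
        // propagates, the merged stream switches to the fallback and any
        // point reads that have not emitted yet are dropped.
        List<String> merged = Flux.fromIterable(ids)
            .flatMap(PerItemErrorHandlingSketch::read)
            .onErrorResume(t -> Mono.just("<empty page>"))
            .collectList()
            .block();
        System.out.println(merged); // "doc:b" is lost
    }
}
```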
