Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read many fix #35513

Merged
merged 11 commits into from
Jun 22, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
Expand All @@ -20,6 +23,7 @@
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.util.CosmosPagedIterable;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -39,6 +43,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -47,6 +52,7 @@

import static org.apache.commons.io.FileUtils.ONE_MB;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

public class CosmosItemTest extends TestSuiteBase {

Expand All @@ -55,6 +61,7 @@ public class CosmosItemTest extends TestSuiteBase {
ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Random random = new Random();
jeet1995 marked this conversation as resolved.
Show resolved Hide resolved
private CosmosClient client;
private CosmosContainer container;

Expand Down Expand Up @@ -354,84 +361,100 @@ public void readManyWithManyNonExistentItemIds() throws Exception {
assertThat(feedResponse.getResults().size()).isEqualTo(numDocuments);
}

@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void readManyWithFaultyPointRead() throws JsonProcessingException {
int numDocuments = 25;
@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void readManyWithMultiplePartitionsAndSome404s() throws JsonProcessingException {

List<FeedRange> feedRanges = container.getFeedRanges();
CosmosDatabase readManyDatabase = null;
CosmosContainer readManyContainer = null;

// the container should have at least 2 physical partitions
assertThat(feedRanges.size()).isGreaterThanOrEqualTo(2);
int itemCount = 100;

for (int i = 0; i < numDocuments; i++) {
String partitionKeyValue = UUID.randomUUID().toString();
String documentId = UUID.randomUUID().toString();
ObjectNode document = getDocumentDefinition(documentId, partitionKeyValue);
container.createItem(document);
}
try {

// query 1 item
SqlQuerySpec sqlQuerySpec = new SqlQuerySpec();
StringBuilder stringBuilder = new StringBuilder();
readManyDatabase = client
.getDatabase(container.asyncContainer.getDatabase().getId());

stringBuilder.append("SELECT * from c");
stringBuilder.append(" OFFSET 0");
stringBuilder.append(" LIMIT 1");
String readManyContainerId = "container-with-multiple-partitions";

sqlQuerySpec.setQueryText(stringBuilder.toString());
CosmosContainerProperties containerProperties = new CosmosContainerProperties(readManyContainerId, "/mypk");
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(30_000);

// extract 1 item id and partition key val from 1st physical partition
AtomicReference<String> itemId1 = new AtomicReference<>("");
AtomicReference<String> pkValItem1 = new AtomicReference<>("");
readManyDatabase.createContainer(containerProperties, throughputProperties);

CosmosQueryRequestOptions cosmosQueryRequestOptions1 = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions1.setFeedRange(feedRanges.get(0));
readManyContainer = readManyDatabase.getContainer(readManyContainerId);

container
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions1, InternalObjectNode.class)
.iterableByPage()
.forEach(response -> {
List<InternalObjectNode> results = response.getResults();
for (int i = 0; i < itemCount; i++) {
String id = UUID.randomUUID().toString();
String myPk = UUID.randomUUID().toString();

assertThat(results).isNotNull();
assertThat(results).isNotEmpty();
assertThat(results.size()).isEqualTo(1);
ObjectNode objectNode = getDocumentDefinition(id, myPk);

itemId1.set(results.get(0).getId());
pkValItem1.set(results.get(0).getString("mypk"));
});
readManyContainer.createItem(objectNode);
}

// extract 1 partition key val from 2nd physical partition
// to create non-existent CosmosItemIdentity instance
AtomicReference<String> pkValItem2 = new AtomicReference<>("");
List<FeedRange> feedRanges = readManyContainer.getFeedRanges();

CosmosQueryRequestOptions cosmosQueryRequestOptions2 = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions2.setFeedRange(feedRanges.get(1));
assertThat(feedRanges).isNotNull();
assertThat(feedRanges.size()).isGreaterThan(1);

container
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions2, InternalObjectNode.class)
.iterableByPage()
.forEach(response -> {
List<InternalObjectNode> results = response.getResults();
int feedRangeCount = feedRanges.size();

assertThat(results).isNotNull();
assertThat(results).isNotEmpty();
assertThat(results.size()).isEqualTo(1);
// select 1 document per feed range
// increase the no. of documents with faulty ids
// see if documents fetched is (feed range count) - (faulty documents)
for (int faultyIdCount = 0; faultyIdCount <= feedRangeCount; faultyIdCount++) {
final Set<Integer> faultyIds = new HashSet<>();

pkValItem2.set(results.get(0).getString("mypk"));
});
while (faultyIds.size() != faultyIdCount) {
faultyIds.add(random.nextInt(feedRangeCount));
}

CosmosItemIdentity cosmosItemIdentity = new CosmosItemIdentity(new PartitionKey(pkValItem1.get()), itemId1.get());
CosmosItemIdentity nonExistentCosmosItemIdentity = new CosmosItemIdentity(new PartitionKey(pkValItem2.get()), UUID.randomUUID().toString());
SqlQuerySpec sqlQuerySpec = new SqlQuerySpec();
sqlQuerySpec.setQueryText("SELECT * FROM c OFFSET 0 LIMIT 1");

List<CosmosItemIdentity> cosmosItemIdentities = Arrays.asList(cosmosItemIdentity, nonExistentCosmosItemIdentity);
List<ImmutablePair<String, String>> idToPkPairs = new ArrayList<>();

FeedResponse<InternalObjectNode> feedResponse = container.readMany(cosmosItemIdentities, InternalObjectNode.class);
for (int k = 0; k < feedRangeCount; k++) {
CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setFeedRange(feedRanges.get(k));

assertThat(feedResponse).isNotNull();
assertThat(feedResponse.getResults()).isNotNull();
// there could be a case where 0 items were created in physical partition 1
assertThat(feedResponse.getResults().size()).isLessThanOrEqualTo(1);
int finalK = k;

readManyContainer
.queryItems(sqlQuerySpec, cosmosQueryRequestOptions, InternalObjectNode.class)
.iterableByPage()
.forEach(response -> {
InternalObjectNode queriedItem = response.getResults().get(0);

if (faultyIds.contains(finalK)) {
idToPkPairs.add(new ImmutablePair<>(queriedItem.getId(), UUID.randomUUID().toString()));
} else {
idToPkPairs.add(new ImmutablePair<>(queriedItem.getId(), queriedItem.getString("mypk")));
}
});
}

if (idToPkPairs.size() == feedRangeCount) {

List<CosmosItemIdentity> cosmosItemIdentities = idToPkPairs
.stream()
.map(pkToIdPair -> new CosmosItemIdentity(new PartitionKey(pkToIdPair.getRight()), pkToIdPair.getLeft()))
.collect(Collectors.toList());

FeedResponse<InternalObjectNode> readManyResult = readManyContainer
.readMany(cosmosItemIdentities, InternalObjectNode.class);

assertThat(readManyResult).isNotNull();
assertThat(readManyResult.getResults()).isNotNull();
assertThat(readManyResult.getResults().size()).isEqualTo(feedRangeCount - faultyIdCount);
} else {
fail("Not all physical partitions have data!");
}
}

} finally {
readManyContainer.delete();
}
}

@Test(groups = { "simple" }, timeOut = TIMEOUT)
Expand Down
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#### Breaking Changes

#### Bugs Fixed
* Fixes the `readMany` API to not drop existing documents from the response in point-read scenarios when
there are non-existent document IDs also passed through the API - See [PR 35513](https://github.com/Azure/azure-sdk-for-java/pull/35513)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.implementation.batch.BatchResponseParser;
import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest;
import com.azure.cosmos.implementation.batch.ServerBatchRequest;
Expand Down Expand Up @@ -2671,8 +2672,8 @@ private <T> Flux<FeedResponse<Document>> pointReadsForReadMany(
Map<PartitionKeyRange, List<CosmosItemIdentity>> singleItemPartitionRequestMap,
String resourceLink,
CosmosQueryRequestOptions queryRequestOptions,
Class<T> klass
) {
Class<T> klass) {

return Flux.fromIterable(singleItemPartitionRequestMap.values())
.flatMap(cosmosItemIdentityList -> {
if (cosmosItemIdentityList.size() == 1) {
Expand All @@ -2682,52 +2683,56 @@ private <T> Flux<FeedResponse<Document>> pointReadsForReadMany(
.getCosmosQueryRequestOptionsAccessor()
.toRequestOptions(queryRequestOptions);
requestOptions.setPartitionKey(firstIdentity.getPartitionKey());
return this.readDocument((resourceLink + firstIdentity.getId()), requestOptions);
}
return Mono.empty();
})
.flatMap(resourceResponse -> {
CosmosItemResponse<T> cosmosItemResponse =
ModelBridgeInternal.createCosmosAsyncItemResponse(resourceResponse, klass, getItemDeserializer());
FeedResponse<Document> feedResponse = ModelBridgeInternal.createFeedResponse(
Arrays.asList(InternalObjectNode.fromObject(cosmosItemResponse.getItem())),
cosmosItemResponse.getResponseHeaders());

diagnosticsAccessor.addClientSideDiagnosticsToFeed(
feedResponse.getCosmosDiagnostics(),
Collections.singleton(
BridgeInternal.getClientSideRequestStatics(cosmosItemResponse.getDiagnostics())));

return Mono.just(feedResponse);
})
.onErrorResume(throwable -> {

Throwable unwrappedThrowable = Exceptions.unwrap(throwable);
return this.readDocument((resourceLink + firstIdentity.getId()), requestOptions)
.flatMap(resourceResponse -> Mono.just(
new ImmutablePair<ResourceResponse<Document>, CosmosException>(resourceResponse, null)
))
.onErrorResume(throwable -> {
Throwable unwrappedThrowable = Exceptions.unwrap(throwable);

if (unwrappedThrowable instanceof CosmosException) {
if (unwrappedThrowable instanceof CosmosException) {

CosmosException cosmosException = (CosmosException) unwrappedThrowable;
CosmosException cosmosException = (CosmosException) unwrappedThrowable;

int statusCode = cosmosException.getStatusCode();
int subStatusCode = cosmosException.getSubStatusCode();
int statusCode = cosmosException.getStatusCode();
int subStatusCode = cosmosException.getSubStatusCode();

CosmosDiagnostics diagnostics = cosmosException.getDiagnostics();

if (statusCode == HttpConstants.StatusCodes.NOTFOUND && subStatusCode == HttpConstants.SubStatusCodes.UNKNOWN) {
FeedResponse<Document> feedResponse = ModelBridgeInternal.createFeedResponse(new ArrayList<>(), cosmosException.getResponseHeaders());

diagnosticsAccessor.addClientSideDiagnosticsToFeed(
feedResponse.getCosmosDiagnostics(),
Collections.singleton(
BridgeInternal.getClientSideRequestStatics(diagnostics)
)
);
if (statusCode == HttpConstants.StatusCodes.NOTFOUND && subStatusCode == HttpConstants.SubStatusCodes.UNKNOWN) {
return Mono.just(new ImmutablePair<ResourceResponse<Document>, CosmosException>(null, cosmosException));
}
}

return Mono.just(feedResponse);
}
return Mono.error(unwrappedThrowable);
});
}
return Mono.empty();
})
.flatMap(resourceResponseToExceptionPair -> {

ResourceResponse<Document> resourceResponse = resourceResponseToExceptionPair.getLeft();
CosmosException cosmosException = resourceResponseToExceptionPair.getRight();
FeedResponse<Document> feedResponse;

if (cosmosException != null) {
feedResponse = ModelBridgeInternal.createFeedResponse(new ArrayList<>(), cosmosException.getResponseHeaders());
diagnosticsAccessor.addClientSideDiagnosticsToFeed(
feedResponse.getCosmosDiagnostics(),
Collections.singleton(
BridgeInternal.getClientSideRequestStatics(cosmosException.getDiagnostics())));
} else {
CosmosItemResponse<T> cosmosItemResponse =
ModelBridgeInternal.createCosmosAsyncItemResponse(resourceResponse, klass, getItemDeserializer());
feedResponse = ModelBridgeInternal.createFeedResponse(
Arrays.asList(InternalObjectNode.fromObject(cosmosItemResponse.getItem())),
cosmosItemResponse.getResponseHeaders());

diagnosticsAccessor.addClientSideDiagnosticsToFeed(
feedResponse.getCosmosDiagnostics(),
Collections.singleton(
BridgeInternal.getClientSideRequestStatics(cosmosItemResponse.getDiagnostics())));
}

return Mono.error(unwrappedThrowable);
return Mono.just(feedResponse);
});
}

Expand Down