Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache refresh for container recreate with same id #12747

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
Expand Down Expand Up @@ -38,6 +39,17 @@ static public <T> Mono<T> executeRetry(Callable<Mono<T>> callbackMethod,
}).retryWhen(RetryUtils.toRetryWhenFunc(retryPolicy));
}

static public <T> Flux<T> fluxExecuteRetry(Callable<Flux<T>> callbackMethod, IRetryPolicy retryPolicy) {

return Flux.defer(() -> {
try {
return callbackMethod.call();
} catch (Exception e) {
return Flux.error(e);
}
}).retryWhen(RetryUtils.toRetryWhenFunc(retryPolicy));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WOW such a compact code. Equivalent .NET one is reasonably verbose.

}

static public <T> Mono<T> executeAsync(
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod, IRetryPolicy retryPolicy,
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> inBackoffAlternateCallbackMethod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public InvalidPartitionExceptionRetryPolicy(RxCollectionCache collectionCache,
@Override
public void onBeforeSendRequest(RxDocumentServiceRequest request) {
this.request = request;
this.nextPolicy.onBeforeSendRequest(request);
if (this.nextPolicy != null) {
this.nextPolicy.onBeforeSendRequest(request);
}
}

@Override
Expand All @@ -56,7 +58,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
// TODO: this is blocking. is that fine?
if(this.cosmosQueryRequestOptions != null) {
this.clientCollectionCache.refresh(
BridgeInternal.getMetaDataDiagnosticContext(this.request.requestContext.cosmosDiagnostics),
this.request != null ? BridgeInternal.getMetaDataDiagnosticContext(this.request.requestContext.cosmosDiagnostics) : null,
collectionLink,
ModelBridgeInternal.getPropertiesFromQueryRequestOptions(this.cosmosQueryRequestOptions));
} else {
Expand All @@ -73,6 +75,9 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
}
}

return this.nextPolicy.shouldRetry(e);
if (this.nextPolicy != null) {
return this.nextPolicy.shouldRetry(e);
}
return Mono.just(ShouldRetryResult.error(e));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,21 @@ static public <T> Mono<T> inlineIfPossibleAsObs(Callable<Mono<T>> function, IRet
return BackoffRetryUtility.executeRetry(() -> function.call(), retryPolicy);
}
}

static public <T> Flux<T> fluxInlineIfPossibleAsObs(Callable<Flux<T>> function, IRetryPolicy retryPolicy) {

if (retryPolicy == null) {
// shortcut
return Flux.defer(() -> {
try {
return function.call();
} catch (Exception e) {
return Flux.error(e);
}
});

} else {
return BackoffRetryUtility.fluxExecuteRetry(() -> function.call(), retryPolicy);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -560,18 +560,42 @@ private String parentResourceLinkToQueryLink(String parentResouceLink, ResourceT
}

private <T extends Resource> Flux<FeedResponse<T>> createQuery(
String parentResourceLink,
String parentResourceLink,
SqlQuerySpec sqlQuery,
CosmosQueryRequestOptions options,
Class<T> klass,
ResourceType resourceTypeEnum) {

String resourceLink = parentResourceLinkToQueryLink(parentResourceLink, resourceTypeEnum);
UUID activityId = Utils.randomUUID();
IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this);

// Trying to put this logic as low as the query pipeline
// Since for parallelQuery, each partition will have its own request, so at this point, there will be no request associate with this retry policy.
// For default document context, it already wired up InvalidPartitionExceptionRetry, but there is no harm to wire it again here
InvalidPartitionExceptionRetryPolicy invalidPartitionExceptionRetryPolicy = new InvalidPartitionExceptionRetryPolicy(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I picked up the best places to wire up the invalid partitionExceptionRetryPolicy. Several thoughts and questions:

  1. Trying to put this logic as low as the query pipeline
  2. Since for parallelQuery, each partition will have its own request, so at this point, there will be no request associate with this retry policy.
  3. I see usually we define next policy, but in this case, I do not think we need to define any. does it make sense?
  4. For default document context, it already wired up InvalidPartitionExecptionRetry, but I do not see harm to wire it again here
  5. Currently, for pipelinedDocumentQuery, we have retry policy for each partition. For example throttleRetryPolicy, does it mean each partition will retry n times? should this number be shared across all the partitions?

this.collectionCache,
null,
resourceLink,
options);

return ObservableHelper.fluxInlineIfPossibleAsObs(
() -> createQueryInternal(resourceLink, sqlQuery, options, klass, resourceTypeEnum, queryClient, activityId),
invalidPartitionExceptionRetryPolicy);
}

private <T extends Resource> Flux<FeedResponse<T>> createQueryInternal(
String resourceLink,
SqlQuerySpec sqlQuery,
CosmosQueryRequestOptions options,
Class<T> klass,
ResourceType resourceTypeEnum) {
ResourceType resourceTypeEnum,
IDocumentQueryClient queryClient,
UUID activityId) {

String queryResourceLink = parentResourceLinkToQueryLink(parentResourceLink, resourceTypeEnum);

UUID activityId = Utils.randomUUID();
IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this);
Flux<? extends IDocumentQueryExecutionContext<T>> executionContext =
DocumentQueryExecutionContextFactory.createDocumentQueryExecutionContextAsync(queryClient, resourceTypeEnum, klass, sqlQuery , options, queryResourceLink, false, activityId);
DocumentQueryExecutionContextFactory.createDocumentQueryExecutionContextAsync(queryClient, resourceTypeEnum, klass, sqlQuery , options, resourceLink, false, activityId);

return executionContext.flatMap(iDocumentQueryExecutionContext -> {
QueryInfo queryInfo = null;
if (iDocumentQueryExecutionContext instanceof PipelinedDocumentQueryExecutionContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
package com.azure.cosmos;

import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.FailureValidator;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.RetryAnalyzer;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseProperties;
import com.azure.cosmos.models.CosmosDatabaseRequestOptions;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosResponse;
import com.azure.cosmos.models.IndexingMode;
import com.azure.cosmos.models.IndexingPolicy;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.rx.CosmosItemResponseValidator;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.implementation.TestConfigurations;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -65,17 +64,6 @@ public Object[][] crudArgProvider() {
};
}

private CosmosContainerProperties getCollectionDefinition(String collectionName) {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList<String> paths = new ArrayList<>();
paths.add("/mypk");
partitionKeyDef.setPaths(paths);

return new CosmosContainerProperties(
collectionName,
partitionKeyDef);
}

private InternalObjectNode getDocumentDefinition(String documentId) {
final String uuid = UUID.randomUUID().toString();
return new InternalObjectNode(String.format("{ "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.IndexingMode;
import com.azure.cosmos.models.IndexingPolicy;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.rx.TestSuiteBase;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -102,17 +100,6 @@ public void replaceContainer_withContentResponseOnWriteDisabled() throws Excepti

}

private CosmosContainerProperties getCollectionDefinition(String collectionName) {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList<String> paths = new ArrayList<String>();
paths.add("/mypk");
partitionKeyDef.setPaths(paths);

return new CosmosContainerProperties(
collectionName,
partitionKeyDef);
}

private void validateContainerResponse(CosmosContainerProperties containerProperties,
CosmosContainerResponse createResponse) {
// Basic validation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,23 @@

package com.azure.cosmos;

import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.IndexingMode;
import com.azure.cosmos.models.IndexingPolicy;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.util.CosmosPagedIterable;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -53,19 +51,6 @@ public void afterClass() {
safeCloseSyncClient(client);
}

private CosmosContainerProperties getCollectionDefinition(String collectionName) {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList<String> paths = new ArrayList<String>();
paths.add("/mypk");
partitionKeyDef.setPaths(paths);

CosmosContainerProperties collectionDefinition = new CosmosContainerProperties(
collectionName,
partitionKeyDef);

return collectionDefinition;
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void createContainer_withProperties() throws Exception {
String collectionName = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,16 @@
package com.azure.cosmos;

import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.rx.TestSuiteBase;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -106,17 +103,6 @@ public void deleteItem_withContentResponseOnWriteDisabled() throws Exception {
validateMinimalItemResponse(properties, deleteResponse, false);
}

private CosmosContainerProperties getCollectionDefinition(String collectionName) {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList<String> paths = new ArrayList<String>();
paths.add("/mypk");
partitionKeyDef.setPaths(paths);

return new CosmosContainerProperties(
collectionName,
partitionKeyDef);
}

private InternalObjectNode getDocumentDefinition(String documentId) {
final String uuid = UUID.randomUUID().toString();
final InternalObjectNode properties =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,6 @@ public Object[][] collectionCrudArgProvider() {
};
}

private CosmosContainerProperties getCollectionDefinition(String collectionName) {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList<String> paths = new ArrayList<String>();
paths.add("/mypk");
partitionKeyDef.setPaths(paths);

CosmosContainerProperties collectionDefinition = new CosmosContainerProperties(
collectionName,
partitionKeyDef);

return collectionDefinition;
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT, dataProvider = "collectionCrudArgProvider")
public void createCollection(String collectionName) throws InterruptedException {
CosmosContainerProperties collectionDefinition = getCollectionDefinition(collectionName);
Expand Down
Loading