Skip to content

Commit

Permalink
Added null value holder change (#6737)
Browse files Browse the repository at this point in the history
  • Loading branch information
kushagraThapar authored Dec 10, 2019
1 parent ba76215 commit da4bd6d
Show file tree
Hide file tree
Showing 22 changed files with 308 additions and 263 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
* This is meant to be internally used only by our sdk.
**/
public interface ICollectionRoutingMapCache {
default Mono<CollectionRoutingMap> tryLookupAsync(
default Mono<Utils.ValueHolder<CollectionRoutingMap>> tryLookupAsync(
String collectionRid,
CollectionRoutingMap previousValue,
Map<String, Object> properties) {
return tryLookupAsync(collectionRid, previousValue, false, properties);
}

Mono<CollectionRoutingMap> tryLookupAsync(
Mono<Utils.ValueHolder<CollectionRoutingMap>> tryLookupAsync(
String collectionRid,
CollectionRoutingMap previousValue,
boolean forceRefreshCollectionRoutingMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public interface IRoutingMapProvider {
/// <param name="range">This method will return all ranges which overlap this range.</param>
/// <param name="forceRefresh">Whether forcefully refreshing the routing map is necessary</param>
/// <returns>List of effective partition key ranges for a collection or null if collection doesn't exist.</returns>
Mono<List<PartitionKeyRange>> tryGetOverlappingRangesAsync(String collectionResourceId, Range<String> range,
Mono<Utils.ValueHolder<List<PartitionKeyRange>>> tryGetOverlappingRangesAsync(String collectionResourceId, Range<String> range,
boolean forceRefresh /* = false */, Map<String, Object> properties);

Mono<PartitionKeyRange> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId,
Mono<Utils.ValueHolder<PartitionKeyRange>> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId,
boolean forceRefresh /* = false */, Map<String, Object> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,30 @@ public Mono<ShouldRetryResult> shouldRetry(Exception exception) {
if (this.feedOptions != null) {
request.properties = this.feedOptions.properties();
}
Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);

return collectionObs.flatMap(collection -> {
return collectionObs.flatMap(collectionValueHolder -> {

Mono<CollectionRoutingMap> routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(collection.resourceId(), null, request.properties);
Mono<Utils.ValueHolder<CollectionRoutingMap>> routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(collectionValueHolder.v.resourceId(),
null, request.properties);

Mono<CollectionRoutingMap> refreshedRoutingMapObs = routingMapObs.flatMap(routingMap -> {
// Force refresh.
return this.partitionKeyRangeCache.tryLookupAsync(
collection.resourceId(),
routingMap,
Mono<Utils.ValueHolder<CollectionRoutingMap>> refreshedRoutingMapObs = routingMapObs.flatMap(routingMapValueHolder -> {
if (routingMapValueHolder.v != null) {
// Force refresh.
return this.partitionKeyRangeCache.tryLookupAsync(
collectionValueHolder.v.resourceId(),
routingMapValueHolder.v,
request.properties);
}).switchIfEmpty(Mono.defer(Mono::empty));
} else {
return Mono.just(new Utils.ValueHolder<>(null));
}
});

// TODO: Check if this behavior can be replaced by doOnSubscribe
return refreshedRoutingMapObs.flatMap(rm -> {
this.retried = true;
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}).switchIfEmpty(Mono.defer(() -> {
this.retried = true;
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}));
});

});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,16 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
request.forceNameCacheRefresh = true;
request.requestContext.resolvedCollectionRid = null;

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);

return collectionObs.flatMap(collectionInfo -> {
if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionInfo.resourceId())) {
return collectionObs.flatMap(collectionValueHolder -> {
if (collectionValueHolder.v == null) {
logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress());
} else if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionValueHolder.v.resourceId())) {
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}
return Mono.just(shouldRetryResult);
}).switchIfEmpty(Mono.defer(() -> {
logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress());
return Mono.just(shouldRetryResult);
})).onErrorResume(throwable -> {
}).onErrorResume(throwable -> {
// When resolveCollectionAsync throws an exception ignore it because it's an attempt to recover an existing
// error. When the recovery fails we return ShouldRetryResult.noRetry and propagate the original exception to the client

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,19 +869,19 @@ private Map<String, String> getRequestHeaders(RequestOptions options) {
private Mono<RxDocumentServiceRequest> addPartitionKeyInformation(RxDocumentServiceRequest request, Document document,
RequestOptions options) {

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);
return collectionObs
.map(collection -> {
addPartitionKeyInformation(request, document, options, collection);
return request;
});
.map(collectionValueHolder -> {
addPartitionKeyInformation(request, document, options, collectionValueHolder.v);
return request;
});
}

private Mono<RxDocumentServiceRequest> addPartitionKeyInformation(RxDocumentServiceRequest request, Document document, RequestOptions options,
Mono<DocumentCollection> collectionObs) {
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs) {

return collectionObs.map(collection -> {
addPartitionKeyInformation(request, document, options, collection);
return collectionObs.map(collectionValueHolder -> {
addPartitionKeyInformation(request, document, options, collectionValueHolder.v);
return request;
});
}
Expand Down Expand Up @@ -969,7 +969,7 @@ private Mono<RxDocumentServiceRequest> getCreateDocumentRequest(String documentC
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(operationType, ResourceType.Document, path,
typedDocument, requestHeaders, options);

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);
return addPartitionKeyInformation(request, typedDocument, options, collectionObs);
}

Expand Down Expand Up @@ -1217,7 +1217,7 @@ private Flux<ResourceResponse<Document>> replaceDocumentInternal(String document

validateResource(document);

Mono<DocumentCollection> collectionObs = collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(request);
Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, document, options, collectionObs);

return requestObs.flux().flatMap(req -> {
Expand Down Expand Up @@ -1247,7 +1247,7 @@ private Flux<ResourceResponse<Document>> deleteDocumentInternal(String documentL
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Delete,
ResourceType.Document, path, requestHeaders, options);

Mono<DocumentCollection> collectionObs = collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(request);

Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, options, collectionObs);

Expand Down Expand Up @@ -1283,7 +1283,7 @@ private Flux<ResourceResponse<Document>> readDocumentInternal(String documentLin
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Read,
ResourceType.Document, path, requestHeaders, options);

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);

Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, options, collectionObs);

Expand Down Expand Up @@ -2587,7 +2587,7 @@ private <T extends Resource> Flux<FeedResponse<T>> readFeedCollectionChild(FeedO

Function<RxDocumentServiceRequest, Flux<FeedResponse<T>>> executeFunc = request -> {
return ObservableHelper.inlineIfPossibleAsObs(() -> {
Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<RxDocumentServiceRequest> requestObs = this.addPartitionKeyInformation(request, null, requestOptions, collectionObs);

return requestObs.flux().flatMap(req -> this.readFeed(req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.azure.data.cosmos.internal.ICollectionRoutingMapCache;
import com.azure.data.cosmos.internal.IRoutingMapProvider;
import com.azure.data.cosmos.internal.PartitionKeyRange;
import com.azure.data.cosmos.internal.Utils;
import com.azure.data.cosmos.internal.routing.CollectionRoutingMap;
import com.azure.data.cosmos.internal.routing.Range;
import reactor.core.publisher.Mono;
Expand All @@ -17,14 +18,14 @@
*/
public interface IPartitionKeyRangeCache extends IRoutingMapProvider, ICollectionRoutingMapCache {

Mono<CollectionRoutingMap> tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map<String, Object> properties);
Mono<Utils.ValueHolder<CollectionRoutingMap>> tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map<String, Object> properties);

Mono<List<PartitionKeyRange>> tryGetOverlappingRangesAsync(String collectionRid, Range<String> range, boolean forceRefresh,
Map<String, Object> properties);
Mono<Utils.ValueHolder<List<PartitionKeyRange>>> tryGetOverlappingRangesAsync(String collectionRid, Range<String> range, boolean forceRefresh,
Map<String, Object> properties);

Mono<PartitionKeyRange> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh,
Mono<Utils.ValueHolder<PartitionKeyRange>> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh,
Map<String, Object> properties);

Mono<PartitionKeyRange> tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map<String, Object> properties);
Mono<Utils.ValueHolder<PartitionKeyRange>> tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map<String, Object> properties);

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.data.cosmos.internal.RMResources;
import com.azure.data.cosmos.internal.ResourceId;
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.Utils;
import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity;
import org.apache.commons.lang3.StringUtils;
import reactor.core.Exceptions;
Expand Down Expand Up @@ -36,7 +37,7 @@ protected RxCollectionCache() {
* @param request Request to resolve.
* @return an instance of Single&lt;DocumentCollection&gt;
*/
public Mono<DocumentCollection> resolveCollectionAsync(
public Mono<Utils.ValueHolder<DocumentCollection>> resolveCollectionAsync(
RxDocumentServiceRequest request) {
// Mono Void to represent only terminal events specifically complete and error
Mono<Void> init = null;
Expand All @@ -46,14 +47,17 @@ public Mono<DocumentCollection> resolveCollectionAsync(
init = mono.then(Mono.fromRunnable(() -> request.setForceNameCacheRefresh(false)));
}

Mono<DocumentCollection> collectionInfoObs = this.resolveByPartitionKeyRangeIdentityAsync(
Mono<Utils.ValueHolder<DocumentCollection>> collectionInfoObs = this.resolveByPartitionKeyRangeIdentityAsync(
request.getPartitionKeyRangeIdentity(), request.properties);

if (init != null) {
collectionInfoObs = init.then(collectionInfoObs);
}

return collectionInfoObs.flatMap(Mono::just).switchIfEmpty(Mono.defer(() -> {
return collectionInfoObs.flatMap(collectionValueHolder -> {
if (collectionValueHolder.v != null) {
return Mono.just(collectionValueHolder);
}
if (request.requestContext.resolvedCollectionRid == null) {

Mono<DocumentCollection> collectionInfoRes = this.resolveByNameAsync(request.getResourceAddress(), request.properties);
Expand All @@ -67,16 +71,23 @@ public Mono<DocumentCollection> resolveCollectionAsync(

request.setResourceId(collection.resourceId());
request.requestContext.resolvedCollectionRid = collection.resourceId();
return Mono.just(collection);
return Mono.just(new Utils.ValueHolder<>(collection));

});
} else {
return this.resolveByRidAsync(request.requestContext.resolvedCollectionRid, request.properties);
}
}));
});
} else {
return resolveByPartitionKeyRangeIdentityAsync(request.getPartitionKeyRangeIdentity(),request.properties)
.flatMap(Mono::just).switchIfEmpty(this.resolveByRidAsync(request.getResourceAddress(), request.properties));
.flatMap(collectionValueHolder -> {

if (collectionValueHolder.v != null) {
return Mono.just(collectionValueHolder);
}

return this.resolveByRidAsync(request.getResourceAddress(), request.properties);
});
}
}

Expand All @@ -101,7 +112,7 @@ public void refresh(String resourceAddress, Map<String, Object> properties) {

protected abstract Mono<DocumentCollection> getByNameAsync(String resourceAddress, Map<String, Object> properties);

private Mono<DocumentCollection> resolveByPartitionKeyRangeIdentityAsync(PartitionKeyRangeIdentity partitionKeyRangeIdentity, Map<String, Object> properties) {
private Mono<Utils.ValueHolder<DocumentCollection>> resolveByPartitionKeyRangeIdentityAsync(PartitionKeyRangeIdentity partitionKeyRangeIdentity, Map<String, Object> properties) {
// if request is targeted at specific partition using x-ms-documentd-partitionkeyrangeid header,
// which contains value "<collectionrid>,<partitionkeyrangeid>", then resolve to collection rid in this header.
if (partitionKeyRangeIdentity != null && partitionKeyRangeIdentity.getCollectionRid() != null) {
Expand All @@ -117,19 +128,20 @@ private Mono<DocumentCollection> resolveByPartitionKeyRangeIdentityAsync(Partiti

});
}
return Mono.empty();
return Mono.just(new Utils.ValueHolder<>(null));
}

private Mono<DocumentCollection> resolveByRidAsync(
private Mono<Utils.ValueHolder<DocumentCollection>> resolveByRidAsync(
String resourceId, Map<String, Object> properties) {

ResourceId resourceIdParsed = ResourceId.parse(resourceId);
String collectionResourceId = resourceIdParsed.getDocumentCollectionId().toString();

return this.collectionInfoByIdCache.getAsync(
collectionResourceId,
null,
() -> this.getByRidAsync(collectionResourceId, properties));
Mono<DocumentCollection> async = this.collectionInfoByIdCache.getAsync(
collectionResourceId,
null,
() -> this.getByRidAsync(collectionResourceId, properties));
return async.map(Utils.ValueHolder::new);
}

private Mono<DocumentCollection> resolveByNameAsync(
Expand Down
Loading

0 comments on commit da4bd6d

Please sign in to comment.