From da4bd6d3c77eca1e7d8520f02c0598cee943ece2 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Tue, 10 Dec 2019 12:27:27 -0800 Subject: [PATCH] Added null value holder change (#6737) --- .../internal/ICollectionRoutingMapCache.java | 4 +- .../cosmos/internal/IRoutingMapProvider.java | 4 +- .../PartitionKeyRangeGoneRetryPolicy.java | 28 +-- ...enameCollectionAwareClientRetryPolicy.java | 13 +- .../cosmos/internal/RxDocumentClientImpl.java | 26 +-- .../caches/IPartitionKeyRangeCache.java | 11 +- .../internal/caches/RxCollectionCache.java | 38 ++-- .../caches/RxPartitionKeyRangeCache.java | 89 +++++---- .../directconnectivity/AddressResolver.java | 171 +++++++++--------- .../GatewayAddressCache.java | 19 +- .../GlobalAddressResolver.java | 19 +- .../directconnectivity/IAddressCache.java | 3 +- .../DefaultDocumentQueryExecutionContext.java | 2 +- .../internal/query/DocumentProducer.java | 10 +- .../DocumentQueryExecutionContextFactory.java | 14 +- .../routing/RoutingMapProviderHelper.java | 33 ++-- ...eCollectionAwareClientRetryPolicyTest.java | 2 +- .../AddressResolverTest.java | 15 +- .../GatewayAddressCacheTest.java | 59 +++--- .../GlobalAddressResolverTest.java | 5 +- .../internal/query/DocumentProducerTest.java | 3 +- .../routing/RoutingMapProviderHelperTest.java | 3 +- 22 files changed, 308 insertions(+), 263 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ICollectionRoutingMapCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ICollectionRoutingMapCache.java index ce1bf103cf2da..d91e9d2c6bced 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ICollectionRoutingMapCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ICollectionRoutingMapCache.java @@ -13,14 +13,14 @@ * This is meant to be internally used only by our sdk. **/ public interface ICollectionRoutingMapCache { - default Mono tryLookupAsync( + default Mono> tryLookupAsync( String collectionRid, CollectionRoutingMap previousValue, Map properties) { return tryLookupAsync(collectionRid, previousValue, false, properties); } - Mono tryLookupAsync( + Mono> tryLookupAsync( String collectionRid, CollectionRoutingMap previousValue, boolean forceRefreshCollectionRoutingMap, diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/IRoutingMapProvider.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/IRoutingMapProvider.java index 8d9c5236d25cd..9cf6376cd56f1 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/IRoutingMapProvider.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/IRoutingMapProvider.java @@ -22,9 +22,9 @@ public interface IRoutingMapProvider { /// This method will return all ranges which overlap this range. /// Whether forcefully refreshing the routing map is necessary /// List of effective partition key ranges for a collection or null if collection doesn't exist. - Mono> tryGetOverlappingRangesAsync(String collectionResourceId, Range range, + Mono>> tryGetOverlappingRangesAsync(String collectionResourceId, Range range, boolean forceRefresh /* = false */, Map properties); - Mono tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, + Mono> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh /* = false */, Map properties); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/PartitionKeyRangeGoneRetryPolicy.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/PartitionKeyRangeGoneRetryPolicy.java index 7bedc56d484f8..4ea8250b386d6 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/PartitionKeyRangeGoneRetryPolicy.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/PartitionKeyRangeGoneRetryPolicy.java @@ -64,28 +64,30 @@ public Mono shouldRetry(Exception exception) { if (this.feedOptions != null) { request.properties = this.feedOptions.properties(); } - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); - return collectionObs.flatMap(collection -> { + return collectionObs.flatMap(collectionValueHolder -> { - Mono routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(collection.resourceId(), null, request.properties); + Mono> routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(collectionValueHolder.v.resourceId(), + null, request.properties); - Mono refreshedRoutingMapObs = routingMapObs.flatMap(routingMap -> { - // Force refresh. - return this.partitionKeyRangeCache.tryLookupAsync( - collection.resourceId(), - routingMap, + Mono> refreshedRoutingMapObs = routingMapObs.flatMap(routingMapValueHolder -> { + if (routingMapValueHolder.v != null) { + // Force refresh. + return this.partitionKeyRangeCache.tryLookupAsync( + collectionValueHolder.v.resourceId(), + routingMapValueHolder.v, request.properties); - }).switchIfEmpty(Mono.defer(Mono::empty)); + } else { + return Mono.just(new Utils.ValueHolder<>(null)); + } + }); // TODO: Check if this behavior can be replaced by doOnSubscribe return refreshedRoutingMapObs.flatMap(rm -> { this.retried = true; return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO)); - }).switchIfEmpty(Mono.defer(() -> { - this.retried = true; - return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO)); - })); + }); }); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicy.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicy.java index 06ec1be94708c..5f2ad048c0c0d 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicy.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicy.java @@ -61,17 +61,16 @@ public Mono shouldRetry(Exception e) { request.forceNameCacheRefresh = true; request.requestContext.resolvedCollectionRid = null; - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); - return collectionObs.flatMap(collectionInfo -> { - if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionInfo.resourceId())) { + return collectionObs.flatMap(collectionValueHolder -> { + if (collectionValueHolder.v == null) { + logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress()); + } else if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionValueHolder.v.resourceId())) { return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO)); } return Mono.just(shouldRetryResult); - }).switchIfEmpty(Mono.defer(() -> { - logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress()); - return Mono.just(shouldRetryResult); - })).onErrorResume(throwable -> { + }).onErrorResume(throwable -> { // When resolveCollectionAsync throws an exception ignore it because it's an attempt to recover an existing // error. When the recovery fails we return ShouldRetryResult.noRetry and propagate the original exception to the client diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxDocumentClientImpl.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxDocumentClientImpl.java index 1d4617d6cd5fa..5dfc4e41122ce 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxDocumentClientImpl.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxDocumentClientImpl.java @@ -869,19 +869,19 @@ private Map getRequestHeaders(RequestOptions options) { private Mono addPartitionKeyInformation(RxDocumentServiceRequest request, Document document, RequestOptions options) { - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); return collectionObs - .map(collection -> { - addPartitionKeyInformation(request, document, options, collection); - return request; - }); + .map(collectionValueHolder -> { + addPartitionKeyInformation(request, document, options, collectionValueHolder.v); + return request; + }); } private Mono addPartitionKeyInformation(RxDocumentServiceRequest request, Document document, RequestOptions options, - Mono collectionObs) { + Mono> collectionObs) { - return collectionObs.map(collection -> { - addPartitionKeyInformation(request, document, options, collection); + return collectionObs.map(collectionValueHolder -> { + addPartitionKeyInformation(request, document, options, collectionValueHolder.v); return request; }); } @@ -969,7 +969,7 @@ private Mono getCreateDocumentRequest(String documentC RxDocumentServiceRequest request = RxDocumentServiceRequest.create(operationType, ResourceType.Document, path, typedDocument, requestHeaders, options); - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); return addPartitionKeyInformation(request, typedDocument, options, collectionObs); } @@ -1217,7 +1217,7 @@ private Flux> replaceDocumentInternal(String document validateResource(document); - Mono collectionObs = collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = collectionCache.resolveCollectionAsync(request); Mono requestObs = addPartitionKeyInformation(request, document, options, collectionObs); return requestObs.flux().flatMap(req -> { @@ -1247,7 +1247,7 @@ private Flux> deleteDocumentInternal(String documentL RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Delete, ResourceType.Document, path, requestHeaders, options); - Mono collectionObs = collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = collectionCache.resolveCollectionAsync(request); Mono requestObs = addPartitionKeyInformation(request, null, options, collectionObs); @@ -1283,7 +1283,7 @@ private Flux> readDocumentInternal(String documentLin RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.Document, path, requestHeaders, options); - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); Mono requestObs = addPartitionKeyInformation(request, null, options, collectionObs); @@ -2587,7 +2587,7 @@ private Flux> readFeedCollectionChild(FeedO Function>> executeFunc = request -> { return ObservableHelper.inlineIfPossibleAsObs(() -> { - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); Mono requestObs = this.addPartitionKeyInformation(request, null, requestOptions, collectionObs); return requestObs.flux().flatMap(req -> this.readFeed(req) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/IPartitionKeyRangeCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/IPartitionKeyRangeCache.java index 48538b5fdb0b3..aab1d48c5afc9 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/IPartitionKeyRangeCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/IPartitionKeyRangeCache.java @@ -5,6 +5,7 @@ import com.azure.data.cosmos.internal.ICollectionRoutingMapCache; import com.azure.data.cosmos.internal.IRoutingMapProvider; import com.azure.data.cosmos.internal.PartitionKeyRange; +import com.azure.data.cosmos.internal.Utils; import com.azure.data.cosmos.internal.routing.CollectionRoutingMap; import com.azure.data.cosmos.internal.routing.Range; import reactor.core.publisher.Mono; @@ -17,14 +18,14 @@ */ public interface IPartitionKeyRangeCache extends IRoutingMapProvider, ICollectionRoutingMapCache { - Mono tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map properties); + Mono> tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map properties); - Mono> tryGetOverlappingRangesAsync(String collectionRid, Range range, boolean forceRefresh, - Map properties); + Mono>> tryGetOverlappingRangesAsync(String collectionRid, Range range, boolean forceRefresh, + Map properties); - Mono tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh, + Mono> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh, Map properties); - Mono tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map properties); + Mono> tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map properties); } \ No newline at end of file diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java index 231cb463f7875..8801f83e29071 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java @@ -9,6 +9,7 @@ import com.azure.data.cosmos.internal.RMResources; import com.azure.data.cosmos.internal.ResourceId; import com.azure.data.cosmos.internal.RxDocumentServiceRequest; +import com.azure.data.cosmos.internal.Utils; import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity; import org.apache.commons.lang3.StringUtils; import reactor.core.Exceptions; @@ -36,7 +37,7 @@ protected RxCollectionCache() { * @param request Request to resolve. * @return an instance of Single<DocumentCollection> */ - public Mono resolveCollectionAsync( + public Mono> resolveCollectionAsync( RxDocumentServiceRequest request) { // Mono Void to represent only terminal events specifically complete and error Mono init = null; @@ -46,14 +47,17 @@ public Mono resolveCollectionAsync( init = mono.then(Mono.fromRunnable(() -> request.setForceNameCacheRefresh(false))); } - Mono collectionInfoObs = this.resolveByPartitionKeyRangeIdentityAsync( + Mono> collectionInfoObs = this.resolveByPartitionKeyRangeIdentityAsync( request.getPartitionKeyRangeIdentity(), request.properties); if (init != null) { collectionInfoObs = init.then(collectionInfoObs); } - return collectionInfoObs.flatMap(Mono::just).switchIfEmpty(Mono.defer(() -> { + return collectionInfoObs.flatMap(collectionValueHolder -> { + if (collectionValueHolder.v != null) { + return Mono.just(collectionValueHolder); + } if (request.requestContext.resolvedCollectionRid == null) { Mono collectionInfoRes = this.resolveByNameAsync(request.getResourceAddress(), request.properties); @@ -67,16 +71,23 @@ public Mono resolveCollectionAsync( request.setResourceId(collection.resourceId()); request.requestContext.resolvedCollectionRid = collection.resourceId(); - return Mono.just(collection); + return Mono.just(new Utils.ValueHolder<>(collection)); }); } else { return this.resolveByRidAsync(request.requestContext.resolvedCollectionRid, request.properties); } - })); + }); } else { return resolveByPartitionKeyRangeIdentityAsync(request.getPartitionKeyRangeIdentity(),request.properties) - .flatMap(Mono::just).switchIfEmpty(this.resolveByRidAsync(request.getResourceAddress(), request.properties)); + .flatMap(collectionValueHolder -> { + + if (collectionValueHolder.v != null) { + return Mono.just(collectionValueHolder); + } + + return this.resolveByRidAsync(request.getResourceAddress(), request.properties); + }); } } @@ -101,7 +112,7 @@ public void refresh(String resourceAddress, Map properties) { protected abstract Mono getByNameAsync(String resourceAddress, Map properties); - private Mono resolveByPartitionKeyRangeIdentityAsync(PartitionKeyRangeIdentity partitionKeyRangeIdentity, Map properties) { + private Mono> resolveByPartitionKeyRangeIdentityAsync(PartitionKeyRangeIdentity partitionKeyRangeIdentity, Map properties) { // if request is targeted at specific partition using x-ms-documentd-partitionkeyrangeid header, // which contains value ",", then resolve to collection rid in this header. if (partitionKeyRangeIdentity != null && partitionKeyRangeIdentity.getCollectionRid() != null) { @@ -117,19 +128,20 @@ private Mono resolveByPartitionKeyRangeIdentityAsync(Partiti }); } - return Mono.empty(); + return Mono.just(new Utils.ValueHolder<>(null)); } - private Mono resolveByRidAsync( + private Mono> resolveByRidAsync( String resourceId, Map properties) { ResourceId resourceIdParsed = ResourceId.parse(resourceId); String collectionResourceId = resourceIdParsed.getDocumentCollectionId().toString(); - return this.collectionInfoByIdCache.getAsync( - collectionResourceId, - null, - () -> this.getByRidAsync(collectionResourceId, properties)); + Mono async = this.collectionInfoByIdCache.getAsync( + collectionResourceId, + null, + () -> this.getByRidAsync(collectionResourceId, properties)); + return async.map(Utils.ValueHolder::new); } private Mono resolveByNameAsync( diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxPartitionKeyRangeCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxPartitionKeyRangeCache.java index 92d0c6f90ec65..e63ffb3e852e8 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxPartitionKeyRangeCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxPartitionKeyRangeCache.java @@ -25,6 +25,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -52,24 +53,25 @@ public RxPartitionKeyRangeCache(AsyncDocumentClient client, RxCollectionCache co * @see IPartitionKeyRangeCache#tryLookupAsync(java.lang.STRING, com.azure.data.cosmos.internal.routing.CollectionRoutingMap) */ @Override - public Mono tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map properties) { + public Mono> tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map properties) { return routingMapCache.getAsync( collectionRid, previousValue, () -> getRoutingMapForCollectionAsync(collectionRid, previousValue, properties)) - .onErrorResume(err -> { - logger.debug("tryLookupAsync on collectionRid {} encountered failure", collectionRid, err); - CosmosClientException dce = Utils.as(err, CosmosClientException.class); - if (dce != null && Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND)) { - return Mono.empty(); - } - - return Mono.error(err); - }); + .map(Utils.ValueHolder::new) + .onErrorResume(err -> { + logger.debug("tryLookupAsync on collectionRid {} encountered failure", collectionRid, err); + CosmosClientException dce = Utils.as(err, CosmosClientException.class); + if (dce != null && Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND)) { + return Mono.just(new Utils.ValueHolder<>(null)); + } + + return Mono.error(err); + }); } @Override - public Mono tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, boolean forceRefreshCollectionRoutingMap, + public Mono> tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, boolean forceRefreshCollectionRoutingMap, Map properties) { return tryLookupAsync(collectionRid, previousValue, properties); } @@ -78,63 +80,73 @@ public Mono tryLookupAsync(String collectionRid, Collectio * @see IPartitionKeyRangeCache#tryGetOverlappingRangesAsync(java.lang.STRING, com.azure.data.cosmos.internal.routing.RANGE, boolean) */ @Override - public Mono> tryGetOverlappingRangesAsync(String collectionRid, Range range, boolean forceRefresh, + public Mono>> tryGetOverlappingRangesAsync(String collectionRid, Range range, boolean forceRefresh, Map properties) { - Mono routingMapObs = tryLookupAsync(collectionRid, null, properties); + Mono> routingMapObs = tryLookupAsync(collectionRid, null, properties); - return routingMapObs.flatMap(routingMap -> { - if (forceRefresh) { + return routingMapObs.flatMap(routingMapValueHolder -> { + if (forceRefresh && routingMapValueHolder.v != null) { logger.debug("tryGetOverlappingRangesAsync with forceRefresh on collectionRid {}", collectionRid); - return tryLookupAsync(collectionRid, routingMap, properties); + return tryLookupAsync(collectionRid, routingMapValueHolder.v, properties); } - return Mono.just(routingMap); - }).switchIfEmpty(Mono.empty()).map(routingMap -> routingMap.getOverlappingRanges(range)).switchIfEmpty(Mono.defer(() -> { - logger.debug("Routing Map Null for collection: {} for range: {}, forceRefresh:{}", collectionRid, range.toString(), forceRefresh); - return Mono.empty(); - })); + return Mono.just(routingMapValueHolder); + }).map(routingMapValueHolder -> { + if (routingMapValueHolder.v != null) { + // TODO: the routingMap.getOverlappingRanges(range) returns Collection + // maybe we should consider changing to ArrayList to avoid conversion + return new Utils.ValueHolder<>(new ArrayList<>(routingMapValueHolder.v.getOverlappingRanges(range))); + } else { + logger.debug("Routing Map Null for collection: {} for range: {}, forceRefresh:{}", collectionRid, range.toString(), forceRefresh); + return new Utils.ValueHolder<>(null); + } + }); } /* (non-Javadoc) * @see IPartitionKeyRangeCache#tryGetPartitionKeyRangeByIdAsync(java.lang.STRING, java.lang.STRING, boolean) */ @Override - public Mono tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, + public Mono> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh, Map properties) { - Mono routingMapObs = tryLookupAsync(collectionResourceId, null, properties); + Mono> routingMapObs = tryLookupAsync(collectionResourceId, null, properties); - return routingMapObs.flatMap(routingMap -> { - if (forceRefresh && routingMap != null) { - return tryLookupAsync(collectionResourceId, routingMap, properties); + return routingMapObs.flatMap(routingMapValueHolder -> { + if (forceRefresh && routingMapValueHolder.v != null) { + return tryLookupAsync(collectionResourceId, routingMapValueHolder.v, properties); } - return Mono.justOrEmpty(routingMap); - - }).switchIfEmpty(Mono.defer(Mono::empty)).map(routingMap -> routingMap.getRangeByPartitionKeyRangeId(partitionKeyRangeId)).switchIfEmpty(Mono.defer(() -> { - logger.debug("Routing Map Null for collection: {}, PartitionKeyRangeId: {}, forceRefresh:{}", collectionResourceId, partitionKeyRangeId, forceRefresh); - return null; - })); + return Mono.just(routingMapValueHolder); + + }).map(routingMapValueHolder -> { + if (routingMapValueHolder.v != null) { + return new Utils.ValueHolder<>(routingMapValueHolder.v.getRangeByPartitionKeyRangeId(partitionKeyRangeId)); + } else { + logger.debug("Routing Map Null for collection: {}, PartitionKeyRangeId: {}, forceRefresh:{}", collectionResourceId, partitionKeyRangeId, forceRefresh); + return new Utils.ValueHolder<>(null); + } + }); } /* (non-Javadoc) * @see IPartitionKeyRangeCache#tryGetRangeByPartitionKeyRangeId(java.lang.STRING, java.lang.STRING) */ @Override - public Mono tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map properties) { - Mono routingMapObs = routingMapCache.getAsync( + public Mono> tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map properties) { + Mono> routingMapObs = routingMapCache.getAsync( collectionRid, null, - () -> getRoutingMapForCollectionAsync(collectionRid, null, properties)); + () -> getRoutingMapForCollectionAsync(collectionRid, null, properties)).map(Utils.ValueHolder::new); - return routingMapObs.map(routingMap -> routingMap.getRangeByPartitionKeyRangeId(partitionKeyRangeId)) + return routingMapObs.map(routingMapValueHolder -> new Utils.ValueHolder<>(routingMapValueHolder.v.getRangeByPartitionKeyRangeId(partitionKeyRangeId))) .onErrorResume(err -> { CosmosClientException dce = Utils.as(err, CosmosClientException.class); logger.debug("tryGetRangeByPartitionKeyRangeId on collectionRid {} and partitionKeyRangeId {} encountered failure", collectionRid, partitionKeyRangeId, err); if (dce != null && Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND)) { - return Mono.empty(); + return Mono.just(new Utils.ValueHolder<>(null)); } return Mono.error(dce); @@ -193,7 +205,8 @@ private Mono> getPartitionKeyRange(String collectionRid, ); //this request doesn't actually go to server request.requestContext.resolvedCollectionRid = collectionRid; - Mono collectionObs = collectionCache.resolveCollectionAsync(request); + Mono collectionObs = collectionCache.resolveCollectionAsync(request) + .map(collectionValueHolder -> collectionValueHolder.v); return collectionObs.flatMap(coll -> { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolver.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolver.java index e05497cb956a2..bb6163f4e7704 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolver.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolver.java @@ -20,6 +20,7 @@ import com.azure.data.cosmos.internal.ResourceType; import com.azure.data.cosmos.internal.RxDocumentServiceRequest; import com.azure.data.cosmos.internal.Strings; +import com.azure.data.cosmos.internal.Utils; import com.azure.data.cosmos.internal.caches.RxCollectionCache; import com.azure.data.cosmos.internal.routing.CollectionRoutingMap; import com.azure.data.cosmos.internal.routing.PartitionKeyInternal; @@ -180,7 +181,7 @@ private static void ensureRoutingMapPresent( } } - private Mono tryResolveServerPartitionAsync( + private Mono> tryResolveServerPartitionAsync( RxDocumentServiceRequest request, DocumentCollection collection, CollectionRoutingMap routingMap, @@ -232,20 +233,25 @@ private Mono tryResolveServerPartitionAsync( // upper logic will refresh cache and retry. logger.debug("Collection cache or routing map cache is potentially outdated." + " Returning null. Upper logic will refresh cache and retry."); - return Mono.empty(); + return Mono.just(new Utils.ValueHolder<>(null)); } - Mono addressesObs = this.addressCache.tryGetAddresses( + Mono> addressesObs = this.addressCache.tryGetAddresses( request, new PartitionKeyRangeIdentity(collection.resourceId(), range.id()), forceRefreshPartitionAddresses); - return addressesObs.flatMap(addresses -> Mono.just(new ResolutionResult(range, addresses))).switchIfEmpty(Mono.defer(() -> { - logger.info( - "Could not resolve addresses for identity {}/{}. Potentially collection cache or routing map cache is outdated. Return empty - upper logic will refresh and retry. ", + return addressesObs.flatMap(addressesValueHolder -> { + + if (addressesValueHolder.v == null) { + logger.info( + "Could not resolve addresses for identity {}/{}. Potentially collection cache or routing map cache is outdated. Return null - upper logic will refresh and retry. ", new PartitionKeyRangeIdentity(collection.resourceId(), range.id())); - return Mono.empty(); - })); + return Mono.just(new Utils.ValueHolder<>(null)); + } + + return Mono.just(new Utils.ValueHolder<>(new ResolutionResult(range, addressesValueHolder.v))); + }); } catch (Exception e) { return Mono.error(e); @@ -287,21 +293,23 @@ private Mono resolveMasterResourceAddress(RxDocumentServiceReq && request.getPartitionKeyRangeIdentity() == null; // ServiceIdentity serviceIdentity = this.masterServiceIdentity; - Mono addressesObs = this.addressCache.tryGetAddresses(request, + Mono> addressesObs = this.addressCache.tryGetAddresses(request, masterPartitionKeyRangeIdentity,forceRefreshPartitionAddresses); - return addressesObs.flatMap(addresses -> { + return addressesObs.flatMap(addressesValueHolder -> { + if (addressesValueHolder.v == null) { + logger.warn("Could not get addresses for master partition"); + + // return Observable.getError() + NotFoundException e = new NotFoundException(); + BridgeInternal.setResourceAddress(e, request.getResourceAddress()); + return Mono.error(e); + } + PartitionKeyRange partitionKeyRange = new PartitionKeyRange(); partitionKeyRange.id(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID); - return Mono.just(new ResolutionResult(partitionKeyRange, addresses)); - }).switchIfEmpty(Mono.defer(() -> { - logger.warn("Could not get addresses for master partition"); - - // return Observable.error() - NotFoundException e = new NotFoundException(); - BridgeInternal.setResourceAddress(e, request.getResourceAddress()); - return Mono.error(e); - })); + return Mono.just(new ResolutionResult(partitionKeyRange, addressesValueHolder.v)); + }); } private class RefreshState { @@ -321,36 +329,30 @@ private Mono getOrRefreshRoutingMap(RxDocumentServiceRequest reque (request.getPartitionKeyRangeIdentity() != null && request.getPartitionKeyRangeIdentity().getCollectionRid() != null); state.collectionRoutingMapCacheIsUptoDate = false; - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); - Mono stateObs = collectionObs.flatMap(collection -> { - state.collection = collection; - Mono routingMapObs = - this.collectionRoutingMapCache.tryLookupAsync(collection.resourceId(), null, request.forceCollectionRoutingMapRefresh, request.properties); - final DocumentCollection underlyingCollection = collection; - return routingMapObs.flatMap(routingMap -> { - state.routingMap = routingMap; + Mono stateObs = collectionObs.flatMap(collectionValueHolder -> { + state.collection = collectionValueHolder.v; + Mono> routingMapObs = + this.collectionRoutingMapCache.tryLookupAsync(collectionValueHolder.v.resourceId(), null, request.forceCollectionRoutingMapRefresh, request.properties); + final Utils.ValueHolder underlyingCollection = collectionValueHolder; + return routingMapObs.flatMap(routingMapValueHolder -> { + state.routingMap = routingMapValueHolder.v; if (request.forcePartitionKeyRangeRefresh) { state.collectionRoutingMapCacheIsUptoDate = true; request.forcePartitionKeyRangeRefresh = false; - if (routingMap != null) { - return this.collectionRoutingMapCache.tryLookupAsync(underlyingCollection.resourceId(), routingMap, request.properties) - .map(newRoutingMap -> { - state.routingMap = newRoutingMap; + if (routingMapValueHolder.v != null) { + return this.collectionRoutingMapCache.tryLookupAsync(underlyingCollection.v.resourceId(), routingMapValueHolder.v, request.properties) + .map(newRoutingMapValueHolder -> { + state.routingMap = newRoutingMapValueHolder.v; return state; }); } } return Mono.just(state); - }).switchIfEmpty(Mono.defer(() -> { - if (request.forcePartitionKeyRangeRefresh) { - state.collectionRoutingMapCacheIsUptoDate = true; - request.forcePartitionKeyRangeRefresh = false; - } - return Mono.just(state); - })); + }); }); return stateObs.flatMap(newState -> { @@ -362,17 +364,17 @@ private Mono getOrRefreshRoutingMap(RxDocumentServiceRequest reque newState.collectionCacheIsUptoDate = true; newState.collectionRoutingMapCacheIsUptoDate = false; - Mono newCollectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> newCollectionObs = this.collectionCache.resolveCollectionAsync(request); - return newCollectionObs.flatMap(collection -> { - newState.collection = collection; - Mono newRoutingMapObs = this.collectionRoutingMapCache.tryLookupAsync( - collection.resourceId(), + return newCollectionObs.flatMap(collectionValueHolder -> { + newState.collection = collectionValueHolder.v; + Mono> newRoutingMapObs = this.collectionRoutingMapCache.tryLookupAsync( + collectionValueHolder.v.resourceId(), null, request.properties); - return newRoutingMapObs.map(routingMap -> { - newState.routingMap = routingMap; + return newRoutingMapObs.map(routingMapValueHolder -> { + newState.routingMap = routingMapValueHolder.v; return newState; }); } @@ -384,14 +386,11 @@ private Mono getOrRefreshRoutingMap(RxDocumentServiceRequest reque }); } - private Mono getStateWithNewRoutingMap(RefreshState state, Mono routingMapSingle) { - return routingMapSingle.map(r -> { - state.routingMap = r; - return state; - }).switchIfEmpty(Mono.fromSupplier(() -> { - state.routingMap = null; + private Mono getStateWithNewRoutingMap(RefreshState state, Mono> routingMapSingle) { + return routingMapSingle.map(routingMapValueHolder -> { + state.routingMap = routingMapValueHolder.v; return state; - })); + }); } /** @@ -422,7 +421,7 @@ private Mono resolveAddressesAndIdentityAsync( } // At this point we have both collection and routingMap. - Mono resultObs = this.tryResolveServerPartitionAsync( + Mono> resultObs = this.tryResolveServerPartitionAsync( request, state.collection, state.routingMap, @@ -446,16 +445,21 @@ private Mono resolveAddressesAndIdentityAsync( return Mono.just(funcResolutionResult); }; - return resultObs.flatMap(addCollectionRidIfNameBased).switchIfEmpty(Mono.defer(() -> { - // result is empty + return resultObs.flatMap(resolutionResultValueHolder -> { + if (resolutionResultValueHolder.v != null) { + return addCollectionRidIfNameBased.apply(resolutionResultValueHolder.v); + } + + // result is null: + assert resolutionResultValueHolder.v == null; Function> ensureCollectionRoutingMapCacheIsUptoDateFunc = funcState -> { if (!funcState.collectionRoutingMapCacheIsUptoDate) { funcState.collectionRoutingMapCacheIsUptoDate = true; - Mono newRoutingMapObs = this.collectionRoutingMapCache.tryLookupAsync( - funcState.collection.resourceId(), - funcState.routingMap, - request.properties); + Mono> newRoutingMapObs = this.collectionRoutingMapCache.tryLookupAsync( + funcState.collection.resourceId(), + funcState.routingMap, + request.properties); return getStateWithNewRoutingMap(funcState, newRoutingMapObs); } else { @@ -463,7 +467,7 @@ private Mono resolveAddressesAndIdentityAsync( } }; - Function> resolveServerPartition = funcState -> { + Function>> resolveServerPartition = funcState -> { try { AddressResolver.ensureRoutingMapPresent(request, funcState.routingMap, funcState.collection); @@ -480,8 +484,8 @@ private Mono resolveAddressesAndIdentityAsync( forceRefreshPartitionAddresses); }; - Function> onNullThrowNotFound = funcResolutionResult -> { - if (funcResolutionResult == null) { + Function, Mono> onNullThrowNotFound = funcResolutionResult -> { + if (funcResolutionResult.v == null) { logger.debug("Couldn't route partitionkeyrange-oblivious request after retry/cache refresh. Collection doesn't exist."); // At this point collection cache and routing map caches are refreshed. @@ -491,7 +495,7 @@ private Mono resolveAddressesAndIdentityAsync( return Mono.error(BridgeInternal.setResourceAddress(new NotFoundException(), request.getResourceAddress())); } - return Mono.just(funcResolutionResult); + return Mono.just(funcResolutionResult.v); }; // Couldn't resolve server partition or its addresses. @@ -500,16 +504,16 @@ private Mono resolveAddressesAndIdentityAsync( request.forceNameCacheRefresh = true; state.collectionCacheIsUptoDate = true; - Mono newCollectionObs = this.collectionCache.resolveCollectionAsync(request); - Mono newRefreshStateObs = newCollectionObs.flatMap(collection -> { - state.collection = collection; + Mono> newCollectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono newRefreshStateObs = newCollectionObs.flatMap(collectionValueHolder -> { + state.collection = collectionValueHolder.v; - if (collection.resourceId() != state.routingMap.getCollectionUniqueId()) { + if (collectionValueHolder.v.resourceId() != state.routingMap.getCollectionUniqueId()) { // Collection cache was stale. We resolved to new Rid. routing map cache is potentially stale // for this new collection rid. Mark it as such. state.collectionRoutingMapCacheIsUptoDate = false; - Mono newRoutingMap = this.collectionRoutingMapCache.tryLookupAsync( - collection.resourceId(), + Mono> newRoutingMap = this.collectionRoutingMapCache.tryLookupAsync( + collectionValueHolder.v.resourceId(), null, request.properties); @@ -519,7 +523,7 @@ private Mono resolveAddressesAndIdentityAsync( return Mono.just(state); }); - Mono newResultObs = newRefreshStateObs.flatMap(ensureCollectionRoutingMapCacheIsUptoDateFunc) + Mono> newResultObs = newRefreshStateObs.flatMap(ensureCollectionRoutingMapCacheIsUptoDateFunc) .flatMap(resolveServerPartition); return newResultObs.flatMap(onNullThrowNotFound).flatMap(addCollectionRidIfNameBased); @@ -530,7 +534,7 @@ private Mono resolveAddressesAndIdentityAsync( .flatMap(onNullThrowNotFound) .flatMap(addCollectionRidIfNameBased); } - })); + }); } ); } @@ -557,13 +561,13 @@ private ResolutionResult handleRangeAddressResolutionFailure( private Mono returnOrError(Callable function) { try { - return Mono.justOrEmpty(function.call()); + return Mono.just(function.call()); } catch (Exception e) { return Mono.error(e); } } - private Mono tryResolveServerPartitionByPartitionKeyRangeIdAsync( + private Mono> tryResolveServerPartitionByPartitionKeyRangeIdAsync( RxDocumentServiceRequest request, DocumentCollection collection, CollectionRoutingMap routingMap, @@ -574,23 +578,26 @@ private Mono tryResolveServerPartitionByPartitionKeyRangeIdAsy PartitionKeyRange partitionKeyRange = routingMap.getRangeByPartitionKeyRangeId(request.getPartitionKeyRangeIdentity().getPartitionKeyRangeId()); if (partitionKeyRange == null) { logger.debug("Cannot resolve range '{}'", request.getPartitionKeyRangeIdentity().toHeader()); - return returnOrError(() -> this.handleRangeAddressResolutionFailure(request, collectionCacheIsUpToDate, routingMapCacheIsUpToDate, routingMap)); + return returnOrError(() -> new Utils.ValueHolder<>(this.handleRangeAddressResolutionFailure(request, collectionCacheIsUpToDate, routingMapCacheIsUpToDate, routingMap))); } - Mono addressesObs = this.addressCache.tryGetAddresses( + Mono> addressesObs = this.addressCache.tryGetAddresses( request, new PartitionKeyRangeIdentity(collection.resourceId(), request.getPartitionKeyRangeIdentity().getPartitionKeyRangeId()), forceRefreshPartitionAddresses); - return addressesObs.flatMap(addresses -> Mono.just(new ResolutionResult(partitionKeyRange, addresses))).switchIfEmpty(Mono.defer(() -> { - logger.debug("Cannot resolve addresses for range '{}'", request.getPartitionKeyRangeIdentity().toHeader()); + return addressesObs.flatMap(addressesValueHolder -> { + if (addressesValueHolder.v == null) { + logger.debug("Cannot resolve addresses for range '{}'", request.getPartitionKeyRangeIdentity().toHeader()); - try { - return Mono.justOrEmpty(this.handleRangeAddressResolutionFailure(request, collectionCacheIsUpToDate, routingMapCacheIsUpToDate, routingMap)); - } catch (CosmosClientException e) { - return Mono.error(e); + try { + return Mono.just(new Utils.ValueHolder<>(this.handleRangeAddressResolutionFailure(request, collectionCacheIsUpToDate, routingMapCacheIsUpToDate, routingMap))); + } catch (CosmosClientException e) { + return Mono.error(e); + } } - })); + return Mono.just(new Utils.ValueHolder<>(new ResolutionResult(partitionKeyRange, addressesValueHolder.v))); + }); } private PartitionKeyRange tryResolveServerPartitionByPartitionKey( diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java index de8034789f8a2..02f4b9eb8894f 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java @@ -134,7 +134,7 @@ private URL getServiceEndpoint() { } @Override - public Mono tryGetAddresses(RxDocumentServiceRequest request, + public Mono> tryGetAddresses(RxDocumentServiceRequest request, PartitionKeyRangeIdentity partitionKeyRangeIdentity, boolean forceRefreshPartitionAddresses) { @@ -149,7 +149,8 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) { // if that's master partition return master partition address! - return this.resolveMasterAsync(request, forceRefreshPartitionAddresses, request.properties).map(Pair::getRight); + return this.resolveMasterAsync(request, forceRefreshPartitionAddresses, request.properties) + .map(partitionKeyRangeIdentityPair -> new Utils.ValueHolder<>(partitionKeyRangeIdentityPair.getRight())); } Instant suboptimalServerPartitionTimestamp = this.suboptimalServerPartitionTimestamps.get(partitionKeyRangeIdentity); @@ -199,26 +200,26 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity); } - Mono addressesObs = this.serverPartitionAddressCache.getAsync( + Mono> addressesObs = this.serverPartitionAddressCache.getAsync( partitionKeyRangeIdentity, null, () -> this.getAddressesForRangeId( request, partitionKeyRangeIdentity.getCollectionRid(), partitionKeyRangeIdentity.getPartitionKeyRangeId(), - false)); + false)).map(Utils.ValueHolder::new); return addressesObs.map( - addresses -> { - if (notAllReplicasAvailable(addresses)) { + addressesValueHolder -> { + if (notAllReplicasAvailable(addressesValueHolder.v)) { if (logger.isDebugEnabled()) { - logger.debug("not all replicas available {}", JavaStreamUtils.info(addresses)); + logger.debug("not all replicas available {}", JavaStreamUtils.info(addressesValueHolder.v)); } this.suboptimalServerPartitionTimestamps.putIfAbsent(partitionKeyRangeIdentity, Instant.now()); } - return addresses; + return addressesValueHolder; }).onErrorResume(throwable -> { Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable); CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(unwrappedException, CosmosClientException.class); @@ -237,7 +238,7 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity); logger.debug("tryGetAddresses: inner onErrorResumeNext return empty", dce); - return Mono.empty(); + return Mono.just(new Utils.ValueHolder<>(null)); } return Mono.error(unwrappedException); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GlobalAddressResolver.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GlobalAddressResolver.java index 082df288f1186..3540cba969e6e 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GlobalAddressResolver.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GlobalAddressResolver.java @@ -11,12 +11,15 @@ import com.azure.data.cosmos.internal.PartitionKeyRange; import com.azure.data.cosmos.internal.RxDocumentServiceRequest; import com.azure.data.cosmos.internal.UserAgentContainer; +import com.azure.data.cosmos.internal.Utils; import com.azure.data.cosmos.internal.caches.RxCollectionCache; import com.azure.data.cosmos.internal.caches.RxPartitionKeyRangeCache; import com.azure.data.cosmos.internal.http.HttpClient; import com.azure.data.cosmos.internal.routing.CollectionRoutingMap; import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.concurrent.Queues; import java.net.URL; import java.util.ArrayList; @@ -27,8 +30,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -; - public class GlobalAddressResolver implements IAddressResolver { private final static int MaxBackupReadRegions = 3; private final GlobalEndpointManager endpointManager; @@ -78,18 +79,22 @@ public GlobalAddressResolver( } Mono openAsync(DocumentCollection collection) { - Mono routingMap = this.routingMapProvider.tryLookupAsync(collection.id(), null, null); + Mono> routingMap = this.routingMapProvider.tryLookupAsync(collection.id(), null, null); return routingMap.flatMap(collectionRoutingMap -> { - List ranges = ((List)collectionRoutingMap.getOrderedPartitionKeyRanges()).stream().map(range -> + if ( collectionRoutingMap.v == null) { + return Mono.empty(); + } + + List ranges = collectionRoutingMap.v.getOrderedPartitionKeyRanges().stream().map(range -> new PartitionKeyRangeIdentity(collection.resourceId(), range.id())).collect(Collectors.toList()); List> tasks = new ArrayList<>(); + Mono[] array = new Mono[this.addressCacheByEndpoint.values().size()]; for (EndpointCache endpointCache : this.addressCacheByEndpoint.values()) { tasks.add(endpointCache.addressCache.openAsync(collection, ranges)); } - // TODO: Not sure if this will work. - return Mono.whenDelayError(tasks); - }).switchIfEmpty(Mono.defer(Mono::empty)); + return Flux.mergeDelayError(Queues.SMALL_BUFFER_SIZE, tasks.toArray(array)).then(); + }); } @Override diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/IAddressCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/IAddressCache.java index 94ee28b5b45ee..b53b92bd46c02 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/IAddressCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/IAddressCache.java @@ -4,6 +4,7 @@ package com.azure.data.cosmos.internal.directconnectivity; import com.azure.data.cosmos.internal.RxDocumentServiceRequest; +import com.azure.data.cosmos.internal.Utils; import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity; import reactor.core.publisher.Mono; @@ -19,7 +20,7 @@ public interface IAddressCache { * @param forceRefreshPartitionAddresses Whether addresses need to be refreshed as previously resolved addresses were determined to be outdated. * @return Physical addresses. */ - Mono tryGetAddresses( + Mono> tryGetAddresses( RxDocumentServiceRequest request, PartitionKeyRangeIdentity partitionKeyRangeIdentity, boolean forceRefreshPartitionAddresses); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DefaultDocumentQueryExecutionContext.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DefaultDocumentQueryExecutionContext.java index a8b36aa12ea12..717b9bc77d706 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DefaultDocumentQueryExecutionContext.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DefaultDocumentQueryExecutionContext.java @@ -124,7 +124,7 @@ public Mono> getTargetPartitionKeyRangesById(String reso return client.getPartitionKeyRangeCache().tryGetPartitionKeyRangeByIdAsync(resourceId, partitionKeyRangeIdInternal, false, - null).flatMap(partitionKeyRange -> Mono.just(Collections.singletonList(partitionKeyRange))); + null).flatMap(partitionKeyRange -> Mono.just(Collections.singletonList(partitionKeyRange.v))); } protected Function>> executeInternalAsyncFunc() { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentProducer.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentProducer.java index 8a43928e6b712..da5214c37dfdf 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentProducer.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentProducer.java @@ -186,22 +186,22 @@ private Flux splitProof(Flux> replacementRangesObs = getReplacementRanges(targetRange.toRange()); + Mono>> replacementRangesObs = getReplacementRanges(targetRange.toRange()); // Since new DocumentProducers are instantiated for the new replacement ranges, if for the new // replacement partitions split happens the corresponding DocumentProducer can recursively handle splits. // so this is resilient to split on splits. Flux> replacementProducers = replacementRangesObs.flux().flatMap( - partitionKeyRanges -> { + partitionKeyRangesValueHolder -> { if (logger.isDebugEnabled()) { logger.info("Cross Partition Query Execution detected partition [{}] split into [{}] partitions," + " last continuation token is [{}].", targetRange.toJson(), - partitionKeyRanges.stream() + partitionKeyRangesValueHolder.v.stream() .map(JsonSerializable::toJson).collect(Collectors.joining(", ")), lastResponseContinuationToken); } - return Flux.fromIterable(createReplacingDocumentProducersOnSplit(partitionKeyRanges)); + return Flux.fromIterable(createReplacingDocumentProducersOnSplit(partitionKeyRangesValueHolder.v)); }); return produceOnSplit(replacementProducers); @@ -241,7 +241,7 @@ protected DocumentProducer createChildDocumentProducerOnSplit( top); } - private Mono> getReplacementRanges(Range range) { + private Mono>> getReplacementRanges(Range range) { return client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(collectionRid, range, true, feedOptions.properties()); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextFactory.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextFactory.java index 47c390f42f530..4069959d3784c 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextFactory.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/DocumentQueryExecutionContextFactory.java @@ -35,7 +35,7 @@ public class DocumentQueryExecutionContextFactory { private final static int PageSizeFactorForTop = 5; - private static Mono resolveCollection(IDocumentQueryClient client, SqlQuerySpec query, + private static Mono> resolveCollection(IDocumentQueryClient client, SqlQuerySpec query, ResourceType resourceTypeEnum, String resourceLink) { RxCollectionCache collectionCache = client.getCollectionCache(); @@ -60,7 +60,7 @@ public static Flux collectionObs = Flux.empty(); + Flux> collectionObs = Flux.just(new Utils.ValueHolder<>(null)); if (resourceTypeEnum.isCollectionChild()) { collectionObs = resolveCollection(client, query, resourceTypeEnum, resourceLink).flux(); @@ -83,7 +83,7 @@ public static Flux queryExecutionInfoMono = com.azure.data.cosmos.internal.query.QueryPlanRetriever.getQueryPlanThroughGatewayAsync(client, query, resourceLink); - return collectionObs.single().flatMap(collection -> + return collectionObs.single().flatMap(collectionValueHolder -> queryExecutionInfoMono.flatMap(partitionedQueryExecutionInfo -> { QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo(); @@ -110,7 +110,7 @@ public static Flux> queryRanges = @@ -122,11 +122,11 @@ public static Flux range = Range.getPointRange(internalPartitionKey .getEffectivePartitionKeyString(internalPartitionKey, - collection.getPartitionKey())); + collectionValueHolder.v.getPartitionKey())); queryRanges = Collections.singletonList(range); } partitionKeyRanges = queryExecutionContext - .getTargetPartitionKeyRanges(collection.resourceId(), queryRanges); + .getTargetPartitionKeyRanges(collectionValueHolder.v.resourceId(), queryRanges); } return partitionKeyRanges .flatMap(pkranges -> createSpecializedDocumentQueryExecutionContextAsync(client, @@ -138,7 +138,7 @@ public static Flux> getOverlappingRanges(IRoutingMapProv } return routingMapProvider.tryGetOverlappingRangesAsync(resourceId, queryRange, false, null) - .map(targetRanges::addAll) - .flatMap(aBoolean -> { - if (!targetRanges.isEmpty()) { - Range lastKnownTargetRange = targetRanges.get(targetRanges.size() - 1).toRange(); - while (iterator.hasNext()) { - Range value = iterator.next(); - if (MAX_COMPARATOR.compare(value, lastKnownTargetRange) > 0) { - // Since we already moved forward on iterator to check above condition, we - // go to previous when it fails so the the value is not skipped on iteration - iterator.previous(); - break; - } - } - } - return Mono.just(targetRanges); - }).flux(); + .map(ranges -> ranges.v) + .map(targetRanges::addAll) + .flatMap(aBoolean -> { + if (!targetRanges.isEmpty()) { + Range lastKnownTargetRange = targetRanges.get(targetRanges.size() - 1).toRange(); + while (iterator.hasNext()) { + Range value = iterator.next(); + if (MAX_COMPARATOR.compare(value, lastKnownTargetRange) > 0) { + // Since we already moved forward on iterator to check above condition, we + // go to previous when it fails so the the value is not skipped on iteration + iterator.previous(); + break; + } + } + } + return Mono.just(targetRanges); + }).flux(); }).repeat(sortedRanges.size()) .takeUntil(stringRange -> !iterator.hasNext()) .last() diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java index 08de0deb6f12d..3c8d70bb3eaa7 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java @@ -98,7 +98,7 @@ public void shouldRetryWithNotFoundStatusCodeAndReadSessionNotAvailableSubStatus DocumentCollection documentCollection = new DocumentCollection(); documentCollection.resourceId("rid_1"); - Mockito.when(rxClientCollectionCache.resolveCollectionAsync(request)).thenReturn(Mono.just(documentCollection)); + Mockito.when(rxClientCollectionCache.resolveCollectionAsync(request)).thenReturn(Mono.just(new Utils.ValueHolder<>(documentCollection))); Mono singleShouldRetry = renameCollectionAwareClientRetryPolicy .shouldRetry(notFoundException); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolverTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolverTest.java index f63ccc9c78205..70e6c0b083f0e 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolverTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolverTest.java @@ -16,6 +16,7 @@ import com.azure.data.cosmos.internal.OperationType; import com.azure.data.cosmos.internal.ResourceType; import com.azure.data.cosmos.internal.RxDocumentServiceRequest; +import com.azure.data.cosmos.internal.Utils; import com.azure.data.cosmos.internal.caches.RxCollectionCache; import com.azure.data.cosmos.internal.routing.CollectionRoutingMap; import com.azure.data.cosmos.internal.routing.IServerIdentity; @@ -319,7 +320,7 @@ private void initializeMocks( currentCollection.setValue(collectionAfterRefresh); AddressResolverTest.this.collectionCacheRefreshedCount++; request.forceNameCacheRefresh = false; - return Mono.just(currentCollection.getValue()); + return Mono.just(new Utils.ValueHolder<>(currentCollection.getValue())); } if (request.forceNameCacheRefresh && collectionAfterRefresh == null) { @@ -335,10 +336,10 @@ private void initializeMocks( } if (!request.forceNameCacheRefresh && currentCollection.getValue() != null) { - return Mono.just(currentCollection.getValue()); + return Mono.just(new Utils.ValueHolder<>(currentCollection.getValue())); } - return Mono.empty(); + return new Utils.ValueHolder<>(null); }).when(this.collectionCache).resolveCollectionAsync(Mockito.any(RxDocumentServiceRequest.class)); // Routing map cache @@ -359,7 +360,7 @@ private void initializeMocks( CollectionRoutingMap previousValue = invocationOnMock.getArgumentAt(1, CollectionRoutingMap.class); if (previousValue == null) { - return Mono.justOrEmpty(currentRoutingMap.get(collectionRid)); + return Mono.just(new Utils.ValueHolder<>(currentRoutingMap.get(collectionRid))); } if (previousValue != null && currentRoutingMap.containsKey(previousValue.getCollectionUniqueId()) && @@ -383,7 +384,7 @@ private void initializeMocks( } - return Mono.justOrEmpty(currentRoutingMap.get(collectionRid)); + return Mono.just(new Utils.ValueHolder<>(currentRoutingMap.get(collectionRid))); } return Mono.error(new NotImplementedException("not mocked")); @@ -403,7 +404,7 @@ private void initializeMocks( Boolean forceRefresh = invocationOnMock.getArgumentAt(2, Boolean.class); if (!forceRefresh) { - return Mono.justOrEmpty(currentAddresses.get(findMatchingServiceIdentity(currentAddresses, pkri))); + return Mono.just(new Utils.ValueHolder<>(currentAddresses.get(findMatchingServiceIdentity(currentAddresses, pkri)))); } else { ServiceIdentity si; @@ -427,7 +428,7 @@ private void initializeMocks( } // TODO: what to return in this case if it is null!! - return Mono.justOrEmpty(currentAddresses.get(si)); + return Mono.just(new Utils.ValueHolder<>(currentAddresses.get(si))); } }).when(fabricAddressCache).tryGetAddresses(Mockito.any(RxDocumentServiceRequest.class), Mockito.any(PartitionKeyRangeIdentity.class), Mockito.anyBoolean()); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCacheTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCacheTest.java index da3991318195e..e32fe7ba1420e 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCacheTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCacheTest.java @@ -169,9 +169,10 @@ public void tryGetAddresses_ForDataPartitions(String partitionKeyRangeId, String PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, partitionKeyRangeId); boolean forceRefreshPartitionAddresses = false; - Mono addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); + Mono> addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); - ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT)); + ArrayList addressInfosFromCache = + Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); Mono> masterAddressFromGatewayObs = cache.getServerAddressesViaGatewayAsync(req, collectionRid, ImmutableList.of(partitionKeyRangeId), false); @@ -225,8 +226,8 @@ public void tryGetAddresses_ForDataPartitions_AddressCachedByOpenAsync_NoHttpReq PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, partitionKeyRangeId); boolean forceRefreshPartitionAddresses = false; - Mono addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); - ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT)); + Mono> addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); + ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); // no new request is made assertThat(httpClientWrapper.capturedRequests) @@ -279,8 +280,8 @@ public void tryGetAddresses_ForDataPartitions_ForceRefresh( new Database(), new HashMap<>()); PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, partitionKeyRangeId); - Mono addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, true); - ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT)); + Mono> addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, true); + ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); // no new request is made assertThat(httpClientWrapper.capturedRequests) @@ -336,8 +337,8 @@ public void tryGetAddresses_ForDataPartitions_Suboptimal_Refresh( new Database(), new HashMap<>()); PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, partitionKeyRangeId); - Mono addressesInfosFromCacheObs = origCache.tryGetAddresses(req, partitionKeyRangeIdentity, true); - ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT)); + Mono> addressesInfosFromCacheObs = origCache.tryGetAddresses(req, partitionKeyRangeIdentity, true); + ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); // no new request is made assertThat(httpClientWrapper.capturedRequests) @@ -380,15 +381,15 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro // force refresh to replace existing with sub-optimal addresses addressesInfosFromCacheObs = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, true); - AddressInformation[] suboptimalAddresses = getSuccessResult(addressesInfosFromCacheObs, TIMEOUT); + Utils.ValueHolder suboptimalAddresses = getSuccessResult(addressesInfosFromCacheObs, TIMEOUT); assertThat(httpClientWrapper.capturedRequests) .describedAs("getServerAddressesViaGatewayAsync will read addresses from gateway") .asList().hasSize(1); httpClientWrapper.capturedRequests.clear(); // relaxes one replica being down - assertThat(suboptimalAddresses.length).isLessThanOrEqualTo((ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1)); - assertThat(suboptimalAddresses.length).isGreaterThanOrEqualTo(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 2); + assertThat(suboptimalAddresses.v.length).isLessThanOrEqualTo((ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1)); + assertThat(suboptimalAddresses.v.length).isGreaterThanOrEqualTo(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 2); assertThat(fetchCounter.get()).isEqualTo(1); // no refresh, use cache @@ -397,15 +398,15 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro assertThat(httpClientWrapper.capturedRequests) .describedAs("getServerAddressesViaGatewayAsync will read addresses from gateway") .asList().hasSize(0); - assertThat(suboptimalAddresses).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1); + assertThat(suboptimalAddresses.v).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1); assertThat(fetchCounter.get()).isEqualTo(1); // wait for refresh time TimeUnit.SECONDS.sleep(suboptimalRefreshTime + 1); addressesInfosFromCacheObs = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, false); - AddressInformation[] addresses = getSuccessResult(addressesInfosFromCacheObs, TIMEOUT); - assertThat(addresses).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize); + Utils.ValueHolder addresses = getSuccessResult(addressesInfosFromCacheObs, TIMEOUT); + assertThat(addresses.v).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize); assertThat(httpClientWrapper.capturedRequests) .describedAs("getServerAddressesViaGatewayAsync will read addresses from gateway") .asList().hasSize(1); @@ -431,9 +432,9 @@ public void tryGetAddresses_ForMasterPartition(Protocol protocol) throws Excepti PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity("M"); boolean forceRefreshPartitionAddresses = false; - Mono addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); + Mono> addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); - ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT)); + ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); Mono> masterAddressFromGatewayObs = cache.getMasterAddressesViaGatewayAsync(req, ResourceType.Database, null, "/dbs/", false, false, null); @@ -481,8 +482,7 @@ public void tryGetAddresses_ForMasterPartition_MasterPartitionAddressAlreadyCach // request master partition info to ensure it is cached. AddressInformation[] expectedAddresses = cache.tryGetAddresses(req, partitionKeyRangeIdentity, - forceRefreshPartitionAddresses) - .block(); + forceRefreshPartitionAddresses).block().v; assertThat(clientWrapper.capturedRequests).asList().hasSize(1); clientWrapper.capturedRequests.clear(); @@ -490,11 +490,11 @@ public void tryGetAddresses_ForMasterPartition_MasterPartitionAddressAlreadyCach TimeUnit.SECONDS.sleep(waitTimeInBetweenAttemptsInSeconds); - Mono addressesObs = cache.tryGetAddresses(req, + Mono> addressesObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); - AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT); + AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT).v; assertExactlyEqual(actualAddresses, expectedAddresses); @@ -528,17 +528,16 @@ public void tryGetAddresses_ForMasterPartition_ForceRefresh() throws Exception { // request master partition info to ensure it is cached. AddressInformation[] expectedAddresses = cache.tryGetAddresses(req, partitionKeyRangeIdentity, - false) - .block(); + false).block().v; assertThat(clientWrapper.capturedRequests).asList().hasSize(1); clientWrapper.capturedRequests.clear(); - Mono addressesObs = cache.tryGetAddresses(req, + Mono> addressesObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, true); - AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT); + AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT).v; assertExactlyEqual(actualAddresses, expectedAddresses); @@ -622,16 +621,16 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro AddressInformation[] expectedAddresses = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, false) - .block(); + .block().v; assertThat(clientWrapper.capturedRequests).asList().hasSize(1); clientWrapper.capturedRequests.clear(); - Mono addressesObs = spyCache.tryGetAddresses(req, + Mono> addressesObs = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, false); - AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT); + AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT).v; assertExactlyEqual(actualAddresses, expectedAddresses); @@ -713,7 +712,7 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro AddressInformation[] subOptimalAddresses = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, false) - .block(); + .block().v; assertThat(getMasterAddressesViaGatewayAsyncInvocation.get()).isEqualTo(1); assertThat(subOptimalAddresses).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1); @@ -726,12 +725,12 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro assertThat(clientWrapper.capturedRequests).asList().hasSize(1); clientWrapper.capturedRequests.clear(); - Mono addressesObs = spyCache.tryGetAddresses(req, + Mono> addressesObs = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, false); - AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT); + AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT).v; // the cache address is used. no new http request is sent assertThat(clientWrapper.capturedRequests).asList().hasSize(1); assertThat(getMasterAddressesViaGatewayAsyncInvocation.get()).isEqualTo(2); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GlobalAddressResolverTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GlobalAddressResolverTest.java index ba4e45108abd3..7dc162a8366c2 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GlobalAddressResolverTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GlobalAddressResolverTest.java @@ -13,6 +13,7 @@ import com.azure.data.cosmos.internal.ResourceType; import com.azure.data.cosmos.internal.RxDocumentServiceRequest; import com.azure.data.cosmos.internal.UserAgentContainer; +import com.azure.data.cosmos.internal.Utils; import com.azure.data.cosmos.internal.caches.RxCollectionCache; import com.azure.data.cosmos.internal.caches.RxPartitionKeyRangeCache; import com.azure.data.cosmos.internal.http.HttpClient; @@ -91,7 +92,7 @@ public void setup() throws Exception { DocumentCollection collectionDefinition = new DocumentCollection(); collectionDefinition.id(UUID.randomUUID().toString()); collectionCache = Mockito.mock(RxCollectionCache.class); - Mockito.when(collectionCache.resolveCollectionAsync(Matchers.any(RxDocumentServiceRequest.class))).thenReturn(Mono.just(collectionDefinition)); + Mockito.when(collectionCache.resolveCollectionAsync(Matchers.any(RxDocumentServiceRequest.class))).thenReturn(Mono.just(new Utils.ValueHolder<>(collectionDefinition))); routingMapProvider = Mockito.mock(RxPartitionKeyRangeCache.class); userAgentContainer = Mockito.mock(UserAgentContainer.class); serviceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); @@ -148,7 +149,7 @@ public void openAsync() throws Exception { List partitionKeyRanges = new ArrayList<>(); partitionKeyRanges.add(range); Mockito.when(collectionRoutingMap.getOrderedPartitionKeyRanges()).thenReturn(partitionKeyRanges); - Mono collectionRoutingMapSingle = Mono.just(collectionRoutingMap); + Mono> collectionRoutingMapSingle = Mono.just(new Utils.ValueHolder<>(collectionRoutingMap)); Mockito.when(routingMapProvider.tryLookupAsync(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(collectionRoutingMapSingle); List ranges = new ArrayList<>(); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/query/DocumentProducerTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/query/DocumentProducerTest.java index 0a2d50d0e1f9a..eac09af335117 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/query/DocumentProducerTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/query/DocumentProducerTest.java @@ -14,6 +14,7 @@ import com.azure.data.cosmos.internal.IRetryPolicyFactory; import com.azure.data.cosmos.internal.RetryPolicy; import com.azure.data.cosmos.internal.RxDocumentServiceRequest; +import com.azure.data.cosmos.internal.Utils; import com.azure.data.cosmos.internal.caches.RxPartitionKeyRangeCache; import com.azure.data.cosmos.internal.query.orderbyquery.OrderByRowResult; import com.azure.data.cosmos.internal.query.orderbyquery.OrderbyRowComparer; @@ -518,7 +519,7 @@ private IDocumentQueryClient mockQueryClient(List replacement IDocumentQueryClient client = Mockito.mock(IDocumentQueryClient.class); RxPartitionKeyRangeCache cache = Mockito.mock(RxPartitionKeyRangeCache.class); doReturn(cache).when(client).getPartitionKeyRangeCache(); - doReturn(Mono.just(replacementRanges)).when(cache). + doReturn(Mono.just(new Utils.ValueHolder<>(replacementRanges))).when(cache). tryGetOverlappingRangesAsync(anyString(), any(Range.class), anyBoolean(), Matchers.anyMap()); return client; } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelperTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelperTest.java index 4f9a810a26122..d3207a462ce14 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelperTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/RoutingMapProviderHelperTest.java @@ -5,6 +5,7 @@ import com.azure.data.cosmos.internal.IRoutingMapProvider; import com.azure.data.cosmos.internal.PartitionKeyRange; +import com.azure.data.cosmos.internal.Utils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.mockito.Matchers; @@ -164,7 +165,7 @@ public String apply(PartitionKeyRange range) { Mockito.doAnswer(invocationOnMock -> { Range range = invocationOnMock.getArgumentAt(1, Range.class); - return Mono.just(resultMap.get(range)); + return Mono.just(new Utils.ValueHolder<>(resultMap.get(range))); }).when(routingMapProviderMock).tryGetOverlappingRangesAsync(Matchers.anyString(), Matchers.any(), Matchers.anyBoolean(),