From 2dab32d85575e3f3358a02f7c2a7c677a70f81cf Mon Sep 17 00:00:00 2001 From: Annie Liang <64233642+xinlian12@users.noreply.github.com> Date: Fri, 15 Apr 2022 09:29:42 -0700 Subject: [PATCH] AllowRequestContinueForThroughputControlInitError (#27702) * allow request to continue on init error Co-authored-by: annie-mac Co-authored-by: Fabian Meiswinkel Co-authored-by: annie-mac --- .../spark/ThroughputControlHelper.scala | 8 +- sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + .../cosmos/ThroughputControlGroupConfig.java | 22 ++- .../ThroughputControlGroupConfigBuilder.java | 86 +++++++++++- ...ainerThroughputControlGroupProperties.java | 79 +++++++++++ .../ThroughputControlStore.java | 76 +++++++---- .../config/GlobalThroughputControlGroup.java | 3 +- .../config/LocalThroughputControlGroup.java | 5 +- .../config/ThroughputControlGroupFactory.java | 4 +- .../ThroughputControlGroupInternal.java | 33 ++++- .../ThroughputContainerController.java | 8 +- .../group/ThroughputGroupControllerBase.java | 8 +- .../ThroughputControlContainerManager.java | 15 ++- ...oughputControlInitializationException.java | 29 ++++ .../main/java/com/azure/cosmos/util/Beta.java | 4 +- .../cosmos/ThroughputControlCodeSnippet.java | 8 +- .../com/azure/cosmos/CosmosBulkAsyncTest.java | 6 +- ...ThroughputControlGroupPropertiesTests.java | 96 +++++++++++++ .../throughputControl/TestItem.java | 6 + ...tControlGroupConfigConfigurationTests.java | 12 +- .../ThroughputControlTests.java | 127 ++++++++++++++++-- 21 files changed, 561 insertions(+), 75 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ContainerThroughputControlGroupProperties.java create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/exceptions/ThroughputControlInitializationException.java create mode 100644 sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ContainerThroughputControlGroupPropertiesTests.java diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala index 6a54de9c20bf0..7a3255907569b 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala @@ -17,14 +17,14 @@ private object ThroughputControlHelper { val throughputControlConfig = cosmosThroughputControlConfig.get val groupConfigBuilder = new ThroughputControlGroupConfigBuilder() - .setGroupName(throughputControlConfig.groupName) - .setDefault(true) + .groupName(throughputControlConfig.groupName) + .defaultControlGroup(true) if (throughputControlConfig.targetThroughput.isDefined) { - groupConfigBuilder.setTargetThroughput(throughputControlConfig.targetThroughput.get) + groupConfigBuilder.targetThroughput(throughputControlConfig.targetThroughput.get) } if (throughputControlConfig.targetThroughputThreshold.isDefined) { - groupConfigBuilder.setTargetThroughputThreshold(throughputControlConfig.targetThroughputThreshold.get) + groupConfigBuilder.targetThroughputThreshold(throughputControlConfig.targetThroughputThreshold.get) } val globalThroughputControlConfigBuilder = client.createGlobalThroughputControlConfigBuilder( diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 7357eb7b7d1df..faa7a7ce4b5f1 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.29.0-beta.1 (Unreleased) #### Features Added +* Added Beta API `continueOnInitError` in `ThroughputControlGroupConfigBuilder` - See [PR 27702](https://github.com/Azure/azure-sdk-for-java/pull/27702) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ThroughputControlGroupConfig.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ThroughputControlGroupConfig.java index bef6e91b55542..6c42b9e59f8e7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ThroughputControlGroupConfig.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ThroughputControlGroupConfig.java @@ -14,12 +14,19 @@ public final class ThroughputControlGroupConfig { private final Integer targetThroughput; private final Double targetThroughputThreshold; private final boolean isDefault; + private final boolean continueOnInitError; - ThroughputControlGroupConfig(String groupName, Integer targetThroughput, Double targetThroughputThreshold, boolean isDefault) { + ThroughputControlGroupConfig( + String groupName, + Integer targetThroughput, + Double targetThroughputThreshold, + boolean isDefault, + boolean continueOnInitError) { this.groupName= groupName; this.targetThroughput = targetThroughput; this.targetThroughputThreshold = targetThroughputThreshold; this.isDefault = isDefault; + this.continueOnInitError = continueOnInitError; } /** @@ -70,4 +77,17 @@ public Double getTargetThroughputThreshold() { public boolean isDefault() { return this.isDefault; } + + /** + * Get whether request is allowed to continue on original request flow if throughput control controller failed on initialization. + * + * By default, it is false. + * If it is true, requests will continue on original request flow if throughput control controller failed on initialization. + * + * @return {@code true} request will continue on original request flow if throughput control controller failed on initialization. {@code false} otherwise. + */ + @Beta(value = Beta.SinceVersion.V4_28_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) + public boolean isContinueOnInitError() { + return continueOnInitError; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ThroughputControlGroupConfigBuilder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ThroughputControlGroupConfigBuilder.java index 51870e5be734d..f4b5d69823a90 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ThroughputControlGroupConfigBuilder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ThroughputControlGroupConfigBuilder.java @@ -13,10 +13,12 @@ */ @Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) public class ThroughputControlGroupConfigBuilder { + private static final boolean DEFAULT_CONTINUE_ON_INIT_ERROR = false; private String groupName; private Integer targetThroughput; private Double targetThroughputThreshold; private boolean isDefault; + private boolean continueOnInitError = DEFAULT_CONTINUE_ON_INIT_ERROR; /** * Set the throughput control group name. @@ -24,6 +26,7 @@ public class ThroughputControlGroupConfigBuilder { * @param groupName The throughput control group name. * @return The {@link ThroughputControlGroupConfigBuilder}. */ + @Deprecated @Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) public ThroughputControlGroupConfigBuilder setGroupName(String groupName) { checkArgument(StringUtils.isNotEmpty(groupName), "Group name cannot be null nor empty"); @@ -32,6 +35,20 @@ public ThroughputControlGroupConfigBuilder setGroupName(String groupName) { return this; } + /** + * Set the throughput control group name. + * + * @param groupName The throughput control group name. + * @return The {@link ThroughputControlGroupConfigBuilder}. + */ + @Beta(value = Beta.SinceVersion.V4_28_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) + public ThroughputControlGroupConfigBuilder groupName(String groupName) { + checkArgument(StringUtils.isNotEmpty(groupName), "Group name cannot be null nor empty"); + + this.groupName = groupName; + return this; + } + /** * Set the throughput control group target throughput. * @@ -40,6 +57,7 @@ public ThroughputControlGroupConfigBuilder setGroupName(String groupName) { * @param targetThroughput The target throughput for the control group. * @return The {@link ThroughputControlGroupConfigBuilder}. */ + @Deprecated @Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) public ThroughputControlGroupConfigBuilder setTargetThroughput(int targetThroughput) { checkArgument(targetThroughput > 0, "Target throughput should be greater than 0"); @@ -48,6 +66,22 @@ public ThroughputControlGroupConfigBuilder setTargetThroughput(int targetThrough return this; } + /** + * Set the throughput control group target throughput. + * + * The target throughput value should be greater than 0. + * + * @param targetThroughput The target throughput for the control group. + * @return The {@link ThroughputControlGroupConfigBuilder}. + */ + @Beta(value = Beta.SinceVersion.V4_28_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) + public ThroughputControlGroupConfigBuilder targetThroughput(int targetThroughput) { + checkArgument(targetThroughput > 0, "Target throughput should be greater than 0"); + + this.targetThroughput = targetThroughput; + return this; + } + /** * Set the throughput control group target throughput threshold. * @@ -56,6 +90,7 @@ public ThroughputControlGroupConfigBuilder setTargetThroughput(int targetThrough * @param targetThroughputThreshold The target throughput threshold for the control group. * @return The {@link ThroughputControlGroupConfigBuilder}. */ + @Deprecated @Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) public ThroughputControlGroupConfigBuilder setTargetThroughputThreshold(double targetThroughputThreshold) { checkArgument(targetThroughputThreshold > 0 && targetThroughputThreshold <= 1, "Target throughput threshold should between (0, 1]"); @@ -64,6 +99,21 @@ public ThroughputControlGroupConfigBuilder setTargetThroughputThreshold(double t return this; } + /** + * Set the throughput control group target throughput threshold. + * + * The target throughput threshold value should be between (0, 1]. + * + * @param targetThroughputThreshold The target throughput threshold for the control group. + * @return The {@link ThroughputControlGroupConfigBuilder}. + */ + @Beta(value = Beta.SinceVersion.V4_28_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) + public ThroughputControlGroupConfigBuilder targetThroughputThreshold(double targetThroughputThreshold) { + checkArgument(targetThroughputThreshold > 0 && targetThroughputThreshold <= 1, "Target throughput threshold should between (0, 1]"); + + this.targetThroughputThreshold = targetThroughputThreshold; + return this; + } /** * Set whether this throughput control group will be used by default. @@ -72,9 +122,36 @@ public ThroughputControlGroupConfigBuilder setTargetThroughputThreshold(double t * @param aDefault The flag to indicate whether the throughput control group will be used by default. * @return The {@link ThroughputControlGroupConfigBuilder}. */ + @Deprecated @Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) public ThroughputControlGroupConfigBuilder setDefault(boolean aDefault) { - isDefault = aDefault; + this.isDefault = aDefault; + return this; + } + + /** + * Set whether this throughput control group will be used by default. + * If set to true, requests without explicit override of the throughput control group will be routed to this group. + * + * @param aDefault The flag to indicate whether the throughput control group will be used by default. + * @return The {@link ThroughputControlGroupConfigBuilder}. + */ + @Beta(value = Beta.SinceVersion.V4_28_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) + public ThroughputControlGroupConfigBuilder defaultControlGroup(boolean aDefault) { + this.isDefault = aDefault; + return this; + } + + /** + * Set whether allow request to continue on original request flow if throughput control controller failed on initialization. + * If set to true, requests will be able to fall back to original request flow if throughput control controller failed on initialization. + * + * @param continueOnInitError The flag to indicate whether request is allowed to fall back to original request flow. + * @return The {@link ThroughputControlGroupConfigBuilder}. + */ + @Beta(value = Beta.SinceVersion.V4_28_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) + public ThroughputControlGroupConfigBuilder continueOnInitError(boolean continueOnInitError) { + this.continueOnInitError = continueOnInitError; return this; } @@ -92,6 +169,11 @@ public ThroughputControlGroupConfig build() { throw new IllegalArgumentException("Neither targetThroughput nor targetThroughputThreshold is defined."); } - return new ThroughputControlGroupConfig(groupName, this.targetThroughput, this.targetThroughputThreshold, isDefault); + return new ThroughputControlGroupConfig( + this.groupName, + this.targetThroughput, + this.targetThroughputThreshold, + this.isDefault, + this.continueOnInitError); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ContainerThroughputControlGroupProperties.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ContainerThroughputControlGroupProperties.java new file mode 100644 index 0000000000000..15ed7339016d7 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ContainerThroughputControlGroupProperties.java @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.throughputControl; + +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +public class ContainerThroughputControlGroupProperties { + private static Logger logger = LoggerFactory.getLogger(ContainerThroughputControlGroupProperties.class); + + private final AtomicReference defaultGroup; + private final Set throughputControlGroupSet; + private final Set supressInitErrorGroupSet; + + public ContainerThroughputControlGroupProperties() { + this.defaultGroup = new AtomicReference<>(); + this.throughputControlGroupSet = ConcurrentHashMap.newKeySet(); + this.supressInitErrorGroupSet = ConcurrentHashMap.newKeySet(); + } + + /*** + * Enable a throughput control group. + * + * @param group a {@link ThroughputControlGroupInternal}. + * + * @return the total size of distinct throughput control groups enabled on the container. + */ + public int enableThroughputControlGroup(ThroughputControlGroupInternal group) { + checkNotNull(group, "Throughput control group should not be null"); + + if (group.isDefault()) { + if (!this.defaultGroup.compareAndSet(null, group)) { + if (!this.defaultGroup.get().equals(group)) { + throw new IllegalArgumentException("A default group already exists"); + } + } + } + + if (group.isContinueOnInitError()) { + this.supressInitErrorGroupSet.add(group.getGroupName()); + } + + // Only throw when two different groups are using the same id (databaseId + containerId + groupName) + if (this.throughputControlGroupSet.stream() + .anyMatch(existingGroup -> Objects.equals(existingGroup.getId(), group.getId()) && !existingGroup.equals(group))) { + throw new IllegalArgumentException("Throughput control group with id " + group.getId() + " already exists"); + } + + this.throughputControlGroupSet.add(group); + + return this.throughputControlGroupSet.size(); + } + + public Set getThroughputControlGroupSet() { + return this.throughputControlGroupSet; + } + + public boolean allowRequestToContinueOnInitError(RxDocumentServiceRequest request) { + checkNotNull(request, "Request should not be null"); + + String requestGroupName = request.getThroughputControlGroupName(); + if (StringUtils.isEmpty(requestGroupName)) { + requestGroupName = this.defaultGroup.get().getGroupName(); + } + + return this.supressInitErrorGroupSet.contains(requestGroupName); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java index f1df8b15749d8..399d215f930bc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java @@ -6,7 +6,6 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionMode; import com.azure.cosmos.CosmosException; -import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.Utils; @@ -19,13 +18,12 @@ import com.azure.cosmos.implementation.throughputControl.controller.container.EmptyThroughputContainerController; import com.azure.cosmos.implementation.throughputControl.controller.container.IThroughputContainerController; import com.azure.cosmos.implementation.throughputControl.controller.container.ThroughputContainerController; +import com.azure.cosmos.implementation.throughputControl.exceptions.ThroughputControlInitializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; import reactor.core.publisher.Mono; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import static com.azure.cosmos.implementation.Exceptions.isNameCacheStale; @@ -81,7 +79,7 @@ public class ThroughputControlStore { private final RxClientCollectionCache collectionCache; private final ConnectionMode connectionMode; private final AsyncCache containerControllerCache; - private final ConcurrentHashMap> groupMapByContainer; + private final ConcurrentHashMap containerMap; private final RxPartitionKeyRangeCache partitionKeyRangeCache; private final LinkedCancellationTokenSource cancellationTokenSource; @@ -98,7 +96,7 @@ public ThroughputControlStore( this.collectionCache = collectionCache; this.connectionMode = connectionMode; this.containerControllerCache = new AsyncCache<>(); - this.groupMapByContainer = new ConcurrentHashMap<>(); + this.containerMap = new ConcurrentHashMap<>(); this.partitionKeyRangeCache = partitionKeyRangeCache; this.cancellationTokenSource = new LinkedCancellationTokenSource(); @@ -109,30 +107,20 @@ public void enableThroughputControlGroup(ThroughputControlGroupInternal group) { checkNotNull(group, "Throughput control group cannot be null"); String containerNameLink = Utils.trimBeginningAndEndingSlashes(BridgeInternal.extractContainerSelfLink(group.getTargetContainer())); - this.groupMapByContainer.compute(containerNameLink, (key, groupSet) -> { - if (groupSet == null) { - groupSet = ConcurrentHashMap.newKeySet(); + this.containerMap.compute(containerNameLink, (key, throughputControlContainerProperties) -> { + if (throughputControlContainerProperties == null) { + throughputControlContainerProperties = new ContainerThroughputControlGroupProperties(); } - if (group.isDefault()) { - if (groupSet.stream().anyMatch( - controlGroup -> controlGroup.isDefault() && !StringUtils.equals(group.getId(), controlGroup.getId()))) { - throw new IllegalArgumentException("A default group already exists"); - } - } - - if (!groupSet.add(group)) { - logger.debug("Can not add duplicate group"); - return groupSet; - } + int groupSize = throughputControlContainerProperties.enableThroughputControlGroup(group); - if (groupSet.size() == 1) { + if (groupSize == 1) { // This is the first enabled group for the target container // Clean the current cache in case we have built EmptyThroughputContainerController. this.containerControllerCache.remove(containerNameLink); } - return groupSet; + return throughputControlContainerProperties; }); } @@ -159,9 +147,39 @@ public Mono processRequest(RxDocumentServiceRequest request, Mono orig // We will handle the first scenario by creating a new container controller, // while fall back to original request Mono for the second scenario. return this.updateControllerAndRetry(collectionNameLink, request, originalRequestMono); + }) + .onErrorResume(throwable -> { + + Exception unwrappedException = Utils.as(Exceptions.unwrap(throwable), Exception.class); + if (unwrappedException instanceof ThroughputControlInitializationException) { + if (this.shouldContinueRequestOnInitError(request, collectionNameLink, unwrappedException)) { + return originalRequestMono; + } + + return Mono.error(unwrappedException.getCause()); + } + + return Mono.error(throwable); }); } + private boolean shouldContinueRequestOnInitError(RxDocumentServiceRequest request, String collectionNameLink, Throwable throwable) { + if (throwable instanceof ThroughputControlInitializationException) { + ContainerThroughputControlGroupProperties throughputControlContainerProperties = this.containerMap.get(collectionNameLink); + + checkNotNull( + throughputControlContainerProperties, + "Throughput control container properties should not be null"); + checkArgument( + throughputControlContainerProperties.getThroughputControlGroupSet().size() > 0, + "There should be more than one throughput control group"); + + return throughputControlContainerProperties.allowRequestToContinueOnInitError(request); + } + + return false; + } + private Mono updateControllerAndRetry( String containerNameLink, RxDocumentServiceRequest request, @@ -204,18 +222,18 @@ private Mono resolveContainerController(String c checkArgument(StringUtils.isNotEmpty(containerNameLink), "Container name link can not be null or empty"); return this.containerControllerCache.getAsync( - containerNameLink, - null, - () -> this.createAndInitContainerController(containerNameLink) - ); + containerNameLink, + null, + () -> this.createAndInitContainerController(containerNameLink)) + .onErrorResume(throwable -> Mono.error(new ThroughputControlInitializationException(throwable))); } private Mono createAndInitContainerController(String containerNameLink) { checkArgument(StringUtils.isNotEmpty(containerNameLink), "Container link should not be null or empty"); - if (this.groupMapByContainer.containsKey(containerNameLink)) { - return Mono.just(this.groupMapByContainer.get(containerNameLink)) - .flatMap(groups -> { + if (this.containerMap.containsKey(containerNameLink)) { + return Mono.just(this.containerMap.get(containerNameLink)) + .flatMap(throughputControlContainerProperties -> { LinkedCancellationToken parentToken = this.cancellationTokenMap.compute( containerNameLink, @@ -225,7 +243,7 @@ private Mono createAndInitContainerController(St new ThroughputContainerController( this.collectionCache, this.connectionMode, - groups, + throughputControlContainerProperties.getThroughputControlGroupSet(), this.partitionKeyRangeCache, parentToken); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/GlobalThroughputControlGroup.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/GlobalThroughputControlGroup.java index d7d19e16ba673..767756d6550b5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/GlobalThroughputControlGroup.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/GlobalThroughputControlGroup.java @@ -22,11 +22,12 @@ public GlobalThroughputControlGroup( Integer targetThroughput, Double targetThroughputThreshold, boolean isDefault, + boolean continueOnInitError, CosmosAsyncContainer globalControlContainer, Duration controlItemRenewInterval, Duration controlItemExpireInterval) { - super (groupName, targetContainer, targetThroughput, targetThroughputThreshold, isDefault); + super (groupName, targetContainer, targetThroughput, targetThroughputThreshold, isDefault, continueOnInitError); checkNotNull(globalControlContainer, "Global control container can not be null"); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/LocalThroughputControlGroup.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/LocalThroughputControlGroup.java index 5290c04ca59c8..fef3e3e9dea51 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/LocalThroughputControlGroup.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/LocalThroughputControlGroup.java @@ -12,7 +12,8 @@ public LocalThroughputControlGroup( CosmosAsyncContainer targetContainer, Integer targetThroughput, Double targetThroughputThreshold, - boolean isDefault) { - super (groupName, targetContainer, targetThroughput, targetThroughputThreshold, isDefault); + boolean isDefault, + boolean continueOnInitError) { + super (groupName, targetContainer, targetThroughput, targetThroughputThreshold, isDefault, continueOnInitError); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/ThroughputControlGroupFactory.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/ThroughputControlGroupFactory.java index fa3379487dfc3..7fb6919df6214 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/ThroughputControlGroupFactory.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/ThroughputControlGroupFactory.java @@ -21,7 +21,8 @@ public static LocalThroughputControlGroup createThroughputLocalControlGroup(Thro targetContainer, groupConfig.getTargetThroughput(), groupConfig.getTargetThroughputThreshold(), - groupConfig.isDefault()); + groupConfig.isDefault(), + groupConfig.isContinueOnInitError()); } public static GlobalThroughputControlGroup createThroughputGlobalControlGroup( @@ -39,6 +40,7 @@ public static GlobalThroughputControlGroup createThroughputGlobalControlGroup( groupConfig.getTargetThroughput(), groupConfig.getTargetThroughputThreshold(), groupConfig.isDefault(), + groupConfig.isContinueOnInitError(), BridgeInternal.getControlContainerFromThroughputGlobalControlConfig(globalControlConfig), globalControlConfig.getControlItemRenewInterval(), globalControlConfig.getControlItemExpireInterval()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/ThroughputControlGroupInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/ThroughputControlGroupInternal.java index f607188472da8..1d0632a5c26d4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/ThroughputControlGroupInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/config/ThroughputControlGroupInternal.java @@ -6,6 +6,8 @@ import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import java.util.Objects; + import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -13,6 +15,7 @@ public abstract class ThroughputControlGroupInternal { private final String groupName; private final String id; private final boolean isDefault; + private final boolean continueOnInitError; private final CosmosAsyncContainer targetContainer; private final Integer targetThroughput; private final Double targetThroughputThreshold; @@ -22,7 +25,8 @@ public ThroughputControlGroupInternal( CosmosAsyncContainer targetContainer, Integer targetThroughput, Double targetThroughputThreshold, - boolean isDefault) { + boolean isDefault, + boolean continueOnInitError) { checkArgument(StringUtils.isNotEmpty(groupName), "Group name can not be null or empty"); checkNotNull(targetContainer, "Target container can not be null"); @@ -36,6 +40,7 @@ public ThroughputControlGroupInternal( this.targetThroughput = targetThroughput; this.targetThroughputThreshold = targetThroughputThreshold; this.isDefault = isDefault; + this.continueOnInitError = continueOnInitError; this.id = String.format( "%s/%s/%s", @@ -98,6 +103,18 @@ public boolean isDefault() { return this.isDefault; } + /** + * Get whether allow request to continue on original request flow if throughput control failed on initialization. + * + * By default, it is false. + * If it is true, request will be able to continue on original request flow if throughput control failed on initialization. + * + * @return {@code true} request will be allowed to continue on original request flow if throughput control failed on initialization. {@code false} otherwise. + */ + public boolean isContinueOnInitError() { + return continueOnInitError; + } + public String getId() { return this.id; } @@ -114,11 +131,21 @@ public boolean equals(Object other) { ThroughputControlGroupInternal that = (ThroughputControlGroupInternal) other; - return StringUtils.equals(this.id, that.id); + return Objects.equals(this.id, that.id) + && this.isDefault == that.isDefault + && this.continueOnInitError == that.continueOnInitError + && Objects.equals(this.targetThroughput, that.targetThroughput) + && Objects.equals(this.targetThroughputThreshold, that.targetThroughputThreshold); } @Override public int hashCode() { - return this.id.hashCode(); + int hash = 0; + hash = (hash * 397) ^ this.id.hashCode(); + hash = (hash * 397) ^ Boolean.hashCode(this.isDefault); + hash = (hash * 397) ^ Boolean.hashCode(this.continueOnInitError); + hash = (hash * 397) ^ (this.targetThroughput == null ? 0 : Integer.hashCode(this.targetThroughput)); + hash = (hash * 397) ^ (this.targetThroughputThreshold == null ? 0 : Double.hashCode(this.targetThroughputThreshold)); + return hash; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java index e7a6d2f2b3266..45c740862bd0b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java @@ -22,6 +22,7 @@ import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupInternal; import com.azure.cosmos.implementation.throughputControl.controller.group.ThroughputGroupControllerBase; import com.azure.cosmos.implementation.throughputControl.controller.group.ThroughputGroupControllerFactory; +import com.azure.cosmos.implementation.throughputControl.exceptions.ThroughputControlInitializationException; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.ThroughputProperties; @@ -316,9 +317,10 @@ private Mono createAndInitializeGroupControllers( private Mono resolveThroughputGroupController(ThroughputControlGroupInternal group) { return this.groupControllerCache.getAsync( - group.getGroupName(), - null, - () -> this.createAndInitializeGroupController(group)); + group.getGroupName(), + null, + () -> this.createAndInitializeGroupController(group)) + .onErrorResume(throwable -> Mono.error(new ThroughputControlInitializationException(throwable))); } private Mono createAndInitializeGroupController(ThroughputControlGroupInternal group) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/ThroughputGroupControllerBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/ThroughputGroupControllerBase.java index 25055e5361c75..e1d980a076f17 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/ThroughputGroupControllerBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/ThroughputGroupControllerBase.java @@ -18,6 +18,7 @@ import com.azure.cosmos.implementation.throughputControl.controller.request.GlobalThroughputRequestController; import com.azure.cosmos.implementation.throughputControl.controller.request.IThroughputRequestController; import com.azure.cosmos.implementation.throughputControl.controller.request.PkRangesThroughputRequestController; +import com.azure.cosmos.implementation.throughputControl.exceptions.ThroughputControlInitializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; @@ -217,9 +218,10 @@ private Mono shouldUpdateRequestController(RxDocumentServiceRequest req protected Mono resolveRequestController() { return this.requestControllerAsyncCache.getAsync( - this.group.getGroupName(), - null, - () -> this.createAndInitializeRequestController()); + this.group.getGroupName(), + null, + () -> this.createAndInitializeRequestController()) + .onErrorResume(throwable -> Mono.error(new ThroughputControlInitializationException(throwable))); } private void refreshRequestController() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputControlContainerManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputControlContainerManager.java index e226abaa97b5a..795548b5fc868 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputControlContainerManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputControlContainerManager.java @@ -15,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.RetrySpec; @@ -139,15 +140,21 @@ public Mono getOrCreateConfigItem() { public Mono queryLoadFactorsOfAllClients(double clientLoadFactor) { // The current design is using ttl to expire client items, so there is no need to check whether the client item is expired. - String sqlQueryTest = "SELECT VALUE SUM(c.loadFactor) FROM c WHERE c.groupId = @GROUPID AND c.id != @CLIENTITEMID"; + // You probably have the question about why not use the following query: + // "SELECT VALUE SUM(c.loadFactor) FROM c WHERE c.groupId = @GROUPID AND c.id != @CLIENTITEMID" + // The reason being that we might get some inconsistent results back for the query above, using the following query will make sure + // we always get the items within ttl. + String sqlQueryTest = "SELECT * FROM c WHERE c.groupId = @GROUPID AND c.id != @CLIENTITEMID"; List parameters = new ArrayList<>(); parameters.add(new SqlParameter("@GROUPID", this.clientItemPartitionKeyValue)); parameters.add(new SqlParameter("@CLIENTITEMID", this.clientItemId)); SqlQuerySpec querySpec = new SqlQuerySpec(sqlQueryTest, parameters); - return this.globalControlContainer.queryItems(querySpec, Double.class) - .single() - .map(result -> result + clientLoadFactor); + return this.globalControlContainer.queryItems(querySpec, GlobalThroughputControlClientItem.class) + .collectList() + .flatMapMany(clientItemList -> Flux.fromIterable(clientItemList)) + .map(clientItem -> clientItem.getLoadFactor()) + .reduce(clientLoadFactor, (loadFactor1, loadFactor2) -> loadFactor1 + loadFactor2); } /** diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/exceptions/ThroughputControlInitializationException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/exceptions/ThroughputControlInitializationException.java new file mode 100644 index 0000000000000..ece6c75d1c796 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/exceptions/ThroughputControlInitializationException.java @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.throughputControl.exceptions; + +import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +/** + * This exception is mainly used to indicate the exception happens during throughput control initialization. + * This exception can be used to determine whether the request will continue to original request flow if fallBackOnInitError is true. + * + * This exception wraps the true underlying exception + * if request should fail, then {@link ThroughputControlStore} will be responsible for throw the underlying exception to upcaller. + */ +public class ThroughputControlInitializationException extends RuntimeException { + private final Throwable cause; + + public ThroughputControlInitializationException(Throwable throwable) { + checkNotNull(throwable, "Throwable can not be null"); + this.cause = throwable; + } + + @Override + public Throwable getCause() { + return cause; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java index 0a1aad1a35d77..2cd23cc72cf9d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java @@ -89,6 +89,8 @@ public enum SinceVersion { /** v4.26.0 */ V4_26_0, /** v4.27.0 */ - V4_27_0 + V4_27_0, + /** v4.28.0 */ + V4_28_0 } } diff --git a/sdk/cosmos/azure-cosmos/src/samples/java/com/azure/cosmos/ThroughputControlCodeSnippet.java b/sdk/cosmos/azure-cosmos/src/samples/java/com/azure/cosmos/ThroughputControlCodeSnippet.java index 2f77edeefa0e1..51d482c8efe98 100644 --- a/sdk/cosmos/azure-cosmos/src/samples/java/com/azure/cosmos/ThroughputControlCodeSnippet.java +++ b/sdk/cosmos/azure-cosmos/src/samples/java/com/azure/cosmos/ThroughputControlCodeSnippet.java @@ -28,8 +28,8 @@ public void codeSnippetForEnableLocalThroughputControl() { // BEGIN: com.azure.cosmos.throughputControl.localControl ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() - .setGroupName("localControlGroup") - .setTargetThroughputThreshold(0.1) + .groupName("localControlGroup") + .targetThroughputThreshold(0.1) .build(); container.enableLocalThroughputControlGroup(groupConfig); @@ -40,8 +40,8 @@ public void codeSnippetForEnableGlobalThroughputControl() { // BEGIN: com.azure.cosmos.throughputControl.globalControl ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() - .setGroupName("localControlGroup") - .setTargetThroughputThreshold(0.1) + .groupName("localControlGroup") + .targetThroughputThreshold(0.1) .build(); GlobalThroughputControlConfig globalControlConfig = diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java index a36e6ca12cb54..d544da08f9577 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosBulkAsyncTest.java @@ -71,9 +71,9 @@ public void createItem_withBulkAndThroughputControl() throws InterruptedExceptio new CosmosContainerProperties(UUID.randomUUID().toString(), pkDefinition)); ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() - .setGroupName("test-group") - .setTargetThroughputThreshold(0.2) - .setDefault(true) + .groupName("test-group") + .targetThroughputThreshold(0.2) + .defaultControlGroup(true) .build(); bulkAsyncContainerWithThroughputControl.enableLocalThroughputControlGroup(groupConfig); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ContainerThroughputControlGroupPropertiesTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ContainerThroughputControlGroupPropertiesTests.java new file mode 100644 index 0000000000000..181fa1de5371a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ContainerThroughputControlGroupPropertiesTests.java @@ -0,0 +1,96 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.throughputControl; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.implementation.TestConfigurations; +import com.azure.cosmos.implementation.throughputControl.config.LocalThroughputControlGroup; +import org.testng.annotations.Test; + +import java.util.UUID; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +public class ContainerThroughputControlGroupPropertiesTests { + + @Test(groups = "emulator") + public void enableThroughputControlGroup() { + CosmosAsyncClient testClient = null; + try { + testClient = new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .buildAsyncClient(); + + ContainerThroughputControlGroupProperties throughputControlContainerProperties = new ContainerThroughputControlGroupProperties(); + + CosmosAsyncContainer container = testClient.getDatabase("fakeDatabase").getContainer("fakeContainer"); + + // Test 1: add default throughput control group successfully + LocalThroughputControlGroup throughputControlDefaultGroup = new LocalThroughputControlGroup( + "test-" + UUID.randomUUID(), + container, + 6, + null, + true, + false); + + int currentGroupSize = throughputControlContainerProperties.enableThroughputControlGroup(throughputControlDefaultGroup); + assertThat(currentGroupSize).isEqualTo(1); + + // Test 2: add throughput control group with same id + LocalThroughputControlGroup throughputControlGroupDuplciate = new LocalThroughputControlGroup( + throughputControlDefaultGroup.getGroupName(), + container, + 6, + null, + false, + false); + + assertThatThrownBy(() -> throughputControlContainerProperties.enableThroughputControlGroup(throughputControlGroupDuplciate)) + .isInstanceOf(IllegalArgumentException.class); + + // Test 3: add another default group + LocalThroughputControlGroup throughputControlDefaultGroup2 = new LocalThroughputControlGroup( + "test-" + UUID.randomUUID(), + container, + 6, + null, + true, + false); + assertThatThrownBy(() -> throughputControlContainerProperties.enableThroughputControlGroup(throughputControlDefaultGroup2)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("A default group already exists"); + + // Test 4: add a new group + LocalThroughputControlGroup newGroup = new LocalThroughputControlGroup( + "test-" + UUID.randomUUID(), + container, + 6, + null, + false, + false); + currentGroupSize = throughputControlContainerProperties.enableThroughputControlGroup(newGroup); + assertThat(currentGroupSize).isEqualTo(2); + + // Test 5: add a same group as step 4 + LocalThroughputControlGroup newGroupDuplicate = new LocalThroughputControlGroup( + newGroup.getGroupName(), + container, + newGroup.getTargetThroughput(), + newGroup.getTargetThroughputThreshold(), + newGroup.isDefault(), + newGroup.isContinueOnInitError()); + currentGroupSize = throughputControlContainerProperties.enableThroughputControlGroup(newGroupDuplicate); + assertThat(currentGroupSize).isEqualTo(2); + } finally { + if (testClient != null) { + testClient.close(); + } + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/TestItem.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/TestItem.java index 27273b660e8a3..8e03339f67c30 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/TestItem.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/TestItem.java @@ -3,6 +3,8 @@ package com.azure.cosmos.implementation.throughputControl; +import java.util.UUID; + public class TestItem { private String id; private String mypk; @@ -17,6 +19,10 @@ public TestItem(String id, String mypk, String prop) { this.prop = prop; } + public static TestItem createNewItem() { + return new TestItem(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + public String getId() { return id; } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlGroupConfigConfigurationTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlGroupConfigConfigurationTests.java index a3e7906d0f53d..6744411282491 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlGroupConfigConfigurationTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlGroupConfigConfigurationTests.java @@ -34,17 +34,17 @@ public ThroughputControlGroupConfigConfigurationTests(CosmosClientBuilder client public void validateMultipleDefaultGroups() { ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() - .setGroupName("group-1") - .setTargetThroughput(10) - .setDefault(true) + .groupName("group-1") + .targetThroughput(10) + .defaultControlGroup(true) .build(); container.enableLocalThroughputControlGroup(groupConfig); ThroughputControlGroupConfig groupConfig2 = new ThroughputControlGroupConfigBuilder() - .setGroupName("group-2") - .setTargetThroughputThreshold(1.0) - .setDefault(true) + .groupName("group-2") + .targetThroughputThreshold(1.0) + .defaultControlGroup(true) .build(); assertThatThrownBy(() -> container.enableLocalThroughputControlGroup(groupConfig2)) .isInstanceOf(IllegalArgumentException.class) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java index 679f6f3d021e7..6fa057d09b7b7 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java @@ -15,8 +15,11 @@ import com.azure.cosmos.ThroughputControlGroupConfig; import com.azure.cosmos.ThroughputControlGroupConfigBuilder; import com.azure.cosmos.implementation.FailureValidator; +import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.throughputControl.controller.group.global.GlobalThroughputControlClientItem; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; @@ -26,6 +29,8 @@ import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.SqlParameter; +import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.rx.CosmosItemResponseValidator; import com.azure.cosmos.rx.TestSuiteBase; import org.testng.annotations.BeforeClass; @@ -34,6 +39,8 @@ import org.testng.annotations.Test; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -65,13 +72,21 @@ public static Object[][] operationTypeProvider() { }; } + @DataProvider + public static Object[][] allowRequestToContinueOnInitErrorProvider() { + return new Object[][]{ + { true }, + { false } + }; + } + @Test(groups = {"emulator"}, dataProvider = "operationTypeProvider", timeOut = TIMEOUT) public void throughputLocalControl(OperationType operationType) { // The create document in this test usually takes around 6.29RU, pick a RU here relatively close, so to test throttled scenario ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() - .setGroupName("group-" + UUID.randomUUID()) - .setTargetThroughput(6) + .groupName("group-" + UUID.randomUUID()) + .targetThroughput(6) .build(); container.enableLocalThroughputControlGroup(groupConfig); @@ -101,8 +116,8 @@ public void throughputGlobalControl(OperationType operationType) { // The create document in this test usually takes around 6.29RU, pick a RU here relatively close, so to test throttled scenario ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() - .setGroupName("group-" + UUID.randomUUID()) - .setTargetThroughput(6) + .groupName("group-" + UUID.randomUUID()) + .targetThroughput(6) .build(); GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) @@ -146,8 +161,8 @@ public void throughputLocalControlForContainerCreateDeleteWithSameName(Operation // pick a RU super small here so we know it will throttle requests for several cycles/seconds ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() - .setGroupName("group-" + UUID.randomUUID()) - .setTargetThroughput(1) + .groupName("group-" + UUID.randomUUID()) + .targetThroughput(1) .build(); container.enableLocalThroughputControlGroup(groupConfig); createdContainer.enableLocalThroughputControlGroup(groupConfig); @@ -191,8 +206,8 @@ public void throughputLocalControl_createItem() throws InterruptedException { // The create document in this test usually takes around 6.29RU, pick a RU here relatively close, so to test throttled scenario ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() - .setGroupName("group-" + UUID.randomUUID()) - .setTargetThroughput(6) + .groupName("group-" + UUID.randomUUID()) + .targetThroughput(6) .build(); container.enableLocalThroughputControlGroup(groupConfig); @@ -221,6 +236,102 @@ public void throughputLocalControl_createItem() throws InterruptedException { validateItemSuccess(container.createItem(itemGetThrottled), successValidator); } + @Test(groups = {"emulator"}, dataProvider = "allowRequestToContinueOnInitErrorProvider", timeOut = TIMEOUT) + public void throughputControlContinueOnInitError(boolean continueOnInitError) { + // Purposely not creating the throughput control container so to test allowRequestContinueOnInitError + String controlContainerId = "throughputControlContainer"; + GlobalThroughputControlConfig globalControlConfig = + this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) + .setControlItemRenewInterval(Duration.ofSeconds(5)) + .setControlItemExpireInterval(Duration.ofSeconds(20)) + .build(); + + FailureValidator notFoundValidator = new FailureValidator.Builder().resourceNotFound().build(); + CosmosItemResponseValidator successValidator = + new CosmosItemResponseValidator.Builder>() + .build(); + + ThroughputControlGroupConfig groupConfig = + new ThroughputControlGroupConfigBuilder() + .groupName("group-" + UUID.randomUUID()) + .targetThroughput(6) + .continueOnInitError(continueOnInitError) + .build(); + + container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig); + + CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); + requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); + + if (continueOnInitError) { + validateItemSuccess( + container.createItem(TestItem.createNewItem(), requestOptions), + successValidator); + } else { + CosmosAsyncContainer fakeContainer = client.getDatabase(database.getId()).getContainer("fakeContainer"); + validateItemFailure( + fakeContainer.createItem(TestItem.createNewItem(), requestOptions), + notFoundValidator); + } + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT * 4) + public void throughputGlobalControlMultipleClients() throws InterruptedException { + List clients = new ArrayList<>(); + try{ + // and do not enable ttl on the container so to test how many items are created. + String controlContainerId = "throughputControlContainer"; + CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId); + database.createContainerIfNotExists(controlContainerId, "/groupId").block(); + ThroughputControlGroupConfig groupConfig = + new ThroughputControlGroupConfigBuilder() + .groupName("group-" + UUID.randomUUID()) + .targetThroughput(6) + .build(); + + int clientCount = 3; + for (int i = 0; i < clientCount; i++) { + CosmosAsyncClient testClient = new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .buildAsyncClient(); + + clients.add(testClient); + + CosmosAsyncContainer testContainer = testClient.getDatabase(this.database.getId()).getContainer(container.getId()); + GlobalThroughputControlConfig globalControlConfig1 = testClient.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) + .setControlItemRenewInterval(Duration.ofSeconds(5)) + .setControlItemExpireInterval(Duration.ofSeconds(20)) + .build(); + testContainer.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig1); + + CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); + requestOptions.setContentResponseOnWriteEnabled(true); + requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); + + testContainer.createItem(getDocumentDefinition(), requestOptions).block(); + } + + String query = "SELECT * FROM c WHERE CONTAINS(c.groupId, @GROUPID) AND CONTAINS(c.groupId, @CLIENTITEMSUFFIX)"; + List parameters = new ArrayList<>(); + parameters.add(new SqlParameter("@GROUPID", groupConfig.getGroupName())); + parameters.add(new SqlParameter("@CLIENTITEMSUFFIX", ".client")); + SqlQuerySpec querySpec = new SqlQuerySpec(query, parameters); + + List clientItems = controlContainer.queryItems(querySpec, GlobalThroughputControlClientItem.class) + .collectList() + .block(); + assertThat(clientItems.size()).isEqualTo(clientCount); + + } finally { + for (CosmosAsyncClient client : clients) { + if (client != null) { + client.close(); + } + } + } + } + @BeforeClass(groups = { "emulator" }, timeOut = 4 * SETUP_TIMEOUT) public void before_ThroughputBudgetControllerTest() { client = getClientBuilder().buildAsyncClient();