Skip to content

Commit

Permalink
[Weighted Routing] [Backport 2.x] Disallow search request with prefer…
Browse files Browse the repository at this point in the history
…ence parameter when weighted routing enabled (#6042)

* Disallow search request with preference parameter when weighted routing enabled (#5874)

* Disallow preference search with strict weighted shard routing

Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
anshu1106 authored Jan 27, 2023
1 parent 922e795 commit 6e7ba5c
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658))
- Add update-index-settings allowlist for searchable snapshot ([#5907](https://github.com/opensearch-project/OpenSearch/pull/5907))
- Replace latches with CompletableFutures for extensions ([#5646](https://github.com/opensearch-project/OpenSearch/pull/5646))
- Add support to disallow search request with preference parameter with strict weighted shard routing([#5874](https://github.com/opensearch-project/OpenSearch/pull/5874))

### Dependencies
- Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.PreferenceBasedSearchNotAllowedException;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.bucket.terms.Terms;
import org.opensearch.snapshots.mockstore.MockRepository;
Expand Down Expand Up @@ -796,4 +798,75 @@ public void testMultiGetWithNetworkDisruption_FailOpenDisabled() throws Exceptio
);
}

/**
* Assert that preference based search is not allowed with strict weighted shard routing
* @throws Exception throws exception
*/
public void testStrictWeightedRouting() throws Exception {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.weighted.fail_open", true)
.put("cluster.routing.weighted.strict", true)
.build();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 10;
int numReplicas = 1;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);
String nodeInZoneA = nodeMap.get("a").get(0);
String customPreference = randomAlphaOfLength(10);

assertThrows(
PreferenceBasedSearchNotAllowedException.class,
() -> internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(0)
.setPreference(randomFrom("_local", "_only_nodes:" + nodeInZoneA, "_prefer_nodes:" + nodeInZoneA, customPreference))
.get()
);

}

/**
* Assert that preference based search works with non-strict weighted shard routing
* @throws Exception
*/
public void testPreferenceSearchWithWeightedRouting() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.weighted.fail_open", true)
.put("cluster.routing.weighted.strict", false)
.build();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 10;
int numReplicas = 2;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);

String customPreference = randomAlphaOfLength(10);
String nodeInZoneA = nodeMap.get("a").get(0);

SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(0)
.setPreference(randomFrom("_local", "_only_nodes:" + nodeInZoneA, "_prefer_nodes:" + nodeInZoneA, customPreference))
.get();
assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus());
}

}
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.routing.PreferenceBasedSearchNotAllowedException;
import org.opensearch.cluster.routing.UnsupportedWeightedRoutingStateException;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -74,6 +75,7 @@
import static org.opensearch.Version.V_2_3_0;
import static org.opensearch.Version.V_2_4_0;
import static org.opensearch.Version.V_2_5_0;
import static org.opensearch.Version.V_2_6_0;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName;
Expand Down Expand Up @@ -1641,6 +1643,12 @@ private enum OpenSearchExceptionHandle {
UnsupportedWeightedRoutingStateException::new,
167,
V_2_5_0
),
PREFERENCE_BASED_SEARCH_NOT_ALLOWED_EXCEPTION(
PreferenceBasedSearchNotAllowedException.class,
PreferenceBasedSearchNotAllowedException::new,
168,
V_2_6_0
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,19 @@ public class OperationRouting {
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Boolean> STRICT_WEIGHTED_SHARD_ROUTING_ENABLED = Setting.boolSetting(
"cluster.routing.weighted.strict",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private volatile List<String> awarenessAttributes;
private volatile boolean useAdaptiveReplicaSelection;
private volatile boolean ignoreAwarenessAttr;
private volatile double weightedRoutingDefaultWeight;
private volatile boolean isFailOpenEnabled;
private volatile boolean isStrictWeightedShardRouting;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
Expand All @@ -107,10 +115,12 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings);
this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings);
this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings);
this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
Expand All @@ -129,6 +139,10 @@ void setFailOpenEnabled(boolean isFailOpenEnabled) {
this.isFailOpenEnabled = isFailOpenEnabled;
}

void setStrictWeightedShardRouting(boolean strictWeightedShardRouting) {
this.isStrictWeightedShardRouting = strictWeightedShardRouting;
}

public boolean isIgnoreAwarenessAttr() {
return ignoreAwarenessAttr;
}
Expand Down Expand Up @@ -267,6 +281,11 @@ private ShardIterator preferenceActiveShardIterator(
if (preference == null || preference.isEmpty()) {
return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata);
}
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
throw new PreferenceBasedSearchNotAllowedException(
"Preference based routing not allowed with strict weighted shard routing setting"
);
}
if (preference.charAt(0) == '_') {
Preference preferenceType = Preference.parse(preference);
if (preferenceType == Preference.SHARDS) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

import java.io.IOException;

/**
* Thrown to disallow preference based search with strict weighted shard routing. See {@link WeightedRoutingService}
* * for more details.
*
* @opensearch.internal
*/
public class PreferenceBasedSearchNotAllowedException extends OpenSearchException {

public PreferenceBasedSearchNotAllowedException(StreamInput in) throws IOException {
super(in);
}

public PreferenceBasedSearchNotAllowedException(String msg, Object... args) {
super(msg, args);
}

@Override
public RestStatus status() {
return RestStatus.FORBIDDEN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ public void apply(Settings value, Settings current, Settings previous) {
OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_SETTING,
OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT,
OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED,
OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED,
IndexGraveyard.SETTING_MAX_TOMBSTONES,
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IllegalShardRoutingStateException;
import org.opensearch.cluster.routing.PreferenceBasedSearchNotAllowedException;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
Expand Down Expand Up @@ -870,6 +871,7 @@ public void testIds() {
ids.put(165, ClusterManagerThrottlingException.class);
ids.put(166, SnapshotInUseDeletionException.class);
ids.put(167, UnsupportedWeightedRoutingStateException.class);
ids.put(168, PreferenceBasedSearchNotAllowedException.class);

Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) {
Expand Down

0 comments on commit 6e7ba5c

Please sign in to comment.