Skip to content

Commit

Permalink
Disallow search request with preference parameter when weighted routi…
Browse files Browse the repository at this point in the history
…ng enabled (opensearch-project#5874)

* Disallow preference search with strict weighted shard routing

Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
anshu1106 authored and nknize committed Jan 19, 2023
1 parent 6157ef7 commit 9d903bf
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))
- Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658))
- Revert 'Added jackson dependency to server' and change extension reading ([#5768](https://github.com/opensearch-project/OpenSearch/pull/5768))
- Add support to disallow search request with preference parameter with strict weighted shard routing([#5874](https://github.com/opensearch-project/OpenSearch/pull/5874))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.PreferenceBasedSearchNotAllowedException;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.bucket.terms.Terms;
import org.opensearch.snapshots.mockstore.MockRepository;
Expand Down Expand Up @@ -796,4 +798,75 @@ public void testMultiGetWithNetworkDisruption_FailOpenDisabled() throws Exceptio
);
}

/**
* Assert that preference based search is not allowed with strict weighted shard routing
* @throws Exception throws exception
*/
public void testStrictWeightedRouting() throws Exception {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.weighted.fail_open", true)
.put("cluster.routing.weighted.strict", true)
.build();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 10;
int numReplicas = 1;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);
String nodeInZoneA = nodeMap.get("a").get(0);
String customPreference = randomAlphaOfLength(10);

assertThrows(
PreferenceBasedSearchNotAllowedException.class,
() -> internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(0)
.setPreference(randomFrom("_local", "_only_nodes:" + nodeInZoneA, "_prefer_nodes:" + nodeInZoneA, customPreference))
.get()
);

}

/**
* Assert that preference based search works with non-strict weighted shard routing
* @throws Exception
*/
public void testPreferenceSearchWithWeightedRouting() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.weighted.fail_open", true)
.put("cluster.routing.weighted.strict", false)
.build();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 10;
int numReplicas = 1;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);

String customPreference = randomAlphaOfLength(10);
String nodeInZoneA = nodeMap.get("a").get(0);

SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(0)
.setPreference(randomFrom("_local", "_only_nodes:" + nodeInZoneA, "_prefer_nodes:" + nodeInZoneA, customPreference))
.get();
assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus());
}

}
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.routing.PreferenceBasedSearchNotAllowedException;
import org.opensearch.cluster.routing.UnsupportedWeightedRoutingStateException;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -72,6 +73,7 @@
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.Version.V_2_4_0;
import static org.opensearch.Version.V_2_5_0;
import static org.opensearch.Version.V_2_6_0;
import static org.opensearch.Version.V_3_0_0;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
Expand Down Expand Up @@ -1632,6 +1634,12 @@ private enum OpenSearchExceptionHandle {
167,
V_2_5_0
),
PREFERENCE_BASED_SEARCH_NOT_ALLOWED_EXCEPTION(
PreferenceBasedSearchNotAllowedException.class,
PreferenceBasedSearchNotAllowedException::new,
168,
V_2_6_0
),
INDEX_CREATE_BLOCK_EXCEPTION(
org.opensearch.cluster.block.IndexCreateBlockException.class,
org.opensearch.cluster.block.IndexCreateBlockException::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,19 @@ public class OperationRouting {
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Boolean> STRICT_WEIGHTED_SHARD_ROUTING_ENABLED = Setting.boolSetting(
"cluster.routing.weighted.strict",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private volatile List<String> awarenessAttributes;
private volatile boolean useAdaptiveReplicaSelection;
private volatile boolean ignoreAwarenessAttr;
private volatile double weightedRoutingDefaultWeight;
private volatile boolean isFailOpenEnabled;
private volatile boolean isStrictWeightedShardRouting;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
Expand All @@ -107,10 +115,12 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings);
this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings);
this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings);
this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
Expand All @@ -129,6 +139,10 @@ void setFailOpenEnabled(boolean isFailOpenEnabled) {
this.isFailOpenEnabled = isFailOpenEnabled;
}

void setStrictWeightedShardRouting(boolean strictWeightedShardRouting) {
this.isStrictWeightedShardRouting = strictWeightedShardRouting;
}

public boolean isIgnoreAwarenessAttr() {
return ignoreAwarenessAttr;
}
Expand Down Expand Up @@ -267,6 +281,11 @@ private ShardIterator preferenceActiveShardIterator(
if (preference == null || preference.isEmpty()) {
return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata);
}
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
throw new PreferenceBasedSearchNotAllowedException(
"Preference based routing not allowed with strict weighted shard routing setting"
);
}
if (preference.charAt(0) == '_') {
Preference preferenceType = Preference.parse(preference);
if (preferenceType == Preference.SHARDS) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

import java.io.IOException;

/**
* Thrown to disallow preference based search with strict weighted shard routing. See {@link WeightedRoutingService}
* * for more details.
*
* @opensearch.internal
*/
public class PreferenceBasedSearchNotAllowedException extends OpenSearchException {

public PreferenceBasedSearchNotAllowedException(StreamInput in) throws IOException {
super(in);
}

public PreferenceBasedSearchNotAllowedException(String msg, Object... args) {
super(msg, args);
}

@Override
public RestStatus status() {
return RestStatus.FORBIDDEN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ public void apply(Settings value, Settings current, Settings previous) {
OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_SETTING,
OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT,
OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED,
OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED,
IndexGraveyard.SETTING_MAX_TOMBSTONES,
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IllegalShardRoutingStateException;
import org.opensearch.cluster.routing.PreferenceBasedSearchNotAllowedException;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
Expand Down Expand Up @@ -867,6 +868,7 @@ public void testIds() {
ids.put(165, ClusterManagerThrottlingException.class);
ids.put(166, SnapshotInUseDeletionException.class);
ids.put(167, UnsupportedWeightedRoutingStateException.class);
ids.put(168, PreferenceBasedSearchNotAllowedException.class);
ids.put(10001, IndexCreateBlockException.class);

Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
Expand Down

0 comments on commit 9d903bf

Please sign in to comment.