From 5b19d0ca3a802b6b5b6f2530265f8a9cc17cf3d9 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Thu, 30 Mar 2023 14:03:12 +0530 Subject: [PATCH] Enhance search preference based routing for WRR (#6834) Signed-off-by: Varun Bansal (cherry picked from commit d8d6e7324adb4be088cab8e1fa5833e18ef19109) --- CHANGELOG.md | 1 + .../search/SearchWeightedRoutingIT.java | 473 +++++++++++++++++- .../routing/FailAwareWeightedRouting.java | 21 +- .../routing/IndexShardRoutingTable.java | 34 +- .../cluster/routing/OperationRouting.java | 51 +- .../cluster/routing/WeightedRoutingUtils.java | 12 + .../common/settings/ClusterSettings.java | 1 + .../FailAwareWeightedRoutingTests.java | 124 ++++- .../structure/RoutingIteratorTests.java | 95 +++- 9 files changed, 771 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0c24a8f713f6..3a204587ab933 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791)) - Enable sort optimization for all NumericTypes ([#6464](https://github.com/opensearch-project/OpenSearch/pull/6464) - Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331)) +- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 02b51f9e625d3..a4cf5ebb028e3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -49,6 +49,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; @@ -513,6 +514,292 @@ public void testShardRoutingWithNetworkDisruption_FailOpenEnabled() throws Excep assertNoSearchInAZ("a"); } + public void testStrictWeightedRoutingWithCustomString_FailOpenEnabled() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + String customPreference = randomAlphaOfLength(10); + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setPreference(customPreference) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + logger.info("--> shards should fail due to network disruption"); + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + try { + assertSearchInAZ("b"); + } catch (AssertionError ae) { + assertSearchInAZ("c"); + } + assertNoSearchInAZ("a"); + } + + public void testStrictWeightedRoutingWithCustomString_FailOpenDisabled() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + String customPreference = randomAlphaOfLength(10); + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setPreference(customPreference) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + logger.info("--> shards should fail due to network disruption"); + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertNotEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + Set expectedHotNodes = new HashSet<>(); + for (DiscoveryNode node : dataNodes) { + if (node.getAttributes().getOrDefault("zone", "").equals("b")) { + expectedHotNodes.add(node.getId()); + } + } + + assertEquals(expectedHotNodes, hitNodes); + + assertSearchInAZ("b"); + assertNoSearchInAZ("c"); + assertNoSearchInAZ("a"); + } + + /** + * Should failopen shards even if failopen enabled with custom search preference. + * @throws Exception + */ + public void testStrictWeightedRoutingWithShardPrefNetworkDisruption_FailOpenEnabled() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("c").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Future[] responses = new Future[50]; + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + ShardId shardId = internalCluster().clusterService() + .state() + .getRoutingTable() + .index("test") + .randomAllActiveShardsIt() + .getShardRoutings() + .stream() + .filter(shard -> { + return dataNodes.get(shard.currentNodeId()).getAttributes().getOrDefault("zone", "").equals("c"); + }) + .findFirst() + .get() + .shardId(); + + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("c").get(0)) + .prepareSearch("test") + .setPreference(String.format(Locale.ROOT, "_shards:%s", shardId.getId())) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + } catch (Exception t) { + fail("search should not fail"); + } + } + + assertNoSearchInAZ("a"); + try { + assertSearchInAZ("c"); + } catch (AssertionError ae) { + assertSearchInAZ("b"); + } + } + + public void testStrictWeightedRoutingWithShardPref() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + ShardId shardId = internalCluster().clusterService() + .state() + .getRoutingTable() + .index("test") + .randomAllActiveShardsIt() + .getShardRoutings() + .stream() + .filter(shard -> { + return dataNodes.get(shard.currentNodeId()).getAttributes().getOrDefault("zone", "").equals("c"); + }) + .findFirst() + .get() + .shardId(); + + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setPreference(String.format(Locale.ROOT, "_shards:%s", shardId.getId())) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + assertNotEquals(searchResponse.getHits().getTotalHits().value, 0); + } catch (Exception t) { + fail("search should not fail"); + } + } + assertNoSearchInAZ("c"); + } + private void assertNoSearchInAZ(String az) { ImmutableOpenMap dataNodes = internalCluster().clusterService().state().nodes().getDataNodes(); String dataNodeId = null; @@ -805,8 +1092,7 @@ public void testMultiGetWithNetworkDisruption_FailOpenDisabled() throws Exceptio /** * Assert that preference based search is not allowed with strict weighted shard routing */ - public void testStrictWeightedRouting() { - + public void testStrictWeightedRoutingWithCustomString() { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -827,15 +1113,36 @@ public void testStrictWeightedRouting() { String nodeInZoneA = nodeMap.get("a").get(0); String customPreference = randomAlphaOfLength(10); - assertThrows( - PreferenceBasedSearchNotAllowedException.class, - () -> internalCluster().client(nodeMap.get("b").get(0)) - .prepareSearch() - .setSize(0) - .setPreference(randomFrom("_local", "_only_nodes:" + nodeInZoneA, "_prefer_nodes:" + nodeInZoneA, customPreference)) - .get() - ); + SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(20) + .setPreference(customPreference) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + assertNoSearchInAZ("c"); + assertSearchInAZ("a"); + assertSearchInAZ("b"); + // disable strict weighed routing + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.weighted.strict", false)) + .get(); + + // make search requests with custom string + internalCluster().client(nodeMap.get("a").get(0)) + .prepareSearch() + .setSize(20) + .setPreference(customPreference) + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + // assert search on data nodes on az c (weighed away az) + try { + assertSearchInAZ("c"); + } catch (AssertionError ae) { + assertSearchInAZ("a"); + } } /** @@ -862,13 +1169,157 @@ public void testPreferenceSearchWithWeightedRouting() { String customPreference = randomAlphaOfLength(10); String nodeInZoneA = nodeMap.get("a").get(0); + String nodeInZoneB = nodeMap.get("b").get(0); + String nodeInZoneC = nodeMap.get("c").get(0); + + Map nodeIDMap = new HashMap<>(); + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + for (DiscoveryNode node : dataNodes) { + nodeIDMap.put(node.getName(), node.getId()); + } + SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setPreference(randomFrom("_local", "_prefer_nodes:" + "zone:a", customPreference)) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("a").get(0)) + .prepareSearch() + .setPreference( + "_only_nodes:" + nodeIDMap.get(nodeInZoneA) + "," + nodeIDMap.get(nodeInZoneB) + "," + nodeIDMap.get(nodeInZoneC) + ) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + } + + public void testPreferenceSearchWithIgnoreWeightedRouting() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", false) + .put("cluster.routing.ignore_weighted_routing", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + String customPreference = randomAlphaOfLength(10); + String nodeInZoneA = nodeMap.get("a").get(0); + String nodeInZoneB = nodeMap.get("b").get(0); + String nodeInZoneC = nodeMap.get("c").get(0); + + Map nodeIDMap = new HashMap<>(); + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + for (DiscoveryNode node : dataNodes) { + nodeIDMap.put(node.getName(), node.getId()); + } + + SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setPreference(randomFrom("_local", "_prefer_nodes:" + "zone:a", customPreference)) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("a").get(0)) + .prepareSearch() + .setPreference( + "_only_nodes:" + nodeIDMap.get(nodeInZoneA) + "," + nodeIDMap.get(nodeInZoneB) + "," + nodeIDMap.get(nodeInZoneC) + ) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + } + + /** + * Assert that preference based search with preference type is not allowed with strict weighted shard routing + */ + public void testStrictWeightedRouting() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + String nodeInZoneA = nodeMap.get("a").get(0); + + assertThrows( + PreferenceBasedSearchNotAllowedException.class, + () -> internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference("_only_nodes:" + nodeInZoneA) + .get() + ); + + assertThrows( + PreferenceBasedSearchNotAllowedException.class, + () -> internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference("_prefer_nodes:" + nodeInZoneA) + .get() + ); + } + + public void testStrictWeightedRoutingAllowedForSomeSearchPrefs() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + String nodeInZoneA = nodeMap.get("a").get(0); + String customPreference = randomAlphaOfLength(10); SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) .prepareSearch() .setSize(0) - .setPreference(randomFrom("_local", "_only_nodes:" + nodeInZoneA, "_prefer_nodes:" + nodeInZoneA, customPreference)) + .setPreference("_only_local:" + nodeInZoneA) .get(); assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference("_local:" + nodeInZoneA) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)).prepareSearch().setSize(0).setPreference("_shards:1").get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)).prepareSearch().setSize(0).setPreference(customPreference).get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java index dbef876c9a258..72c189f20eaf6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java @@ -20,6 +20,8 @@ import java.util.List; +import static org.opensearch.cluster.routing.OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING; + /** * This class contains logic to find next shard to retry search request in case of failure from other shard copy. * This decides if retryable shard search requests can be tried on shard copies present in data @@ -72,9 +74,13 @@ public SearchShardTarget findNext( Runnable onShardSkipped ) { SearchShardTarget next = shardIt.nextOrNull(); + if (ignoreWeightedRouting(clusterState)) { + return next; + } + while (next != null && WeightedRoutingUtils.isWeighedAway(next.getNodeId(), clusterState)) { SearchShardTarget nextShard = next; - if (canFailOpen(nextShard.getShardId(), exception, clusterState)) { + if (canFailOpen(nextShard.getShardId(), shardIt.size(), exception, clusterState)) { logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception); getWeightedRoutingStats().updateFailOpenCount(); break; @@ -98,10 +104,13 @@ public SearchShardTarget findNext( */ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception, Runnable onShardSkipped) { ShardRouting next = shardsIt.nextOrNull(); + if (ignoreWeightedRouting(clusterState)) { + return next; + } while (next != null && WeightedRoutingUtils.isWeighedAway(next.currentNodeId(), clusterState)) { ShardRouting nextShard = next; - if (canFailOpen(nextShard.shardId(), exception, clusterState)) { + if (canFailOpen(nextShard.shardId(), shardsIt.size(), exception, clusterState)) { logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception); getWeightedRoutingStats().updateFailOpenCount(); break; @@ -117,8 +126,8 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster * @return true if can fail open ie request shard copies present in nodes with weighted shard * routing weight set to zero */ - private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) { - return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId); + private boolean canFailOpen(ShardId shardId, int shardItSize, Exception exception, ClusterState clusterState) { + return shardItSize == 1 || isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId); } private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) { @@ -131,6 +140,10 @@ private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardI return false; } + private boolean ignoreWeightedRouting(ClusterState clusterState) { + return IGNORE_WEIGHTED_SHARD_ROUTING.get(clusterState.getMetadata().settings()); + } + public WeightedRoutingStats getWeightedRoutingStats() { return WeightedRoutingStats.getInstance(); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index f730a2833fd02..711a750ade712 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -324,17 +324,13 @@ public ShardIterator activeInitializingShardsWeightedIt( WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight, - boolean isFailOpenEnabled + boolean isFailOpenEnabled, + @Nullable Integer seed ) { - final int seed = shufflerForWeightedRouting.nextSeed(); - List ordered = new ArrayList<>(); - List orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight); - List orderedListWithDistinctShards; - ordered.addAll(shufflerForWeightedRouting.shuffle(orderedActiveShards, seed)); - if (!allInitializingShards.isEmpty()) { - List orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight); - ordered.addAll(orderedInitializingShards); + if (seed == null) { + seed = shufflerForWeightedRouting.nextSeed(); } + List ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed); // append shards for attribute value with weight zero, so that shard search requests can be tried on // shard copies in case of request failure from other attribute values. @@ -357,8 +353,26 @@ public ShardIterator activeInitializingShardsWeightedIt( logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId); } } + + return new PlainShardIterator(shardId, ordered); + } + + private List activeInitializingShardsWithWeights( + WeightedRouting weightedRouting, + DiscoveryNodes nodes, + double defaultWeight, + int seed + ) { + List ordered = new ArrayList<>(); + List orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight); + ordered.addAll(shufflerForWeightedRouting.shuffle(orderedActiveShards, seed)); + if (!allInitializingShards.isEmpty()) { + List orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight); + ordered.addAll(orderedInitializingShards); + } + List orderedListWithDistinctShards; orderedListWithDistinctShards = ordered.stream().distinct().collect(Collectors.toList()); - return new PlainShardIterator(shardId, orderedListWithDistinctShards); + return orderedListWithDistinctShards; } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index cb20e223c9e20..9dd4261229670 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -93,16 +93,30 @@ public class OperationRouting { public static final Setting STRICT_WEIGHTED_SHARD_ROUTING_ENABLED = Setting.boolSetting( "cluster.routing.weighted.strict", + true, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting IGNORE_WEIGHTED_SHARD_ROUTING = Setting.boolSetting( + "cluster.routing.ignore_weighted_routing", false, Setting.Property.Dynamic, Setting.Property.NodeScope ); + + private static final List WEIGHTED_ROUTING_RESTRICTED_PREFERENCES = Arrays.asList( + Preference.ONLY_NODES, + Preference.PREFER_NODES + ); + private volatile List awarenessAttributes; private volatile boolean useAdaptiveReplicaSelection; private volatile boolean ignoreAwarenessAttr; private volatile double weightedRoutingDefaultWeight; private volatile boolean isFailOpenEnabled; private volatile boolean isStrictWeightedShardRouting; + private volatile boolean ignoreWeightedRouting; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { // whether to ignore awareness attributes when routing requests @@ -116,11 +130,13 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) { this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings); this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings); this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings); + this.ignoreWeightedRouting = IGNORE_WEIGHTED_SHARD_ROUTING.get(settings); clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes); clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight); clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled); clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting); + clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting); } void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { @@ -143,6 +159,10 @@ void setStrictWeightedShardRouting(boolean strictWeightedShardRouting) { this.isStrictWeightedShardRouting = strictWeightedShardRouting; } + void setIgnoreWeightedRouting(boolean isWeightedRoundRobinEnabled) { + this.ignoreWeightedRouting = isWeightedRoundRobinEnabled; + } + public boolean isIgnoreAwarenessAttr() { return ignoreAwarenessAttr; } @@ -318,6 +338,7 @@ private ShardIterator preferenceActiveShardIterator( } } preferenceType = Preference.parse(preference); + checkPreferenceBasedRoutingAllowed(preferenceType, weightedRoutingMetadata); switch (preferenceType) { case PREFER_NODES: final Set nodesIds = Arrays.stream(preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")) @@ -343,7 +364,19 @@ private ShardIterator preferenceActiveShardIterator( // for a different element in the list by also incorporating the // shard ID into the hash of the user-supplied preference key. routingHash = 31 * routingHash + indexShard.shardId.hashCode(); - if (ignoreAwarenessAttributes()) { + if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting( + isStrictWeightedShardRouting, + ignoreWeightedRouting, + weightedRoutingMetadata + )) { + return indexShard.activeInitializingShardsWeightedIt( + weightedRoutingMetadata.getWeightedRouting(), + nodes, + getWeightedRoutingDefaultWeight(), + isFailOpenEnabled, + routingHash + ); + } else if (ignoreAwarenessAttributes()) { return indexShard.activeInitializingShardsIt(routingHash); } else { return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash); @@ -357,12 +390,13 @@ private ShardIterator shardRoutings( @Nullable Map nodeCounts, @Nullable WeightedRoutingMetadata weightedRoutingMetadata ) { - if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) { + if (WeightedRoutingUtils.shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata)) { return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, getWeightedRoutingDefaultWeight(), - isFailOpenEnabled + isFailOpenEnabled, + null ); } else if (ignoreAwarenessAttributes()) { if (useAdaptiveReplicaSelection) { @@ -430,4 +464,15 @@ private static int calculateScaledShardId(IndexMetadata indexMetadata, String ef return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor(); } + private void checkPreferenceBasedRoutingAllowed(Preference preference, @Nullable WeightedRoutingMetadata weightedRoutingMetadata) { + if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting( + isStrictWeightedShardRouting, + ignoreWeightedRouting, + weightedRoutingMetadata + ) && WEIGHTED_ROUTING_RESTRICTED_PREFERENCES.contains(preference)) { + throw new PreferenceBasedSearchNotAllowedException( + "Preference type based routing not allowed with strict weighted shard routing enabled" + ); + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java index c7d40cbbbea61..72387aad0fa45 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java @@ -53,4 +53,16 @@ public static boolean isWeighedAway(String nodeId, ClusterState clusterState) { } return false; } + + public static boolean shouldPerformWeightedRouting(boolean ignoreWeightedRouting, WeightedRoutingMetadata weightedRoutingMetadata) { + return !ignoreWeightedRouting && weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet(); + } + + public static boolean shouldPerformStrictWeightedRouting( + boolean isStrictWeightedShardRouting, + boolean ignoreWeightedRouting, + WeightedRoutingMetadata weightedRoutingMetadata + ) { + return isStrictWeightedShardRouting && shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 1b9e148fe6bf0..9e62b77de04ff 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -547,6 +547,7 @@ public void apply(Settings value, Settings current, Settings previous) { OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT, OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED, OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, + OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java index 5c3a2454c4074..c0164f1afd924 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java @@ -42,6 +42,10 @@ public class FailAwareWeightedRoutingTests extends OpenSearchTestCase { private ClusterState setUpCluster() { + return setUpCluster(Settings.EMPTY); + } + + private ClusterState setUpCluster(Settings transientSettings) { ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); // set up nodes @@ -78,7 +82,7 @@ private ClusterState setUpCluster() { Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); - Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()).transientSettings(transientSettings); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -143,6 +147,124 @@ public void testFindNextWithoutFailOpen() throws IOException { assertEquals(1, shardSkipped.get()); } + public void testFindNextWithJustOneShardInStandbyZone() throws IOException { + ClusterState clusterState = setUpCluster(); + + AtomicInteger shardSkipped = new AtomicInteger(); + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingC); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + // fail open is not executed since fail open conditions don't met + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNull(next); + assertEquals(0, shardSkipped.get()); + } + + public void testFindNextWithIgnoreWeightedRoutingTrue() throws IOException { + ClusterState clusterState = setUpCluster(Settings.builder().put("cluster.routing.ignore_weighted_routing", true).build()); + + AtomicInteger shardSkipped = new AtomicInteger(); + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + // fail open is not executed since fail open conditions don't met + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNull(next); + assertEquals(0, shardSkipped.get()); + } + public void testFindNextWithFailOpenDueTo5xx() throws IOException { ClusterState clusterState = setUpCluster(); diff --git a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java index 9715d3af09fc7..59ea0dfca559a 100644 --- a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java @@ -552,7 +552,7 @@ public void testWeightedRoutingWithDifferentWeights() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; @@ -565,7 +565,7 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(3, shardIterator.size()); weights = Map.of("zone1", -1.0, "zone2", 0.0, "zone3", 1.0); @@ -573,7 +573,7 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(1, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -584,7 +584,7 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); assertEquals(3, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -646,7 +646,7 @@ public void testWeightedRoutingInMemoryStore() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; shardRouting = shardIterator.nextOrNull(); @@ -660,7 +660,7 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -675,7 +675,7 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -690,7 +690,7 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -755,7 +755,7 @@ public void testWeightedRoutingShardState() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); assertEquals(3, shardIterator.size()); ShardRouting shardRouting; @@ -834,21 +834,21 @@ public void testWeightedRoutingShardStateWithDifferentWeights() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); ShardRouting shardRouting1 = shardIterator.nextOrNull(); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); ShardRouting shardRouting2 = shardIterator.nextOrNull(); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); ShardRouting shardRouting3 = shardIterator.nextOrNull(); @@ -860,4 +860,75 @@ public void testWeightedRoutingShardStateWithDifferentWeights() { terminate(threadPool); } } + + /** + * Test to validate that simple weighted shard routing with seed return same shard routing on each call + */ + public void testActiveInitializingShardsWeightedItWithCustomSeed() { + TestThreadPool threadPool = new TestThreadPool("testActiveInitializingShardsWeightedItWithCustomSeed"); + try { + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.awareness.attributes", "zone"); + AllocationService strategy = createAllocationService(settings.build()); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + + Map node1Attributes = new HashMap<>(); + node1Attributes.put("zone", "zone1"); + Map node2Attributes = new HashMap<>(); + node2Attributes.put("zone", "zone2"); + Map node3Attributes = new HashMap<>(); + node3Attributes.put("zone", "zone3"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", unmodifiableMap(node1Attributes))) + .add(newNode("node2", unmodifiableMap(node2Attributes))) + .add(newNode("node3", unmodifiableMap(node3Attributes))) + .localNodeId("node1") + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + List> weightsList = new ArrayList<>(); + Map weights1 = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0); + weightsList.add(weights1); + + WeightedRouting weightedRouting = new WeightedRouting("zone", weights1); + ShardIterator shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, 1); + + ShardRouting shardRouting1 = shardIterator.nextOrNull(); + + for (int i = 0; i < 50; i++) { + shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, 1); + + ShardRouting shardRouting2 = shardIterator.nextOrNull(); + + assertEquals(shardRouting1.currentNodeId(), shardRouting2.currentNodeId()); + } + + } finally { + terminate(threadPool); + } + } }