From ca43dd1e5bc708eb730ccd36b9afd1bb213ebd79 Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Thu, 18 Mar 2021 09:56:34 -0700 Subject: [PATCH] Correct service time parameter in ARS formula (#70283) The adaptive replica selection algorithm implements the [C3 algorithm](https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf) for ranking nodes. The formula defines service time as a quantity `1/muBarS`. Our implementation accidentally plugs in service time for `muBarS` instead of `1/muBarS`. This commit corrects the formula and adds invariant tests to confirm it behaves as expected. This change also fixes a bug in how we adjust node statistics. To ensure that nodes with high ranks occasionally get selected, every time we select a 'winner' node, we average the winner's stats with each other node's stats and add the result to that node's moving average. For service time, we were accidentally overwriting the whole moving average with the new stats, which caused the ranks to adjust too quickly. This issue has a much bigger impact now that the formula correctly incorporates service time, and is important to fix so the behavior remains reasonable. Fixes #65838. 
--- .../SearchPreferenceIT.java | 2 +- .../routing/SearchReplicaSelectionIT.java | 101 +++++++++++++ .../routing/IndexShardRoutingTable.java | 8 +- .../node/ResponseCollectorService.java | 13 +- .../node/stats/ComputedNodeStatsTests.java | 63 ++++++++ .../routing/OperationRoutingTests.java | 142 +++++++++++++++--- 6 files changed, 299 insertions(+), 30 deletions(-) rename server/src/internalClusterTest/java/org/elasticsearch/search/{preference => routing}/SearchPreferenceIT.java (99%) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchReplicaSelectionIT.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/ComputedNodeStatsTests.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/preference/SearchPreferenceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchPreferenceIT.java similarity index 99% rename from server/src/internalClusterTest/java/org/elasticsearch/search/preference/SearchPreferenceIT.java rename to server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchPreferenceIT.java index e73210575ae22..d2995ba1a9408 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/preference/SearchPreferenceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchPreferenceIT.java @@ -6,7 +6,7 @@ * Side Public License, v 1. 
*/ -package org.elasticsearch.search.preference; +package org.elasticsearch.search.routing; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchReplicaSelectionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchReplicaSelectionIT.java new file mode 100644 index 0000000000000..248be22cc8ed3 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchReplicaSelectionIT.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search.routing; + +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.ResponseCollectorService.ComputedNodeStats; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static 
org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(numClientNodes = 1, numDataNodes = 3) +public class SearchReplicaSelectionIT extends ESIntegTestCase { + + @Override + public Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), true).build(); + } + + public void testNodeSelection() { + // We grab a client directly to avoid using a randomizing client that might set a search preference. + Client client = internalCluster().coordOnlyNodeClient(); + + client.admin().indices().prepareCreate("test") + .setSettings(Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2)) + .get(); + ensureGreen(); + + client.prepareIndex("test").setSource("field", "value").get(); + refresh(); + + // Before we've gathered stats for all nodes, we should try each node once. + Set nodeIds = new HashSet<>(); + SearchResponse searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + nodeIds.add(searchResponse.getHits().getAt(0).getShard().getNodeId()); + + searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + nodeIds.add(searchResponse.getHits().getAt(0).getShard().getNodeId()); + + searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + nodeIds.add(searchResponse.getHits().getAt(0).getShard().getNodeId()); + + assertEquals(3, nodeIds.size()); + + // Now after more searches, we should select the node with the best ARS rank. 
+ for (int i = 0; i < 5; i++) { + client.prepareSearch().setQuery(matchAllQuery()).get(); + } + + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().get(); + ImmutableOpenMap coordinatingNodes = clusterStateResponse.getState().nodes().getCoordinatingOnlyNodes(); + assertEquals(1, coordinatingNodes.size()); + + String coordinatingNodeId = coordinatingNodes.valuesIt().next().getId(); + NodesStatsResponse statsResponse = client.admin().cluster().prepareNodesStats() + .setAdaptiveSelection(true) + .get(); + NodeStats nodeStats = statsResponse.getNodesMap().get(coordinatingNodeId); + assertNotNull(nodeStats); + assertEquals(3, nodeStats.getAdaptiveSelectionStats().getComputedStats().size()); + + String bestNodeId = null; + double bestRank = Double.POSITIVE_INFINITY; + for (Map.Entry entry : nodeStats.getAdaptiveSelectionStats().getComputedStats().entrySet()) { + double rank = entry.getValue().rank(1L); + if (rank < bestRank) { + bestNodeId = entry.getKey(); + bestRank = rank; + } + } + + searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get(); + assertEquals(bestNodeId, searchResponse.getHits().getAt(0).getShard().getNodeId()); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 6f3bbaf445fa4..e06c6530b834e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.io.stream.StreamInput; @@ -309,7 +310,12 @@ private static void adjustStats(final 
ResponseCollectorService collector, final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get(); final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2; final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2; - final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2; + + ExponentiallyWeightedMovingAverage avgServiceTime = new ExponentiallyWeightedMovingAverage( + ResponseCollectorService.ALPHA, stats.serviceTime); + avgServiceTime.addValue((minStats.serviceTime + stats.serviceTime) / 2); + final long updatedService = (long) avgServiceTime.getAverage(); + collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService); } } diff --git a/server/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/server/src/main/java/org/elasticsearch/node/ResponseCollectorService.java index 8caf719531059..3d2acb845ceac 100644 --- a/server/src/main/java/org/elasticsearch/node/ResponseCollectorService.java +++ b/server/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -32,7 +32,10 @@ */ public final class ResponseCollectorService implements ClusterStateListener { - private static final double ALPHA = 0.3; + /** + * The weight parameter used for all moving averages of parameters. + */ + public static final double ALPHA = 0.3; private final ConcurrentMap nodeIdToStats = ConcurrentCollections.newConcurrentMap(); @@ -161,12 +164,12 @@ private double innerRank(long outstandingRequests) { // EWMA of response time double rS = responseTime / FACTOR; - // EWMA of service time - double muBarS = serviceTime / FACTOR; + // EWMA of service time. We match the paper's notation, which + // defines service time as the inverse of service rate (muBarS). 
+ double muBarSInverse = serviceTime / FACTOR; // The final formula - double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS); - return rank; + return rS - muBarSInverse + Math.pow(qHatS, queueAdjustmentFactor) * muBarSInverse; } public double rank(long outstandingRequests) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/ComputedNodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/ComputedNodeStatsTests.java new file mode 100644 index 0000000000000..05b5191f196d2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/ComputedNodeStatsTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.node.stats; + +import org.elasticsearch.node.ResponseCollectorService.ComputedNodeStats; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests the core calculation used for ranking nodes in adaptive replica selection. + */ +public class ComputedNodeStatsTests extends ESTestCase { + + public void testBasicInvariants() { + // When queue size estimate is 0, the rank should equal response time. + ComputedNodeStats stats = createStats(0, 150, 100); + assertThat(stats.rank(0), equalTo(150.0)); + + stats = createStats(0, 20, 19); + assertThat(stats.rank(0), equalTo(20.0)); + } + + public void testParameterScaling() { + // With non-zero queue size estimate, a larger service time should result in a larger rank. 
+ ComputedNodeStats first = createStats(0, 150, 100); + ComputedNodeStats second = createStats(0, 150, 120); + assertTrue(first.rank(1) < second.rank(1)); + + first = createStats(1, 200, 199); + second = createStats(1, 200, 200); + assertTrue(first.rank(3) < second.rank(3)); + + // A larger response time should always result in a larger rank. + first = createStats(2, 150, 100); + second = createStats(2, 200, 100); + assertTrue(first.rank(1) < second.rank(1)); + + first = createStats(2, 150, 150); + second = createStats(2, 200, 150); + assertTrue(first.rank(1) < second.rank(1)); + + // More queued requests should always result in a larger rank. + first = createStats(2, 150, 100); + second = createStats(3, 150, 100); + assertTrue(first.rank(1) < second.rank(1)); + + // More pending requests should always result in a larger rank. + first = createStats(2, 150, 100); + second = createStats(2, 150, 100); + assertTrue(first.rank(0) < second.rank(1)); + } + + private ComputedNodeStats createStats(int queueSize, int responseTimeMillis, int serviceTimeMillis) { + return new ComputedNodeStats("node0", 5, queueSize, 1_000_000 * responseTimeMillis, 1_000_000 * serviceTimeMillis); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java index 4164f12114675..77c89fdbe2350 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java @@ -510,7 +510,7 @@ public void testThatOnlyNodesSupportNodeIds() throws InterruptedException, IOExc } } - public void testAdaptiveReplicaSelection() throws Exception { + public void testARSRanking() throws Exception { final int numIndices = 1; final int numShards = 1; final int numReplicas = 2; @@ -523,13 +523,11 @@ public void testAdaptiveReplicaSelection() throws Exception { new 
ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); opRouting.setUseAdaptiveReplicaSelection(true); List searchedShards = new ArrayList<>(numShards); - Set selectedNodes = new HashSet<>(numShards); - TestThreadPool threadPool = new TestThreadPool("testThatOnlyNodesSupportNodeIds"); + TestThreadPool threadPool = new TestThreadPool("test"); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); ResponseCollectorService collector = new ResponseCollectorService(clusterService); - Map outstandingRequests = new HashMap<>(); GroupShardsIterator groupIterator = opRouting.searchShards(state, - indexNames, null, null, collector, outstandingRequests); + indexNames, null, null, collector, new HashMap<>()); assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); @@ -538,58 +536,156 @@ public void testAdaptiveReplicaSelection() throws Exception { ShardRouting firstChoice = groupIterator.get(0).nextOrNull(); assertNotNull(firstChoice); searchedShards.add(firstChoice); - selectedNodes.add(firstChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); ShardRouting secondChoice = groupIterator.get(0).nextOrNull(); assertNotNull(secondChoice); searchedShards.add(secondChoice); - selectedNodes.add(secondChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); ShardRouting thirdChoice = groupIterator.get(0).nextOrNull(); assertNotNull(thirdChoice); searchedShards.add(thirdChoice); - 
selectedNodes.add(thirdChoice.currentNodeId()); // All three shards should have been separate, because there are no stats yet so they're all ranked equally. assertThat(searchedShards.size(), equalTo(3)); // Now let's start adding node metrics, since that will affect which node is chosen collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); - collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(50).nanos()); + collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(150).nanos(), TimeValue.timeValueMillis(50).nanos()); collector.addNodeStatistics("node_2", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(200).nanos()); - outstandingRequests.put("node_0", 1L); - outstandingRequests.put("node_1", 1L); - outstandingRequests.put("node_2", 1L); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); // node 1 should be the lowest ranked node to start assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // node 1 starts getting more loaded... - collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(100).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); shardChoice = groupIterator.get(0).nextOrNull(); assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // and more loaded... 
- collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(200).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(220).nanos(), TimeValue.timeValueMillis(120).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); shardChoice = groupIterator.get(0).nextOrNull(); assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // and even more - collector.addNodeStatistics("node_1", 4, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(150).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); shardChoice = groupIterator.get(0).nextOrNull(); - // finally, node 2 is chosen instead - assertThat(shardChoice.currentNodeId(), equalTo("node_2")); + // finally, node 0 is chosen instead + assertThat(shardChoice.currentNodeId(), equalTo("node_0")); + + IOUtils.close(clusterService); + terminate(threadPool); + } + + public void testARSStatsAdjustment() throws Exception { + int numIndices = 1; + int numShards = 1; + int numReplicas = 1; + String[] indexNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indexNames[i] = "test" + i; + } + + ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas); + OperationRouting opRouting = new OperationRouting(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + opRouting.setUseAdaptiveReplicaSelection(true); + TestThreadPool threadPool = new TestThreadPool("test"); + ClusterService 
clusterService = ClusterServiceUtils.createClusterService(threadPool); + + ResponseCollectorService collector = new ResponseCollectorService(clusterService); + GroupShardsIterator groupIterator = opRouting.searchShards(state, + indexNames, null, null, collector, new HashMap<>()); + assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); + + // We have two nodes, where the second has more load + collector.addNodeStatistics("node_0", 1, TimeValue.timeValueMillis(50).nanos(), TimeValue.timeValueMillis(40).nanos()); + collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(60).nanos()); + + // Check the first node is usually selected, if its stats don't change much + for (int i = 0; i < 10; i++) { + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); + ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); + assertThat(shardChoice.currentNodeId(), equalTo("node_0")); + + int responseTime = 50 + randomInt(5); + int serviceTime = 40 + randomInt(5); + collector.addNodeStatistics("node_0", 1, + TimeValue.timeValueMillis(responseTime).nanos(), + TimeValue.timeValueMillis(serviceTime).nanos()); + } + + // Check that we try the second when the first node slows down more + collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(60).nanos(), TimeValue.timeValueMillis(50).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); + ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + IOUtils.close(clusterService); + terminate(threadPool); + } + + public void testARSOutstandingRequestTracking() throws Exception { + int numIndices = 1; + int numShards = 2; + int numReplicas = 1; + String[] indexNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indexNames[i] = "test" + i; + } + 
+ ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas); + OperationRouting opRouting = new OperationRouting(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + opRouting.setUseAdaptiveReplicaSelection(true); + TestThreadPool threadPool = new TestThreadPool("test"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + + ResponseCollectorService collector = new ResponseCollectorService(clusterService); + + // We have two nodes with very similar statistics + collector.addNodeStatistics("node_0", 1, TimeValue.timeValueMillis(50).nanos(), TimeValue.timeValueMillis(40).nanos()); + collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(51).nanos(), TimeValue.timeValueMillis(40).nanos()); + Map outstandingRequests = new HashMap<>(); + outstandingRequests.put("node_0", 1L); + outstandingRequests.put("node_1", 1L); + + // Check that we choose to search over both nodes + GroupShardsIterator groupIterator = opRouting.searchShards( + state, indexNames, null, null, collector, outstandingRequests); + + Set nodeIds = new HashSet<>(); + nodeIds.add(groupIterator.get(0).nextOrNull().currentNodeId()); + nodeIds.add(groupIterator.get(1).nextOrNull().currentNodeId()); + assertThat(nodeIds, equalTo(Set.of("node_0", "node_1"))); + assertThat(outstandingRequests.get("node_0"), equalTo(2L)); + assertThat(outstandingRequests.get("node_1"), equalTo(2L)); + + // The first node becomes much more loaded + collector.addNodeStatistics("node_0", 5, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(200).nanos()); + outstandingRequests = new HashMap<>(); + outstandingRequests.put("node_0", 1L); + outstandingRequests.put("node_1", 1L); + + // Check that we always choose the second node + groupIterator = opRouting.searchShards( + state, indexNames, null, null, collector, outstandingRequests); + + nodeIds = new HashSet<>(); 
+ nodeIds.add(groupIterator.get(0).nextOrNull().currentNodeId()); + nodeIds.add(groupIterator.get(1).nextOrNull().currentNodeId()); + assertThat(nodeIds, equalTo(Set.of("node_1"))); + assertThat(outstandingRequests.get("node_1"), equalTo(3L)); IOUtils.close(clusterService); terminate(threadPool);