diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/preference/SearchPreferenceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchPreferenceIT.java similarity index 99% rename from server/src/internalClusterTest/java/org/elasticsearch/search/preference/SearchPreferenceIT.java rename to server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchPreferenceIT.java index e73210575ae22..d2995ba1a9408 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/preference/SearchPreferenceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchPreferenceIT.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.search.preference; +package org.elasticsearch.search.routing; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchReplicaSelectionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchReplicaSelectionIT.java new file mode 100644 index 0000000000000..248be22cc8ed3 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/routing/SearchReplicaSelectionIT.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search.routing; + +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.ResponseCollectorService.ComputedNodeStats; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(numClientNodes = 1, numDataNodes = 3) +public class SearchReplicaSelectionIT extends ESIntegTestCase { + + @Override + public Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), true).build(); + } + + public void testNodeSelection() { + // We grab a client directly to avoid using a randomizing client that might set a search preference. + Client client = internalCluster().coordOnlyNodeClient(); + + client.admin().indices().prepareCreate("test") + .setSettings(Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2)) + .get(); + ensureGreen(); + + client.prepareIndex("test").setSource("field", "value").get(); + refresh(); + + // Before we've gathered stats for all nodes, we should try each node once. + Set nodeIds = new HashSet<>(); + SearchResponse searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + nodeIds.add(searchResponse.getHits().getAt(0).getShard().getNodeId()); + + searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + nodeIds.add(searchResponse.getHits().getAt(0).getShard().getNodeId()); + + searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + nodeIds.add(searchResponse.getHits().getAt(0).getShard().getNodeId()); + + assertEquals(3, nodeIds.size()); + + // Now after more searches, we should select the node with the best ARS rank. + for (int i = 0; i < 5; i++) { + client.prepareSearch().setQuery(matchAllQuery()).get(); + } + + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().get(); + ImmutableOpenMap coordinatingNodes = clusterStateResponse.getState().nodes().getCoordinatingOnlyNodes(); + assertEquals(1, coordinatingNodes.size()); + + String coordinatingNodeId = coordinatingNodes.valuesIt().next().getId(); + NodesStatsResponse statsResponse = client.admin().cluster().prepareNodesStats() + .setAdaptiveSelection(true) + .get(); + NodeStats nodeStats = statsResponse.getNodesMap().get(coordinatingNodeId); + assertNotNull(nodeStats); + assertEquals(3, nodeStats.getAdaptiveSelectionStats().getComputedStats().size()); + + String bestNodeId = null; + double bestRank = Double.POSITIVE_INFINITY; + for (Map.Entry entry : nodeStats.getAdaptiveSelectionStats().getComputedStats().entrySet()) { + double rank = entry.getValue().rank(1L); + if (rank < bestRank) { + bestNodeId = entry.getKey(); + bestRank = rank; + } + } + + searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get(); + assertEquals(bestNodeId, searchResponse.getHits().getAt(0).getShard().getNodeId()); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 6f3bbaf445fa4..e06c6530b834e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.io.stream.StreamInput; @@ -309,7 +310,12 @@ private static void adjustStats(final ResponseCollectorService collector, final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get(); final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2; final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2; - final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2; + + ExponentiallyWeightedMovingAverage avgServiceTime = new ExponentiallyWeightedMovingAverage( + ResponseCollectorService.ALPHA, stats.serviceTime); + avgServiceTime.addValue((minStats.serviceTime + stats.serviceTime) / 2); + final long updatedService = (long) avgServiceTime.getAverage(); + collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService); } } diff --git a/server/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/server/src/main/java/org/elasticsearch/node/ResponseCollectorService.java index 8caf719531059..3d2acb845ceac 100644 --- a/server/src/main/java/org/elasticsearch/node/ResponseCollectorService.java +++ b/server/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -32,7 +32,10 @@ */ public final class ResponseCollectorService implements ClusterStateListener { - private static final double ALPHA = 0.3; + /** + * The weight parameter used for all moving averages of parameters. + */ + public static final double ALPHA = 0.3; private final ConcurrentMap nodeIdToStats = ConcurrentCollections.newConcurrentMap(); @@ -161,12 +164,12 @@ private double innerRank(long outstandingRequests) { // EWMA of response time double rS = responseTime / FACTOR; - // EWMA of service time - double muBarS = serviceTime / FACTOR; + // EWMA of service time. We match the paper's notation, which + // defines service time as the inverse of service rate (muBarS). + double muBarSInverse = serviceTime / FACTOR; // The final formula - double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS); - return rank; + return rS - muBarSInverse + Math.pow(qHatS, queueAdjustmentFactor) * muBarSInverse; } public double rank(long outstandingRequests) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/ComputedNodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/ComputedNodeStatsTests.java new file mode 100644 index 0000000000000..05b5191f196d2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/ComputedNodeStatsTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.node.stats; + +import org.elasticsearch.node.ResponseCollectorService.ComputedNodeStats; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests the core calculation used for ranking nodes in adaptive replica selection. + */ +public class ComputedNodeStatsTests extends ESTestCase { + + public void testBasicInvariants() { + // When queue size estimate is 0, the rank should equal response time. + ComputedNodeStats stats = createStats(0, 150, 100); + assertThat(stats.rank(0), equalTo(150.0)); + + stats = createStats(0, 20, 19); + assertThat(stats.rank(0), equalTo(20.0)); + } + + public void testParameterScaling() { + // With non-zero queue size estimate, a larger service time should result in a larger rank. + ComputedNodeStats first = createStats(0, 150, 100); + ComputedNodeStats second = createStats(0, 150, 120); + assertTrue(first.rank(1) < second.rank(1)); + + first = createStats(1, 200, 199); + second = createStats(1, 200, 200); + assertTrue(first.rank(3) < second.rank(3)); + + // A larger response time should always result in a larger rank. + first = createStats(2, 150, 100); + second = createStats(2, 200, 100); + assertTrue(first.rank(1) < second.rank(1)); + + first = createStats(2, 150, 150); + second = createStats(2, 200, 150); + assertTrue(first.rank(1) < second.rank(1)); + + // More queued requests should always result in a larger rank. + first = createStats(2, 150, 100); + second = createStats(3, 150, 100); + assertTrue(first.rank(1) < second.rank(1)); + + // More pending requests should always result in a larger rank. + first = createStats(2, 150, 100); + second = createStats(2, 150, 100); + assertTrue(first.rank(0) < second.rank(1)); + } + + private ComputedNodeStats createStats(int queueSize, int responseTimeMillis, int serviceTimeMillis) { + return new ComputedNodeStats("node0", 5, queueSize, 1_000_000 * responseTimeMillis, 1_000_000 * serviceTimeMillis); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java index 4164f12114675..77c89fdbe2350 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java @@ -510,7 +510,7 @@ public void testThatOnlyNodesSupportNodeIds() throws InterruptedException, IOExc } } - public void testAdaptiveReplicaSelection() throws Exception { + public void testARSRanking() throws Exception { final int numIndices = 1; final int numShards = 1; final int numReplicas = 2; @@ -523,13 +523,11 @@ public void testAdaptiveReplicaSelection() throws Exception { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); opRouting.setUseAdaptiveReplicaSelection(true); List searchedShards = new ArrayList<>(numShards); - Set selectedNodes = new HashSet<>(numShards); - TestThreadPool threadPool = new TestThreadPool("testThatOnlyNodesSupportNodeIds"); + TestThreadPool threadPool = new TestThreadPool("test"); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); ResponseCollectorService collector = new ResponseCollectorService(clusterService); - Map outstandingRequests = new HashMap<>(); GroupShardsIterator groupIterator = opRouting.searchShards(state, - indexNames, null, null, collector, outstandingRequests); + indexNames, null, null, collector, new HashMap<>()); assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); @@ -538,58 +536,156 @@ public void testAdaptiveReplicaSelection() throws Exception { ShardRouting firstChoice = groupIterator.get(0).nextOrNull(); assertNotNull(firstChoice); searchedShards.add(firstChoice); - selectedNodes.add(firstChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); ShardRouting secondChoice = groupIterator.get(0).nextOrNull(); assertNotNull(secondChoice); searchedShards.add(secondChoice); - selectedNodes.add(secondChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); ShardRouting thirdChoice = groupIterator.get(0).nextOrNull(); assertNotNull(thirdChoice); searchedShards.add(thirdChoice); - selectedNodes.add(thirdChoice.currentNodeId()); // All three shards should have been separate, because there are no stats yet so they're all ranked equally. assertThat(searchedShards.size(), equalTo(3)); // Now let's start adding node metrics, since that will affect which node is chosen collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); - collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(50).nanos()); + collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(150).nanos(), TimeValue.timeValueMillis(50).nanos()); collector.addNodeStatistics("node_2", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(200).nanos()); - outstandingRequests.put("node_0", 1L); - outstandingRequests.put("node_1", 1L); - outstandingRequests.put("node_2", 1L); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); // node 1 should be the lowest ranked node to start assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // node 1 starts getting more loaded... - collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(100).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); shardChoice = groupIterator.get(0).nextOrNull(); assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // and more loaded... - collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(200).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(220).nanos(), TimeValue.timeValueMillis(120).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); shardChoice = groupIterator.get(0).nextOrNull(); assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // and even more - collector.addNodeStatistics("node_1", 4, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(150).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); shardChoice = groupIterator.get(0).nextOrNull(); - // finally, node 2 is chosen instead - assertThat(shardChoice.currentNodeId(), equalTo("node_2")); + // finally, node 0 is chosen instead + assertThat(shardChoice.currentNodeId(), equalTo("node_0")); + + IOUtils.close(clusterService); + terminate(threadPool); + } + + public void testARSStatsAdjustment() throws Exception { + int numIndices = 1; + int numShards = 1; + int numReplicas = 1; + String[] indexNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indexNames[i] = "test" + i; + } + + ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas); + OperationRouting opRouting = new OperationRouting(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + opRouting.setUseAdaptiveReplicaSelection(true); + TestThreadPool threadPool = new TestThreadPool("test"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + + ResponseCollectorService collector = new ResponseCollectorService(clusterService); + GroupShardsIterator groupIterator = opRouting.searchShards(state, + indexNames, null, null, collector, new HashMap<>()); + assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); + + // We have two nodes, where the second has more load + collector.addNodeStatistics("node_0", 1, TimeValue.timeValueMillis(50).nanos(), TimeValue.timeValueMillis(40).nanos()); + collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(60).nanos()); + + // Check the first node is usually selected, if it's stats don't change much + for (int i = 0; i < 10; i++) { + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); + ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); + assertThat(shardChoice.currentNodeId(), equalTo("node_0")); + + int responseTime = 50 + randomInt(5); + int serviceTime = 40 + randomInt(5); + collector.addNodeStatistics("node_0", 1, + TimeValue.timeValueMillis(responseTime).nanos(), + TimeValue.timeValueMillis(serviceTime).nanos()); + } + + // Check that we try the second when the first node slows down more + collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(60).nanos(), TimeValue.timeValueMillis(50).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>()); + ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + IOUtils.close(clusterService); + terminate(threadPool); + } + + public void testARSOutstandingRequestTracking() throws Exception { + int numIndices = 1; + int numShards = 2; + int numReplicas = 1; + String[] indexNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indexNames[i] = "test" + i; + } + + ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas); + OperationRouting opRouting = new OperationRouting(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + opRouting.setUseAdaptiveReplicaSelection(true); + TestThreadPool threadPool = new TestThreadPool("test"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + + ResponseCollectorService collector = new ResponseCollectorService(clusterService); + + // We have two nodes with very similar statistics + collector.addNodeStatistics("node_0", 1, TimeValue.timeValueMillis(50).nanos(), TimeValue.timeValueMillis(40).nanos()); + collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(51).nanos(), TimeValue.timeValueMillis(40).nanos()); + Map outstandingRequests = new HashMap<>(); + outstandingRequests.put("node_0", 1L); + outstandingRequests.put("node_1", 1L); + + // Check that we choose to search over both nodes + GroupShardsIterator groupIterator = opRouting.searchShards( + state, indexNames, null, null, collector, outstandingRequests); + + Set nodeIds = new HashSet<>(); + nodeIds.add(groupIterator.get(0).nextOrNull().currentNodeId()); + nodeIds.add(groupIterator.get(1).nextOrNull().currentNodeId()); + assertThat(nodeIds, equalTo(Set.of("node_0", "node_1"))); + assertThat(outstandingRequests.get("node_0"), equalTo(2L)); + assertThat(outstandingRequests.get("node_1"), equalTo(2L)); + + // The first node becomes much more loaded + collector.addNodeStatistics("node_0", 5, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(200).nanos()); + outstandingRequests = new HashMap<>(); + outstandingRequests.put("node_0", 1L); + outstandingRequests.put("node_1", 1L); + + // Check that we always choose the second node + groupIterator = opRouting.searchShards( + state, indexNames, null, null, collector, outstandingRequests); + + nodeIds = new HashSet<>(); + nodeIds.add(groupIterator.get(0).nextOrNull().currentNodeId()); + nodeIds.add(groupIterator.get(1).nextOrNull().currentNodeId()); + assertThat(nodeIds, equalTo(Set.of("node_1"))); + assertThat(outstandingRequests.get("node_1"), equalTo(3L)); IOUtils.close(clusterService); terminate(threadPool);