Skip to content

Commit

Permalink
Correct service time parameter in ARS formula (#70283)
Browse files Browse the repository at this point in the history
The adaptive replica selection algorithm implements the [C3 algorithm](https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf)
for ranking nodes. The formula defines service time as a quantity `1/muBarS`.
Our implementation accidentally plugs in service time for `muBarS` instead of
`1/muBarS`. This commit corrects the formula and adds invariant tests to
confirm it behaves as expected.

This change also fixes a bug in how we adjust node statistics. To ensure that
nodes with high ranks occasionally get selected, every time we select a
'winner' node, we average its stats with the node's stats and add it to the
moving average. For service time, we were accidentally overwriting the whole
moving average with the new stats, which caused the ranks to adjust too
quickly. This issue has a much bigger impact now that the formula correctly
incorporates service time, and is important to fix so the behavior remains
reasonable.

Fixes #65838.
  • Loading branch information
jtibshirani authored Mar 18, 2021
1 parent 9831084 commit ca43dd1
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.preference;
package org.elasticsearch.search.routing;

import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.routing;

import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.ResponseCollectorService.ComputedNodeStats;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(numClientNodes = 1, numDataNodes = 3)
public class SearchReplicaSelectionIT extends ESIntegTestCase {

@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), true).build();
}

public void testNodeSelection() {
// We grab a client directly to avoid using a randomizing client that might set a search preference.
Client client = internalCluster().coordOnlyNodeClient();

client.admin().indices().prepareCreate("test")
.setSettings(Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 2))
.get();
ensureGreen();

client.prepareIndex("test").setSource("field", "value").get();
refresh();

// Before we've gathered stats for all nodes, we should try each node once.
Set<String> nodeIds = new HashSet<>();
SearchResponse searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
nodeIds.add(searchResponse.getHits().getAt(0).getShard().getNodeId());

searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
nodeIds.add(searchResponse.getHits().getAt(0).getShard().getNodeId());

searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
nodeIds.add(searchResponse.getHits().getAt(0).getShard().getNodeId());

assertEquals(3, nodeIds.size());

// Now after more searches, we should select the node with the best ARS rank.
for (int i = 0; i < 5; i++) {
client.prepareSearch().setQuery(matchAllQuery()).get();
}

ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().get();
ImmutableOpenMap<String, DiscoveryNode> coordinatingNodes = clusterStateResponse.getState().nodes().getCoordinatingOnlyNodes();
assertEquals(1, coordinatingNodes.size());

String coordinatingNodeId = coordinatingNodes.valuesIt().next().getId();
NodesStatsResponse statsResponse = client.admin().cluster().prepareNodesStats()
.setAdaptiveSelection(true)
.get();
NodeStats nodeStats = statsResponse.getNodesMap().get(coordinatingNodeId);
assertNotNull(nodeStats);
assertEquals(3, nodeStats.getAdaptiveSelectionStats().getComputedStats().size());

String bestNodeId = null;
double bestRank = Double.POSITIVE_INFINITY;
for (Map.Entry<String, ComputedNodeStats> entry : nodeStats.getAdaptiveSelectionStats().getComputedStats().entrySet()) {
double rank = entry.getValue().rank(1L);
if (rank < bestRank) {
bestNodeId = entry.getKey();
bestRank = rank;
}
}

searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get();
assertEquals(bestNodeId, searchResponse.getHits().getAt(0).getShard().getNodeId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -309,7 +310,12 @@ private static void adjustStats(final ResponseCollectorService collector,
final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get();
final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;

ExponentiallyWeightedMovingAverage avgServiceTime = new ExponentiallyWeightedMovingAverage(
ResponseCollectorService.ALPHA, stats.serviceTime);
avgServiceTime.addValue((minStats.serviceTime + stats.serviceTime) / 2);
final long updatedService = (long) avgServiceTime.getAverage();

collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
*/
public final class ResponseCollectorService implements ClusterStateListener {

private static final double ALPHA = 0.3;
/**
* The weight parameter used for all moving averages of parameters.
*/
public static final double ALPHA = 0.3;

private final ConcurrentMap<String, NodeStatistics> nodeIdToStats = ConcurrentCollections.newConcurrentMap();

Expand Down Expand Up @@ -161,12 +164,12 @@ private double innerRank(long outstandingRequests) {

// EWMA of response time
double rS = responseTime / FACTOR;
// EWMA of service time
double muBarS = serviceTime / FACTOR;
// EWMA of service time. We match the paper's notation, which
// defines service time as the inverse of service rate (muBarS).
double muBarSInverse = serviceTime / FACTOR;

// The final formula
double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS);
return rank;
return rS - muBarSInverse + Math.pow(qHatS, queueAdjustmentFactor) * muBarSInverse;
}

public double rank(long outstandingRequests) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.node.stats;

import org.elasticsearch.node.ResponseCollectorService.ComputedNodeStats;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

/**
* Tests the core calculation used for ranking nodes in adaptive replica selection.
*/
public class ComputedNodeStatsTests extends ESTestCase {

public void testBasicInvariants() {
// When queue size estimate is 0, the rank should equal response time.
ComputedNodeStats stats = createStats(0, 150, 100);
assertThat(stats.rank(0), equalTo(150.0));

stats = createStats(0, 20, 19);
assertThat(stats.rank(0), equalTo(20.0));
}

public void testParameterScaling() {
// With non-zero queue size estimate, a larger service time should result in a larger rank.
ComputedNodeStats first = createStats(0, 150, 100);
ComputedNodeStats second = createStats(0, 150, 120);
assertTrue(first.rank(1) < second.rank(1));

first = createStats(1, 200, 199);
second = createStats(1, 200, 200);
assertTrue(first.rank(3) < second.rank(3));

// A larger response time should always result in a larger rank.
first = createStats(2, 150, 100);
second = createStats(2, 200, 100);
assertTrue(first.rank(1) < second.rank(1));

first = createStats(2, 150, 150);
second = createStats(2, 200, 150);
assertTrue(first.rank(1) < second.rank(1));

// More queued requests should always result in a larger rank.
first = createStats(2, 150, 100);
second = createStats(3, 150, 100);
assertTrue(first.rank(1) < second.rank(1));

// More pending requests should always result in a larger rank.
first = createStats(2, 150, 100);
second = createStats(2, 150, 100);
assertTrue(first.rank(0) < second.rank(1));
}

private ComputedNodeStats createStats(int queueSize, int responseTimeMillis, int serviceTimeMillis) {
return new ComputedNodeStats("node0", 5, queueSize, 1_000_000 * responseTimeMillis, 1_000_000 * serviceTimeMillis);
}
}
Loading

0 comments on commit ca43dd1

Please sign in to comment.