Correct service time parameter in ARS formula #70283

Merged · 7 commits · Mar 18, 2021
Changes from 4 commits
@@ -9,6 +9,7 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -309,7 +310,12 @@ private static void adjustStats(final ResponseCollectorService collector,
final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get();
final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;

ExponentiallyWeightedMovingAverage avgServiceTime = new ExponentiallyWeightedMovingAverage(
ResponseCollectorService.ALPHA, stats.serviceTime);
avgServiceTime.addValue((minStats.serviceTime + stats.serviceTime) / 2);
final long updatedService = (long) avgServiceTime.getAverage();
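For illustration, a minimal standalone sketch of what this adjustment computes, using the same ExponentiallyWeightedMovingAverage class; the concrete numbers are invented, and it assumes the standard EWMA update avg = alpha * x + (1 - alpha) * avg.

```java
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;

public class ServiceTimeAdjustmentSketch {
    public static void main(String[] args) {
        double alpha = 0.3;                   // ResponseCollectorService.ALPHA
        double nodeServiceTime = 200_000_000; // hypothetical EWMA service time of this node, in nanos (200ms)
        double minServiceTime = 100_000_000;  // hypothetical service time of the best-ranked node (100ms)

        // Seed the EWMA with the node's own value, then fold in the midpoint once,
        // exactly as adjustStats does above.
        ExponentiallyWeightedMovingAverage avgServiceTime =
            new ExponentiallyWeightedMovingAverage(alpha, nodeServiceTime);
        avgServiceTime.addValue((minServiceTime + nodeServiceTime) / 2);

        // 0.3 * 150ms + 0.7 * 200ms = 185ms: a gentler pull toward the best node
        // than the previous plain midpoint (150ms).
        System.out.println((long) avgServiceTime.getAverage()); // 185000000
    }
}
```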
Contributor:
As discussed offline, the "adjustment" seems odd. We should think about updating the node statistics explicitly. That's not in the scope of this PR, but it would be a good follow-up.

Contributor:
@jtibshirani
I'm curious, in NodeStatistics:

final ExponentiallyWeightedMovingAverage queueSize;
final ExponentiallyWeightedMovingAverage responseTime;
double serviceTime;

why doesn't serviceTime also use ExponentiallyWeightedMovingAverage, instead of adjusting it here? It seems the effect would be the same.
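For what it's worth, a rough sketch of the alternative being asked about (hypothetical, not the actual NodeStatistics class): keep serviceTime as an EWMA alongside the other two fields, updated where responses are recorded instead of being adjusted during ranking.

```java
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.node.ResponseCollectorService;

// Hypothetical variant in which serviceTime mirrors queueSize and responseTime.
class NodeStatisticsVariant {
    final ExponentiallyWeightedMovingAverage queueSize;
    final ExponentiallyWeightedMovingAverage responseTime;
    final ExponentiallyWeightedMovingAverage serviceTime; // was a plain double

    NodeStatisticsVariant(int initialQueueSize, long initialResponseTimeNanos, long initialServiceTimeNanos) {
        this.queueSize = new ExponentiallyWeightedMovingAverage(ResponseCollectorService.ALPHA, initialQueueSize);
        this.responseTime = new ExponentiallyWeightedMovingAverage(ResponseCollectorService.ALPHA, initialResponseTimeNanos);
        this.serviceTime = new ExponentiallyWeightedMovingAverage(ResponseCollectorService.ALPHA, initialServiceTimeNanos);
    }

    // Every recorded response folds into all three EWMAs, so no separate
    // "adjustment" of the service time would be needed at ranking time.
    void recordResponse(int queueSize, long responseTimeNanos, long serviceTimeNanos) {
        this.queueSize.addValue(queueSize);
        this.responseTime.addValue(responseTimeNanos);
        this.serviceTime.addValue(serviceTimeNanos);
    }
}
```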


collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService);
}
}
@@ -32,7 +32,10 @@
*/
public final class ResponseCollectorService implements ClusterStateListener {

private static final double ALPHA = 0.3;
/**
* The weight parameter used for all moving averages of parameters.
*/
public static final double ALPHA = 0.3;

private final ConcurrentMap<String, NodeStatistics> nodeIdToStats = ConcurrentCollections.newConcurrentMap();

@@ -161,12 +164,12 @@ private double innerRank(long outstandingRequests) {

// EWMA of response time
double rS = responseTime / FACTOR;
// EWMA of service time
double muBarS = serviceTime / FACTOR;
// EWMA of service time. We match the paper's notation, which
// defines service time as the inverse of service rate (muBarS).
double muBarSInverse = serviceTime / FACTOR;

// The final formula
double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS);
return rank;
return rS - muBarSInverse + Math.pow(qHatS, queueAdjustmentFactor) * muBarSInverse;
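For reference, my reading of the ranking function in the C3 paper, in its notation (rS, muBarSInverse, qHatS, and queueAdjustmentFactor correspond to $\bar{R}_s$, $1/\bar{\mu}_s$, $\hat{q}_s$, and $b$):

$$
\Psi(s) = \bar{R}_s - \frac{1}{\bar{\mu}_s} + \frac{(\hat{q}_s)^{b}}{\bar{\mu}_s}
$$

Since the collector stores the EWMA service time, i.e. $1/\bar{\mu}_s$, rather than the service rate $\bar{\mu}_s$, the corrected line multiplies by muBarSInverse where the old code divided by muBarS.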
Contributor:
This change looks good, but I wonder whether the clientNum above represents the right number. As far as I can see, it is the number of nodes that this node has gotten a search response from. Compared to the paper, this sounds more like "servers" than "clients". For instance, in setups with a dedicated coordinating tier, this could be somewhat dynamic and the number of clients could differ between the nodes in the tier.

I wonder if you know the reasoning behind how the clientNum is derived?

Contributor Author (@jtibshirani), Mar 17, 2021:
I also found this surprising and don't understand the reasoning. I am guessing it is meant as a loose approximation to the number of clients, since by default every node can serve as a coordinating node. Since we were dividing by service time before, this clientNum approximation didn't have a big impact.

I can see a couple options. We could avoid making changes to clientNum in this PR to keep it well-scoped, while recognizing that it may give too much weight to the queue size factor. Or we could always set clientNum to 1 for now, which can underestimate the queue size, but is simpler and makes the calculation more predictable.

In any case, I will make sure we track this through an issue or other means.
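For context, the paper's queue estimate (again, my reading of its notation) is

$$
\hat{q}_s = 1 + os_s \cdot n + \bar{q}_s
$$

where $n$ is the number of clients (clientNum here) and $os_s$ the outstanding requests to server $s$; always setting $n = 1$ corresponds to the second option above.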

Contributor:
My worry is primarily that if we split changes here, we end up with some customers seeing multiple changes to the behavior over releases.

On the other hand, my intuition on this matter is not strong enough to consider it a show-stopper to merge this first and deal with the other part later. It seems likely enough that clientNum will stabilize around a similar number over time for "client" nodes in the cluster. It might be too high (or low), though, and building up clientNum could take time after restarts.

The dedicated role for coordinating nodes might come in handy here.

Contributor Author (@jtibshirani), Mar 18, 2021:
This change identified a few aspects of ARS that could use improvement (including the 'stats adjustment' @jimczi mentioned above), so there will likely be more changes even apart from clientNum. To me it seems okay to introduce fixes/improvements incrementally instead of assembling a single large update to the algorithm.

}

public double rank(long outstandingRequests) {
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.node.stats;

import org.elasticsearch.node.ResponseCollectorService.ComputedNodeStats;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

/**
* Tests the core calculation used for ranking nodes in adaptive replica selection.
*/
public class ComputedNodeStatsTests extends ESTestCase {

public void testBasicInvariants() {
// When queue size estimate is 0, the rank should equal response time.
ComputedNodeStats stats = createStats(0, 150, 100);
assertThat(stats.rank(0), equalTo(150.0));

stats = createStats(0, 20, 19);
assertThat(stats.rank(0), equalTo(20.0));
}

public void testParameterScaling() {
// With non-zero queue size estimate, a larger service time should result in a larger rank.
ComputedNodeStats first = createStats(0, 150, 100);
ComputedNodeStats second = createStats(0, 150, 120);
assertTrue(first.rank(1) < second.rank(1));

first = createStats(1, 200, 199);
second = createStats(1, 200, 200);
assertTrue(first.rank(3) < second.rank(3));

// A larger response time should always result in a larger rank.
first = createStats(2, 150, 100);
second = createStats(2, 200, 100);
assertTrue(first.rank(1) < second.rank(1));

first = createStats(2, 150, 150);
second = createStats(2, 200, 150);
assertTrue(first.rank(1) < second.rank(1));

// More queued requests should always result in a larger rank.
first = createStats(2, 150, 100);
second = createStats(3, 150, 100);
assertTrue(first.rank(1) < second.rank(1));

// More pending requests should always result in a larger rank.
first = createStats(2, 150, 100);
second = createStats(2, 150, 100);
assertTrue(first.rank(0) < second.rank(1));
}

private ComputedNodeStats createStats(int queueSize, int responseTimeMillis, int serviceTimeMillis) {
return new ComputedNodeStats("node0", 5, queueSize, 1_000_000 * responseTimeMillis, 1_000_000 * serviceTimeMillis);
}
}
@@ -510,7 +510,7 @@ public void testThatOnlyNodesSupportNodeIds() throws InterruptedException, IOExc
}
}

public void testAdaptiveReplicaSelection() throws Exception {
public void testARSRanking() throws Exception {
final int numIndices = 1;
final int numShards = 1;
final int numReplicas = 2;
@@ -523,13 +523,11 @@ public void testAdaptiveReplicaSelection() throws Exception {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
opRouting.setUseAdaptiveReplicaSelection(true);
List<ShardRouting> searchedShards = new ArrayList<>(numShards);
Set<String> selectedNodes = new HashSet<>(numShards);
TestThreadPool threadPool = new TestThreadPool("testThatOnlyNodesSupportNodeIds");
TestThreadPool threadPool = new TestThreadPool("test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
ResponseCollectorService collector = new ResponseCollectorService(clusterService);
Map<String, Long> outstandingRequests = new HashMap<>();
Contributor Author:
When a 'winner' node is selected, we update the local 'outstanding requests' map to increment its value by 1. This only affects the local copy; it doesn't update global statistics. While this has a purpose in the ARS logic, it doesn't make sense in the context of this test, so I removed the shared outstandingRequests map here and added another test that explicitly checks this behavior.
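A minimal sketch of the behavior described, with illustrative names rather than the actual OperationRouting internals:

```java
import java.util.HashMap;
import java.util.Map;

public class OutstandingRequestsSketch {
    public static void main(String[] args) {
        // The caller passes a map of nodeId -> outstanding search requests.
        Map<String, Long> outstandingRequests = new HashMap<>();
        outstandingRequests.put("node_0", 1L);
        outstandingRequests.put("node_1", 1L);

        // After ARS ranks a shard group, only this local map is bumped for the
        // winning node; the global ResponseCollectorService stats are untouched.
        String winner = "node_1"; // hypothetical winner of the ranking
        outstandingRequests.merge(winner, 1L, Long::sum);

        System.out.println(outstandingRequests); // e.g. {node_0=1, node_1=2}
    }
}
```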

GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state,
indexNames, null, null, collector, outstandingRequests);
indexNames, null, null, collector, new HashMap<>());

assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards));

@@ -538,58 +536,156 @@
ShardRouting firstChoice = groupIterator.get(0).nextOrNull();
assertNotNull(firstChoice);
searchedShards.add(firstChoice);
selectedNodes.add(firstChoice.currentNodeId());

groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>());

assertThat(groupIterator.size(), equalTo(numIndices * numShards));
ShardRouting secondChoice = groupIterator.get(0).nextOrNull();
assertNotNull(secondChoice);
searchedShards.add(secondChoice);
selectedNodes.add(secondChoice.currentNodeId());

groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>());

assertThat(groupIterator.size(), equalTo(numIndices * numShards));
ShardRouting thirdChoice = groupIterator.get(0).nextOrNull();
assertNotNull(thirdChoice);
searchedShards.add(thirdChoice);
selectedNodes.add(thirdChoice.currentNodeId());

// All three shards should have been separate, because there are no stats yet so they're all ranked equally.
assertThat(searchedShards.size(), equalTo(3));

// Now let's start adding node metrics, since that will affect which node is chosen
collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos());
collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(50).nanos());
collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(150).nanos(), TimeValue.timeValueMillis(50).nanos());
collector.addNodeStatistics("node_2", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(200).nanos());
outstandingRequests.put("node_0", 1L);
outstandingRequests.put("node_1", 1L);
outstandingRequests.put("node_2", 1L);

groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>());
ShardRouting shardChoice = groupIterator.get(0).nextOrNull();
// node 1 should be the lowest ranked node to start
assertThat(shardChoice.currentNodeId(), equalTo("node_1"));

// node 1 starts getting more loaded...
collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(100).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>());
shardChoice = groupIterator.get(0).nextOrNull();
assertThat(shardChoice.currentNodeId(), equalTo("node_1"));

// and more loaded...
collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(200).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(220).nanos(), TimeValue.timeValueMillis(120).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>());
shardChoice = groupIterator.get(0).nextOrNull();
assertThat(shardChoice.currentNodeId(), equalTo("node_1"));

// and even more
collector.addNodeStatistics("node_1", 4, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(150).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>());
shardChoice = groupIterator.get(0).nextOrNull();
// finally, node 2 is chosen instead
assertThat(shardChoice.currentNodeId(), equalTo("node_2"));
// finally, node 0 is chosen instead
assertThat(shardChoice.currentNodeId(), equalTo("node_0"));

IOUtils.close(clusterService);
terminate(threadPool);
}

public void testARSStatsAdjustment() throws Exception {
int numIndices = 1;
int numShards = 1;
int numReplicas = 1;
String[] indexNames = new String[numIndices];
for (int i = 0; i < numIndices; i++) {
indexNames[i] = "test" + i;
}

ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas);
OperationRouting opRouting = new OperationRouting(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
opRouting.setUseAdaptiveReplicaSelection(true);
TestThreadPool threadPool = new TestThreadPool("test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);

ResponseCollectorService collector = new ResponseCollectorService(clusterService);
GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state,
indexNames, null, null, collector, new HashMap<>());
assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards));

// We have two nodes, where the second has more load
collector.addNodeStatistics("node_0", 1, TimeValue.timeValueMillis(50).nanos(), TimeValue.timeValueMillis(40).nanos());
collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(60).nanos());

// Check that the first node is usually selected, if its stats don't change much
for (int i = 0; i < 10; i++) {
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>());
ShardRouting shardChoice = groupIterator.get(0).nextOrNull();
assertThat(shardChoice.currentNodeId(), equalTo("node_0"));

int responseTime = 50 + randomInt(5);
int serviceTime = 40 + randomInt(5);
collector.addNodeStatistics("node_0", 1,
TimeValue.timeValueMillis(responseTime).nanos(),
TimeValue.timeValueMillis(serviceTime).nanos());
}

// Check that we try the second when the first node slows down more
collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(60).nanos(), TimeValue.timeValueMillis(50).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, new HashMap<>());
ShardRouting shardChoice = groupIterator.get(0).nextOrNull();
assertThat(shardChoice.currentNodeId(), equalTo("node_1"));

IOUtils.close(clusterService);
terminate(threadPool);
}

public void testARSOutstandingRequestTracking() throws Exception {
int numIndices = 1;
int numShards = 2;
int numReplicas = 1;
String[] indexNames = new String[numIndices];
for (int i = 0; i < numIndices; i++) {
indexNames[i] = "test" + i;
}

ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas);
OperationRouting opRouting = new OperationRouting(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
opRouting.setUseAdaptiveReplicaSelection(true);
TestThreadPool threadPool = new TestThreadPool("test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);

ResponseCollectorService collector = new ResponseCollectorService(clusterService);

// We have two nodes with very similar statistics
collector.addNodeStatistics("node_0", 1, TimeValue.timeValueMillis(50).nanos(), TimeValue.timeValueMillis(40).nanos());
collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(51).nanos(), TimeValue.timeValueMillis(40).nanos());
Map<String, Long> outstandingRequests = new HashMap<>();
outstandingRequests.put("node_0", 1L);
outstandingRequests.put("node_1", 1L);

// Check that we choose to search over both nodes
GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(
state, indexNames, null, null, collector, outstandingRequests);

Set<String> nodeIds = new HashSet<>();
nodeIds.add(groupIterator.get(0).nextOrNull().currentNodeId());
nodeIds.add(groupIterator.get(1).nextOrNull().currentNodeId());
assertThat(nodeIds, equalTo(Set.of("node_0", "node_1")));
assertThat(outstandingRequests.get("node_0"), equalTo(2L));
assertThat(outstandingRequests.get("node_1"), equalTo(2L));

// The first node becomes much more loaded
collector.addNodeStatistics("node_0", 5, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(200).nanos());
outstandingRequests = new HashMap<>();
outstandingRequests.put("node_0", 1L);
outstandingRequests.put("node_1", 1L);

// Check that we always choose the second node
groupIterator = opRouting.searchShards(
state, indexNames, null, null, collector, outstandingRequests);

nodeIds = new HashSet<>();
nodeIds.add(groupIterator.get(0).nextOrNull().currentNodeId());
nodeIds.add(groupIterator.get(1).nextOrNull().currentNodeId());
assertThat(nodeIds, equalTo(Set.of("node_1")));
assertThat(outstandingRequests.get("node_1"), equalTo(3L));

IOUtils.close(clusterService);
terminate(threadPool);