Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct service time parameter in ARS formula #70283

Merged
merged 7 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.preference;
package org.elasticsearch.search.routing;

import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.routing;

import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.ResponseCollectorService.ComputedNodeStats;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.hamcrest.Matchers.equalTo;

/**
 * Integration test for adaptive replica selection (ARS): verifies that a coordinating-only
 * node first tries every shard copy once and afterwards routes searches to a copy with the
 * best (lowest) ARS rank.
 *
 * The cluster is scoped to one client node and three data nodes, and ARS is explicitly
 * enabled through {@link OperationRouting#USE_ADAPTIVE_REPLICA_SELECTION_SETTING}.
 */
@ESIntegTestCase.ClusterScope(numClientNodes = 1, numDataNodes = 3)
public class SearchReplicaSelectionIT extends ESIntegTestCase {

    /**
     * Extends the default node settings by switching adaptive replica selection on.
     *
     * @param nodeOrdinal ordinal of the node the settings are built for
     * @return the parent settings plus the ARS setting set to {@code true}
     */
    @Override
    public Settings nodeSettings(int nodeOrdinal) {
        return Settings.builder().put(super.nodeSettings(nodeOrdinal))
            .put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), true).build();
    }

    /**
     * Creates a single-shard index with two replicas (one copy per data node), indexes one
     * document, and checks the node chosen for successive searches.
     */
    public void testNodeSelection() {
        // We grab a client directly to avoid using a randomizing client that might set a search preference.
        Client client = internalCluster().coordOnlyNodeClient();

        client.admin().indices().prepareCreate("test")
            .setSettings(Settings.builder()
                .put(SETTING_NUMBER_OF_SHARDS, 1)
                .put(SETTING_NUMBER_OF_REPLICAS, 2))
            .get();
        ensureGreen();

        client.prepareIndex("test").setSource("field", "value").get();
        refresh();

        // Before we've gathered stats for all nodes, we should try each node once.
        Set<String> nodeIds = new HashSet<>();
        for (int i = 0; i < 3; i++) {
            SearchResponse initialResponse = client.prepareSearch().setQuery(matchAllQuery()).get();
            assertThat(initialResponse.getHits().getTotalHits().value, equalTo(1L));
            nodeIds.add(initialResponse.getHits().getAt(0).getShard().getNodeId());
        }
        assertEquals(3, nodeIds.size());

        // Now after more searches, we should select a node with the best ARS rank.
        for (int i = 0; i < 5; i++) {
            client.prepareSearch().setQuery(matchAllQuery()).get();
        }

        ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().get();
        ImmutableOpenMap<String, DiscoveryNode> coordinatingNodes = clusterStateResponse.getState().nodes().getCoordinatingOnlyNodes();
        assertEquals(1, coordinatingNodes.size());

        // The ARS statistics of interest are the ones collected by the coordinating node.
        String coordinatingNodeId = coordinatingNodes.valuesIt().next().getId();
        NodesStatsResponse statsResponse = client.admin().cluster().prepareNodesStats()
            .setAdaptiveSelection(true)
            .get();
        NodeStats nodeStats = statsResponse.getNodesMap().get(coordinatingNodeId);
        assertNotNull(nodeStats);

        Map<String, ComputedNodeStats> computedStats = nodeStats.getAdaptiveSelectionStats().getComputedStats();
        assertEquals(3, computedStats.size());

        // Find the lowest rank among all nodes that hold a copy of the shard.
        double bestRank = Double.POSITIVE_INFINITY;
        for (ComputedNodeStats stats : computedStats.values()) {
            double rank = stats.rank(1L);
            if (rank < bestRank) {
                bestRank = rank;
            }
        }

        // Compare ranks rather than node IDs: when several nodes tie for the best rank, any of
        // them is a valid choice, and asserting on one specific ID would fail spuriously.
        SearchResponse searchResponse = client.prepareSearch().setQuery(matchAllQuery()).get();
        String selectedNodeId = searchResponse.getHits().getAt(0).getShard().getNodeId();
        ComputedNodeStats selectedStats = computedStats.get(selectedNodeId);
        assertNotNull(selectedStats);
        assertThat(selectedStats.rank(1L), equalTo(bestRank));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -309,7 +310,12 @@ private static void adjustStats(final ResponseCollectorService collector,
final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get();
final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;

ExponentiallyWeightedMovingAverage avgServiceTime = new ExponentiallyWeightedMovingAverage(
ResponseCollectorService.ALPHA, stats.serviceTime);
avgServiceTime.addValue((minStats.serviceTime + stats.serviceTime) / 2);
final long updatedService = (long) avgServiceTime.getAverage();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, the "adjustment" seems odd. We should think about updating the node statistics explicitly. That's not in the scope of this PR, but it would be a good follow-up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jtibshirani
I'm curious, in NodeStatistics:

final ExponentiallyWeightedMovingAverage queueSize;
final ExponentiallyWeightedMovingAverage responseTime;
double serviceTime;

Why doesn't serviceTime also use the type ExponentiallyWeightedMovingAverage, instead of being adjusted here? It seems the effect would be the same.


collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
*/
public final class ResponseCollectorService implements ClusterStateListener {

private static final double ALPHA = 0.3;
/**
* The weight parameter used for all moving averages of parameters.
*/
public static final double ALPHA = 0.3;

private final ConcurrentMap<String, NodeStatistics> nodeIdToStats = ConcurrentCollections.newConcurrentMap();

Expand Down Expand Up @@ -161,12 +164,12 @@ private double innerRank(long outstandingRequests) {

// EWMA of response time
double rS = responseTime / FACTOR;
// EWMA of service time
double muBarS = serviceTime / FACTOR;
// EWMA of service time. We match the paper's notation, which
// defines service time as the inverse of service rate (muBarS).
double muBarSInverse = serviceTime / FACTOR;

// The final formula
double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS);
return rank;
return rS - muBarSInverse + Math.pow(qHatS, queueAdjustmentFactor) * muBarSInverse;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change looks good. But I wonder if the clientNum above is representing the right number? As far as I can see, it is the number of nodes that this node has gotten a search response from. Compared to the paper this sounds more like "servers" than "clients". For instance in setups with a dedicated coordinating tier, this could be somewhat dynamic and the number of clients could differ between the nodes in the tier.

I wonder if you know the reasoning behind how the clientNum is derived?

Copy link
Contributor Author

@jtibshirani jtibshirani Mar 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also found this surprising and don't understand the reasoning. I am guessing it is meant as a loose approximation to the number of clients, since by default every node can serve as a coordinating node. Since we were dividing by service time before, this clientNum approximation didn't have a big impact.

I can see a couple of options. We could avoid making changes to clientNum in this PR to keep it well-scoped, while recognizing that it may give too much weight to the queue size factor. Or we could always set clientNum to 1 for now, which can underestimate the queue size, but is simpler and makes the calculation more predictable.

In any case, I will make sure we track this through an issue or other means.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My worry is primarily that if we split changes here, we end up with some customers seeing multiple changes to the behavior over releases.

On the other hand, my intuition on this matter is not really strong enough that I think it is a show-stopper to merge this first and then deal with the other part later. It seems likely enough that clientNum will stabilize around a similar number over time for "client" nodes in the cluster. It might be too high (or low) though and building up the numClients could take time after restarts.

The dedicated role for coordinating nodes might come in handy here.

Copy link
Contributor Author

@jtibshirani jtibshirani Mar 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change identified a few aspects of ARS that could use improvement (including the 'stats adjustment' @jimczi mentioned above). So there will likely be more changes even apart from clientNum. To me it seems okay to introduce fixes/ improvements incrementally instead of assembling a single large update to the algorithm.

}

public double rank(long outstandingRequests) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.node.stats;

import org.elasticsearch.node.ResponseCollectorService.ComputedNodeStats;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

/**
 * Tests the core calculation used for ranking nodes in adaptive replica selection.
 *
 * Each test builds {@link ComputedNodeStats} instances through {@link #createStats} and
 * checks properties of {@code rank(outstandingRequests)}.
 */
public class ComputedNodeStatsTests extends ESTestCase {

    /**
     * Checks the baseline case: with no queued and no outstanding requests the rank is
     * exactly the response time in milliseconds.
     */
    public void testBasicInvariants() {
        // When queue size estimate is 0, the rank should equal response time.
        ComputedNodeStats stats = createStats(0, 150, 100);
        assertThat(stats.rank(0), equalTo(150.0));

        stats = createStats(0, 20, 19);
        assertThat(stats.rank(0), equalTo(20.0));
    }

    /**
     * Checks that the rank grows when any single input (service time, response time,
     * queue size, outstanding requests) grows while the others stay fixed.
     */
    public void testParameterScaling() {
        // With non-zero queue size estimate, a larger service time should result in a larger rank.
        ComputedNodeStats first = createStats(0, 150, 100);
        ComputedNodeStats second = createStats(0, 150, 120);
        assertTrue(first.rank(1) < second.rank(1));

        first = createStats(1, 200, 199);
        second = createStats(1, 200, 200);
        assertTrue(first.rank(3) < second.rank(3));

        // A larger response time should always result in a larger rank.
        first = createStats(2, 150, 100);
        second = createStats(2, 200, 100);
        assertTrue(first.rank(1) < second.rank(1));

        first = createStats(2, 150, 150);
        second = createStats(2, 200, 150);
        assertTrue(first.rank(1) < second.rank(1));

        // More queued requests should always result in a larger rank.
        first = createStats(2, 150, 100);
        second = createStats(3, 150, 100);
        assertTrue(first.rank(1) < second.rank(1));

        // More pending requests should always result in a larger rank.
        // The two stats are deliberately identical; only the argument to rank() differs.
        first = createStats(2, 150, 100);
        second = createStats(2, 150, 100);
        assertTrue(first.rank(0) < second.rank(1));
    }

    /**
     * Builds stats for node {@code "node0"} with a client count of 5.
     *
     * The time arguments are scaled by 1,000,000 before being handed to the constructor
     * (presumably milliseconds to nanoseconds; confirm against {@code ComputedNodeStats}).
     * The scaling uses a {@code long} literal so that values above roughly 2147 ms do not
     * silently overflow {@code int} arithmetic.
     *
     * @param queueSize          queue size reported for the node
     * @param responseTimeMillis response time in milliseconds
     * @param serviceTimeMillis  service time in milliseconds
     * @return the stats instance to rank
     */
    private ComputedNodeStats createStats(int queueSize, int responseTimeMillis, int serviceTimeMillis) {
        return new ComputedNodeStats("node0", 5, queueSize, 1_000_000L * responseTimeMillis, 1_000_000L * serviceTimeMillis);
    }
}
Loading