
Implement adaptive replica selection #26128

Merged
merged 25 commits into elastic:master from adaptive-replica-selection-mark2 on Aug 31, 2017

Conversation

@dakrone (Member) commented Aug 9, 2017

This implements the selection algorithm described in the C3 paper for
determining which copy of the data a query should be routed to.

By using the service time EWMA, response time EWMA, and queue size EWMA, we
calculate the score of a node by piggybacking these metrics on each search
request.

Since Elasticsearch lacks the "broadcast to every copy" behavior that Cassandra
has (as mentioned in the C3 paper) to update metrics after a node has been
highly weighted, this implementation adjusts a node's response stats using the
average of its own and the "best" node's metrics. This way, a long GC or other
activity that causes a node's rank to increase dramatically does not
permanently keep requests from being routed to that node; instead, the node
will eventually lower its score back to the point where it is a potential
candidate for new queries.

This feature is off by default and can be turned on with the dynamic setting
`cluster.routing.use_adaptive_replica_selection`.

Relates to #24915, however instead of `b=3` I used `b=4` (after benchmarking)
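For reference, a minimal sketch of a C3-style scoring function as described in the paper; the variable names and exact weighting here are illustrative, not the exact code in this change:

```java
/**
 * Sketch of C3-style node scoring: lower is better. responseEwma is the
 * response time EWMA, serviceEwma the service time EWMA, and queueEwma the
 * queue size EWMA, all piggybacked on search responses.
 */
static double score(double responseEwma, double serviceEwma, double queueEwma,
                    long outstandingRequests, int b) {
    // Estimated queue depth, inflated by requests we already have in flight.
    double qHat = 1.0 + outstandingRequests + queueEwma;
    // The queue term is raised to the b-th power: the paper uses b=3,
    // this change uses b=4 (after benchmarking).
    return responseEwma - serviceEwma + Math.pow(qHat, b) * serviceEwma;
}
```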

@dakrone added the :Search/Search (Search-related issues that do not fall into other categories), v6.1.0, v7.0.0, and WIP labels on Aug 9, 2017
@dakrone (Member Author) commented Aug 9, 2017

This is still missing tests (I wanted to verify the validity of the feature before adding them, hence the benchmarking before writing them). Here is a short summary of the benchmarking results:

This uses six machines running on Google Compute Engine; each machine has 16
CPUs and 60GB of RAM. Elasticsearch is given 31GB of RAM, and all other
performance-related settings are unchanged.

Each test is 40,000 queries per lap, with 5 laps for a total of 200,000 queries.
Rally is used with 100 clients simultaneously sending requests as quickly as
possible. Rally is configured to send requests to the client node (esclient).

It consists of 4 different benchmarks:

  • One replica with no load applied to any of the nodes
  • One replica with load applied to one of the nodes
  • Four replicas with no load applied
  • Four replicas with load applied

For the "load" scenarios, load was introduced on the es3 node with stress -i 8 -c 8 -m 8 -d 8.

1 replica, no load summary

| Metric                       | No ARS  | ARS     | Improvement % |
|------------------------------|---------|---------|---------------|
| Median throughput (ops/s)    | 95.7866 | 97.6061 | 1.8995350     |
| 50th percentile latency (ms) | 1003.29 | 985.724 | -1.7508397    |
| 90th percentile latency (ms) | 1339.69 | 1346.43 | 0.50310146    |
| 99th percentile latency (ms) | 1648.34 | 1670.29 | 1.3316427     |

Without any load, the median throughput and latency show roughly a 1% difference
(+/-) when hitting only the client node.

1 replica, with load summary

| Metric                       | No ARS  | ARS     | Improvement % |
|------------------------------|---------|---------|---------------|
| Median throughput (ops/s)    | 41.1558 | 88.8048 | 115.77712     |
| 50th percentile latency (ms) | 411.721 | 943.221 | 129.09227     |
| 90th percentile latency (ms) | 5215.34 | 1962.73 | -62.366212    |
| 99th percentile latency (ms) | 6181.48 | 2648.86 | -57.148450    |

So we trade 50th percentile latency for a large reduction in 90th/99th
percentile latency, while more than doubling the median throughput from 41
ops/s to 88 ops/s.

You can see the distribution of requests with ARS: requests were routed away
from es3 (the stressed node) and handled by the non-loaded nodes:

node_name name                active queue rejected completed
es4       search                   0     0        0    241512
es1       search                   0     0        0    220730
es2       search                   0     0        0    226168
es3       search                   0     0        0    113115
esclient  search                   0     0        0    202875
es5       search                   0     0        0    212850

In the non-ARS scenario, the requests are evenly distributed.

4 replicas, no load summary

| Metric                       | No ARS  | ARS     | Improvement % |
|------------------------------|---------|---------|---------------|
| Median throughput (ops/s)    | 87.689  | 97.8271 | 11.561427     |
| 50th percentile latency (ms) | 1348.53 | 981.778 | -27.196429    |
| 90th percentile latency (ms) | 1930.5  | 1377.62 | -28.639213    |
| 99th percentile latency (ms) | 2363.1  | 1751.05 | -25.900300    |

ARS improves on both throughput and latency, for all of the percentiles.

Additionally, the requests are routed roughly evenly, even though round-robin
selection is not used; instead, the formula selects the "least loaded" node to
send each request to.

4 replicas, with load summary

| Metric                       | No ARS  | ARS     | Improvement % |
|------------------------------|---------|---------|---------------|
| Median throughput (ops/s)    | 52.1863 | 86.5302 | 65.810184     |
| 50th percentile latency (ms) | 2587.46 | 945.357 | -63.463899    |
| 90th percentile latency (ms) | 3489.49 | 2099.22 | -39.841639    |
| 99th percentile latency (ms) | 4168.83 | 3463.71 | -16.914098    |

So an improvement in all latencies, while increasing the median throughput
from 52 ops/s to 86 ops/s.

As the number of replicas goes up, I expect adaptive replica selection to do
better, since it has more choices of potentially unloaded machines to service
the request.

Round robin requests summary

One more test: instead of hitting the client node this time, I had Rally hit
all nodes in the cluster (including the client node). For this test I dropped
back to one replica, since that is the most common use case.

| Metric                       | No ARS  | ARS     | Improvement % |
|------------------------------|---------|---------|---------------|
| Median throughput (ops/s)    | 89.6289 | 95.7207 | 6.7966917     |
| 50th percentile latency (ms) | 1088.81 | 1010.36 | -7.2051138    |
| 90th percentile latency (ms) | 1706.07 | 1422.55 | -16.618310    |
| 99th percentile latency (ms) | 2481.1  | 1784.96 | -28.057716    |

So still an improvement on throughput and latency for all categories.

Conclusion

Final summary: to consider the feature successful, we want to see a large
throughput improvement and a negative latency percentile change.

| Test case                       | Throughput improvement % | 50th % change | 90th % change | 99th % change |
|---------------------------------|--------------------------|---------------|---------------|---------------|
| 1 replica, no load              | 1.9%                     | -1.7%         | 0.5%          | 1.3%          |
| 1 replica, with load            | 115.8%                   | 129.0%        | -62.3%        | -57.1%        |
| 4 replicas, no load             | 11.6%                    | -27.2%        | -28.6%        | -25.9%        |
| 4 replicas, with load           | 65.8%                    | -63.5%        | -39.8%        | -16.9%        |
| 1 replica, round robin, no load | 6.8%                     | -7.2%         | -16.6%        | -28.0%        |

Adaptive replica selection shows an improvement in throughput and latency for
almost all tests. While not perfect, it should help route around overloaded
nodes.

Full benchmarks at https://writequit.org/org/es/design/adaptive-replica-selection-benchmarks.html

@dakrone requested a review from s1monw August 10, 2017 01:18
@s1monw (Contributor) left a comment:
awesome lee, I left a question!

@@ -284,8 +285,9 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
}
Map<String, Long> nodeSearchCounts = searchTransportService.getPendingRequests(SearchAction.NAME);
Contributor:
this is puzzling to me: why are you using SearchAction.NAME? It's the action that acts as the coordinator, and we don't necessarily run this from within the cluster, so the counts are expected to be 0 for almost all nodes. I wonder if this should be the shard actions here?

Member Author:
You're right! I think instead it should be the prefix of "indices:data/read/search" so that all of these match it.

I'll make that change and then re-run the benchmarks to see if it affects things; it may mean we can lower b=4 back down to b=3. Good catch!
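For illustration, a prefix match over the in-flight request handlers might look like the following sketch; the `RequestHolder` accessors here are assumptions, not the exact API:

```java
// Count pending requests per node whose action name starts with the given
// prefix, e.g. "indices:data/read/search" matches all the search shard actions.
public Map<String, Long> getPendingRequests(final String actionNamePrefix) {
    Map<String, Long> nodeCounts = new HashMap<>();
    for (RequestHolder holder : clientHandlers.values()) {
        if (holder.action().startsWith(actionNamePrefix)) {
            nodeCounts.merge(holder.node().getId(), 1L, Long::sum);
        }
    }
    return nodeCounts;
}
```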

* selection forumla. Making sure though that its random within the active shards of the same
* (or missing) rank, and initializing shards are the last to iterate through.
*/
public ShardIterator rankedActiveInitializingShardsIt(@Nullable ResponseCollectorService collector,
Contributor:
down the road we really need to work on the number of objects being created in this process. It can totally be a follow-up, but I think we can abstract it away quite nicely since it's all keyed by the node id and the set of nodes is static. We can use a BytesRefHash with parallel arrays in the future, which also prevents all the boxing.

Member Author:
Yes, I totally agree. At one point it was very elegantly implemented using streams; however, the streams were too slow compared to their imperative counterparts, so it's definitely something I'd like to address in the future.

Contributor:
++

Contributor:
nit: the other impl is called activeInitializingShardsRandomIt so maybe we should rename this one to activeInitializingShardsRankedIt or rename the other one to randomActiveInitializingShardsIt?

@dakrone (Member Author) commented Aug 11, 2017

Thanks for taking a look @s1monw! I pushed a commit to change the pending requests calculation (thanks for catching that!) and re-ran the benchmarks to make sure it didn't change anything:

Single replica, non-loaded case:

| Metric                       | No ARS  | ARS     | Change %    |
|------------------------------|---------|---------|-------------|
| Median throughput (ops/s)    | 95.7866 | 98.537  | 2.8713828   |
| 50th percentile latency (ms) | 1003.29 | 970.15  | -3.3031327  |
| 90th percentile latency (ms) | 1339.69 | 1326.79 | -0.96290933 |
| 99th percentile latency (ms) | 1648.34 | 1648.8  | 0.027906864 |

So again, not a huge latency difference, as expected for the unloaded cluster.

Single replica, es3 under load:

| Metric                       | No ARS  | ARS     | Change %   |
|------------------------------|---------|---------|------------|
| Median throughput (ops/s)    | 41.1558 | 87.8231 | 113.39179  |
| 50th percentile latency (ms) | 411.721 | 1007.22 | 144.63654  |
| 90th percentile latency (ms) | 5215.34 | 1839.46 | -64.729816 |
| 99th percentile latency (ms) | 6181.48 | 2433.55 | -60.631596 |

And again, a large improvement in throughput for the loaded case as well as a trade-off of 50th percentile latency for a large improvement in 90th and 99th percentile latency.

Single replica, round robin requests:

| Metric                       | No ARS  | ARS     | Change %   |
|------------------------------|---------|---------|------------|
| Median throughput (ops/s)    | 89.6289 | 95.9452 | 7.0471689  |
| 50th percentile latency (ms) | 1088.81 | 1013.61 | -6.9066228 |
| 90th percentile latency (ms) | 1706.07 | 1423.83 | -16.543284 |
| 99th percentile latency (ms) | 2481.1  | 1783.73 | -28.107291 |

Again a nice improvement in both throughput and latency for the non-stressed round-robin test case.

Looks like it keeps the same improvement. I also dropped b back down to 3 (as specified in the paper) since the pending requests should now be correct.

@mcanini commented Aug 14, 2017

These are really impressive numbers. Very good job!
@lalithsuresh Please have a look too.

I am not sure I understand what is meant by "Four replicas with no load applied". Can you describe the scenario?
This seems to be the case where the throughput increase is very large but comes at the expense of a median latency increase. It may have something to do with the rate limiters.

@lalithsuresh

This looks very promising!

I don't understand the setup well, but why is ARS helping when there is only one replica of the data?

@dakrone (Member Author) commented Aug 14, 2017

I am not sure I understand what is meant by "Four replicas with no load
applied". Can you describe the scenario?

Sure, so there are 5 data nodes, and for the index in question there is 1
primary copy of the data and 4 replicas; since they are evenly distributed,
each of the data nodes can serve the request. In this case the formula is
applied to rank all 5 nodes and select the "best" one to send the request to.

The "no load" part is that none of the nodes were artificially stressed.

This seems to be the case where the tput increase is very large but comes at
the expense of median latency increase. It may have something to do with the
rate limiters.

For the 4 replica no load case the throughput increase is 11% and the latency
decreases are at -27%, -28%, and -26%.

For the one with a median latency increase, do you mean the 1 replica with load
case? That's the only one that showed an increase in 50th percentile latency.
The rate limiters from the paper are not implemented as we don't have an exact
analogy in Elasticsearch that can be applied.

I don't understand the setup well, but why is ARS helping when there is only
one replica of the data?

Even when there is only one replica, we can still rank both of the nodes that
contain a copy of the data (the primary and the replica) and select the node
that is less loaded to route the request.

@lalithsuresh

Ah, I see. So one replica means two copies of the data. That's what I'd missed. Thanks!

@dakrone removed the WIP label Aug 15, 2017
@s1monw (Contributor) left a comment:
left another round of comments. Lemme know if we should just go with this impl or make it more efficient. I think we can iterate on it once it's in?

/**
* Return a map of nodeId to pending number of requests for the given action name prefix
*/
public Map<String, Long> getPendingRequests(final String actionNamePrefix) {
Contributor:
this seems to be a very expensive operation; I wonder if we should special-case this here rather than adding a generic way of doing it.

Contributor:
I wonder if we can keep a map inside SearchTransportService that is basically passed to every relevant request as an action listener. I think we can just keep things in the map until the counter goes to 0 and then we remove it?
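A minimal sketch of that idea, assuming a simple tracker keyed by node id (the class and method names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SearchConnectionTracker {
    // In-flight search requests per node id.
    private final Map<String, Long> clientConnections = new ConcurrentHashMap<>();

    void onRequestSent(String nodeId) {
        clientConnections.merge(nodeId, 1L, Long::sum);
    }

    void onResponseReceived(String nodeId) {
        // Remove the entry once the count returns to zero, so that nodes
        // which leave the cluster do not leak map entries.
        clientConnections.computeIfPresent(nodeId, (id, conns) -> conns == 1L ? null : conns - 1);
    }
}
```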

Contributor:
Looks better to me but I think I'd rather like a Map<String, AtomicLong>, ideally pre-filled with every possible action name so that the map is effectively immutable afterwards and concurrency is only handled at the AtomicLong level? It would also create fewer boxed longs.

Member Author:
ideally pre-filled with every possible action name so that the map is effectively immutable afterwards and concurrency is only handled at the AtomicLong level? It would also create fewer boxed longs.

This is a map of nodeId to connectionCount; I'm not sure how pre-filling it with possible action names would help?

Contributor:
Sorry I got confused about what keys were about.

Contributor:
+1 to your suggested patch. I'm just confused why it puts 0 as a default value when it sees a node id for the first time; should it be 1? Similarly, it should remove entries from the map in handleResponse when the count is 1 rather than 0?

Member Author:
Oh yes, good call! I'll make those changes before pushing the commit

Contributor:
let's remove the actionNamePrefix argument which is ignored?

* selection forumla. Making sure though that its random within the active shards of the same
* (or missing) rank, and initializing shards are the last to iterate through.
*/
public ShardIterator rankedActiveInitializingShardsIt(@Nullable ResponseCollectorService collector,
Contributor:
++

@@ -43,13 +45,24 @@

public class OperationRouting extends AbstractComponent {

public static final Setting<Boolean> USE_ADAPTIVE_REPLICA_SELECTION_SETTING =
Contributor:
if this is false, we should not collect any statistics in the SearchTransportService either, no?

Member Author:
I think we should continue to collect the stats; since all of them are moving averages, it's good to be able to turn ARS on and not have the numbers be wildly inaccurate. What do you think? I could go either way, though I think toggling the collection on and off is going to be more complex.

Contributor:
+1 to keep collecting

@dakrone (Member Author) commented Aug 21, 2017

Lemme know if we should just go with this impl or make it more efficient. I think we can iterate on it once it's in?

Yes, big +1 to iterating on this once it's in. I'm wary of making a lot of performance-related changes to this version, since each change invalidates the benchmarks; I'd rather get it in with these numbers and then work on the efficiency aspect.

@jpountz (Contributor) left a comment:
This looks very interesting and benchmarks suggest this will be a great win! Some suggestions:

  • We should fix getPendingRequests to no longer run in linear time with the number of pending requests like you already started looking into.
  • I think readability would improve significantly if we unwrapped keys and values from Map.Entry objects and gave them meaningful names.
  • Could you add a link to the paper as a source comment? I think it is also important to add comments in places where you are not exactly following the paper's recommendations (such as using the average with the best node), or where you think we might want to follow a different route.

Also, if my understanding is correct, ranks only depend on the node that hosts a shard. So say you have 2 nodes, 10 shards, and 1 replica. Each node has an entire copy of the index (10 shards), and since the decision process only depends on the node statistics, a given request will be served by only one of the two nodes, am I correct? If yes, I think this is problematic, as it will make the situation worse for users who have a low throughput but care about latency.

for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) {
if (entry.getValue().isPresent()) {
ResponseCollectorService.ComputedNodeStats stats = entry.getValue().get();
double rank = stats.rank(nodeSearchCounts.getOrDefault(entry.getKey(), 1L));
Contributor:
I'd find it easier to read if you pulled entry.getKey and entry.getValue into their own variables with meaningful names

*/
public Map<String, Long> getPendingRequests(final String actionNamePrefix) {
Map<String, Long> nodeCounts = new HashMap<>();
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
Contributor:
seems like you could iterate over values directly


@dakrone (Member Author) commented Aug 22, 2017

Thanks for taking another look @jpountz, I pushed commits addressing all of your comments!

@jpountz (Contributor) left a comment:
It looks good to me overall. One thing I'd like to discuss is the behaviour of this option in the case where you don't have concurrent requests (see the last paragraph of my previous comment), since shard requests will all go to the node that hosts the shard and has the lowest rank. So to take an extreme example, if all nodes have a copy of all shards, then all shard requests will go to a single node: the one that has the best score.

I'm wondering whether we could fix it, e.g., by artificially increasing the number of connections to the best node in OperationRouting.searchShards in order to simulate that we are just about to send a request to this node. Something like this (oversimplified):

for (IndexShardRoutingTable shard : shards) {
  ShardIterator iterator = preferenceActiveShardIterator(..., nodeCounts);
  String firstShardNode = iterator.getShardRoutings().get(0).currentNodeId();
  nodeCounts.merge(firstShardNode, 1L, Long::sum);
}

This is just a random idea in order to get the discussion started but I'm curious what you think about this issue.

/**
* Return a map of nodeId to pending number of search requests
*/
public Map<String, Long> getPendingSearchRequests() {
Contributor:
just a thought: this is going to return a "live" map, so getting the same entry twice in a row could return different counts. This does not seem to have the potential to cause bugs today, but I'm wondering whether we should take a snapshot instead in order to make it easier to reason about?

Contributor:
Alternatively I'd also be fine with documenting it here and in all methods that take this live node connection count.
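A snapshot version could be as simple as this sketch (which is the direction the later "Copy map for pending search connections" commit takes):

```java
// Return a point-in-time copy rather than the live concurrent map, so a
// caller reading the same entry twice sees the same count.
public Map<String, Long> getPendingSearchRequests() {
    return new HashMap<>(clientConnections);
}
```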

super.handleResponse(response);
// Decrement the number of connections or remove it entirely if there are no more connections
// We need to remove the entry here so we don't leak when nodes go away forever
clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
Contributor:
nit: might be worth asserting that the current value is not null (using compute instead of computeIfPresent) and gte 1?

Member Author:
There is no way the value can be null in this case? We only modify it in the three places, all using compute*, and both compute and computeIfPresent prevent null values in the map. When we expose the map it's also wrapped in an unmodifiableMap.

Contributor:
This is why I said moving to compute instead of computeIfPresent so that we could assert that we do have a mapping for nodeId in that map at that point. To be clear I think that what you did is correct, I'd just like to add assertions to it to make sure the invariant is respected.

Member Author:
Okay, I added an assert for this in 8674249
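For reference, a sketch of what the assertion-carrying decrement looks like using compute (the idea under discussion, not necessarily the exact code in 8674249):

```java
clientConnections.compute(nodeId, (id, conns) -> {
    assert conns != null : "no connection count for node [" + id + "]";
    assert conns >= 1 : "connection count for node [" + id + "] was " + conns;
    // Remove the entry when the last in-flight request completes.
    return conns == 1L ? null : conns - 1;
});
```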

final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get();
final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;
Contributor:
nit: the casts should not be necessary?

Member Author:
They are required; without them you get `error: incompatible types: possible lossy conversion from double to long`.

Contributor:
oh, I had not realized we stored those times as doubles

@dakrone (Member Author) commented Aug 23, 2017

One thing I'd like to discuss is the behaviour of this option in the case where
you don't have concurrent requests (see the last paragraph of my previous
comment), since shard requests will all go to the node that hosts the shard and
has the lowest rank. So to take an extreme example, if all nodes have a copy of
all shards, then all shard requests will go to a single node: the one that has
the best score.

This is indeed something where the behavior is a little different:

  1. Assuming that the volume of requests is so low that you don't have concurrent
    requests, does it really matter if the requests all go to one node? At the
    point where you have a single query executed at a time, does distributing it
    actually help at all? The load will be so low that it doesn't matter.

  2. The requests won't actually all go to one node. Here's an example scenario:

Consider four nodes: c, d1, d2, and d3, where the d* nodes are data nodes, each
with a copy of the data, and c is the coordinating node (for the sake of
simplicity). Also for simplicity, I'll use a single number for the "rank" of a
node rather than the three metrics that actually go into the formula (I'm
making up numbers):

  • Request comes in, c sees that it has no data for any of the nodes, so it
    does the normal "shuffle", the request goes to d1. d1 executes it, d1's
    rank is now 50
  • Request comes in, c sees that it does not have data for all copies of the
    shard, so it does the normal "shuffle" again, request goes to d2. d2
    executes it and d2's rank is now 40
  • Request comes in, c sees that it does not have data for all copies so it
    shuffles and sends the request to d3. d3 executes it and has a rank of 58
  • Request comes in, c sees that d2 has the lowest rank (40), so it sends the
    request to d2. d2's rank is now 45. d1 and d3 ranks are adjusted by the
    average of their own and d2's rank, so d1 is now (50+40)/2=45 and d3 is now
    (58+40)/2=49
  • Request comes in, c sees that d2 and d1 have the same rank (45), let's
    assume it sends it to d2 again. d2 has a rank of 46 now. d1 is adjusted
    to (45+45)/2=45 and d3 is adjusted to (49+45)/2=47
  • Request comes in, c sends it to d1 (rank 45) which is now the lowest
    ranked node, d1 is now at rank 62 (it was a slow node). d2 is adjusted to
    (46+45)/2=45.5, d3 is adjusted to (47+45)/2=46
  • Request comes in, c sends it to d2 (rank 45.5) and d2 comes back with a
    rank of 50. d1 is adjusted to (45.5+62)/2=53.75. d3 is adjusted to
    (45.5+46)/2=45.75
  • Request comes in, c sends it to d3 (rank 45.75), etc etc etc

After 8 serial requests, the request distribution is: d1=2, d2=4, d3=2
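The averaging step from the walkthrough, as a toy snippet:

```java
// Ranks after the first three requests (from the walkthrough above).
double d1 = 50, d2 = 40, d3 = 58;
// d2 wins the fourth request, so the other nodes' ranks drift toward d2's.
d1 = (d1 + d2) / 2;   // (50 + 40) / 2 = 45
d3 = (d3 + d2) / 2;   // (58 + 40) / 2 = 49
```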

Yes, it is possible to conceive of a situation where serial requests go to only
a single node; however, for that to happen, that node would have to be
consistent in its response and service times, or keep getting faster. If it
gets slower, then requests are sent to other nodes in the cluster, since their
ranks will slowly approach the winning node's rank as more requests are sent.

I would posit that if the requests all went to one node, and if that node's
response times were consistent or consistently getting faster, then it's not a
bad thing that the requests all go to that node.

@jpountz (Contributor) commented Aug 23, 2017

Just had a quick discussion with Lee to clarify my concerns. Commenting here as well for reference.

Assuming that the volume of requests is so low that you don't have concurrent
requests, does it really matter if the requests all go to one node?

I think it matters. If you have more shards than cores per node, or if you have spinning disks that are not good at reading multiple locations concurrently, then querying a single node instead of distributing the query across multiple nodes is going to result in worse latencies.

Yes, it is possible to conceive of a situation where serial requests go to only
a single node

This is not what I am worried about. I actually think the algorithm and your idea to average metrics with those of the best node are going to do a good job at distributing the load based on the respective recent performance of nodes. My concern is about the fact that for a single search request, this algorithm will generally query fewer nodes than round-robin, which can be an issue for latency for the aforementioned reasons.

Increase the number of pending search requests used for calculating rank when chosen

When a node gets chosen, this increases the number of search counts for the
winning node so that it will not be as likely to be chosen again for
non-concurrent search requests.
@jpountz (Contributor) left a comment:
LGTM!

} else {
// One or both of the nodes don't have stats, treat them as equal
return 0;
}
Contributor:
I'm wondering whether the fact that the order is not transitive could confuse sorting. For instance, if you have s1 and s2 such that s1 < s2, and s3 which is null, then s1 and s2 both compare equal to s3 but not to each other. Maybe we should make nulls always compare less than non-nulls so that the order is total?

Member Author:
I think it's okay to keep the contract of treating situations where both nodes do not have stats as equal. I also expect it to affect a very tiny fraction of requests, since null stats only occur on a brand-new node with 0 prior searches.

@jpountz (Contributor) commented Aug 30, 2017:
Actually, the javadocs of Arrays.sort say that an IllegalArgumentException may be thrown if the comparator violates the Comparator contract, and the Comparator javadocs say that it must implement a total ordering, so I think it's important to make nulls compare consistently less than or greater than non-null values.

@dakrone (Member Author) commented Aug 30, 2017:
Okay, it's not strictly nulls (since the nodes do exist; their Optionals are just empty), but I understand what you're saying. I'll change this to make missing values compare consistently less.

Member Author:
Doh they are nulls, I originally wrote it with Optional but it's different now, sorry!

Member Author:
Okay I pushed 3d1dd2b for this
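For illustration, a total ordering along those lines can be built with Comparator.nullsFirst; the `nodeRanks` map here is hypothetical:

```java
import java.util.Comparator;
import java.util.Map;

// nodeRanks: node id -> computed rank, null for brand-new nodes without stats;
// missing ranks compare consistently before present ones.
static Comparator<String> byRank(Map<String, Double> nodeRanks) {
    return Comparator.comparing(nodeRanks::get, Comparator.nullsFirst(Double::compareTo));
}
```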

@@ -58,6 +58,7 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Contributor:
leftover?


// adjust the non-winner nodes' stats so they will get a chance to receive queries
adjustStats(collector, nodeStats, minNode, minStats);
if (minNode != null) {
Contributor:
If there are ties, then minNode might not be the node that we first try to send the shard request to. Should we update stats of the node id of the first shard of sortedShards instead?

Member Author:
I did make this change to update the stats of the first shard of sortedShards; it also turns out to be more efficient because I don't have to loop through everything twice!

// it only affects the captured node search counts, which is
// captured once for each query in TransportSearchAction
nodeSearchCounts.compute(minNode, (id, conns) -> conns == null ? 1 : conns + 1);
}
Contributor:
nit: I think it's confusing that a method called rank has side effects (adjustStats, increasing node counts). Maybe split the application of side effects into a separate method, or rename it to make it clear it will update stats?

Member Author:
Sure, I'll rename this


@dakrone (Member Author) commented Aug 30, 2017

@jpountz thanks! I pushed commits for your feedback and just re-ran the benchmarks to ensure it didn't make any appreciable difference. The numbers for different tests are the same as the prior benchmarking or slightly better! (I updated the big benchmarks page)

@jpountz (Contributor) left a comment:
LGTM. I'm looking forward to getting feedback from users about this feature!

@dakrone (Member Author) commented Aug 31, 2017

@elasticmachine retest this please

@dakrone merged commit c3da66d into elastic:master Aug 31, 2017
dakrone added a commit that referenced this pull request Aug 31, 2017
* Implement adaptive replica selection

* Randomly use adaptive replica selection for internal test cluster

* Use an action name *prefix* for retrieving pending requests

* Add unit test for replica selection

* don't use adaptive replica selection in SearchPreferenceIT

* Track client connections in a SearchTransportService instead of TransportService

* Bind `entry` pieces in local variables

* Add javadoc link to C3 paper and javadocs for stat adjustments

* Bind entry's key and value to local variables

* Remove unneeded actionNamePrefix parameter

* Use conns.longValue() instead of cached Long

* Add comments about removing entries from the map

* Pull out bindings for `entry` in IndexShardRoutingTable

* Use .compareTo instead of manually comparing

* add assert for connections not being null and gte to 1

* Copy map for pending search connections instead of "live" map

* Increase the number of pending search requests used for calculating rank when chosen

When a node gets chosen, this increases the number of search counts for the
winning node so that it will not be as likely to be chosen again for
non-concurrent search requests.

* Remove unused HashMap import

* Rename rank -> rankShardsAndUpdateStats

* Rename rankedActiveInitializingShardsIt -> activeInitializingShardsRankedIt

* Instead of precalculating winning node, use "winning" shard from ranked list

* Sort null ranked nodes before nodes that have a rank
jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Aug 31, 2017
* master:
  Allow abort of bulk items before processing (elastic#26434)
  [Tests] Improve testing of FieldSortBuilder (elastic#26437)
  Upgrade to lucene-7.0.0-snapshot-d94a5f0. (elastic#26441)
  Implement adaptive replica selection (elastic#26128)
  Build: Quiet bwc build output (elastic#26430)
  Migrate Search requests to use Writeable reading strategies (elastic#26428)
  Changed version from 7.0.0-alpha1 to 6.1.0 in the nested sorting serialization check.
  Remove dead path conf BWC code in build
@dakrone deleted the adaptive-replica-selection-mark2 branch December 13, 2017 20:34