-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement adaptive replica selection #26128
Changes from 1 commit
bbeda3c
adc6f43
62fe599
34d38e4
e16754d
c7122f8
2ba499e
4f60243
b3dc3d9
205d78f
a41c75a
546f5fb
91f7a12
3ddc0ac
a451763
8674249
fc59d53
1beca3b
e945a5d
6df44f4
62747bf
e289581
dcae338
3d1dd2b
9c55c64
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,18 +29,24 @@ | |
import org.elasticsearch.common.util.set.Sets; | ||
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.node.ResponseCollectorService; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.OptionalDouble; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import static java.util.Collections.emptyMap; | ||
|
||
|
@@ -261,6 +267,142 @@ public ShardIterator activeInitializingShardsIt(int seed) { | |
return new PlainShardIterator(shardId, ordered); | ||
} | ||
|
||
/** | ||
* Returns an iterator over active and initializing shards, ordered by the adaptive replica | ||
* selection forumla. Making sure though that its random within the active shards of the same | ||
* (or missing) rank, and initializing shards are the last to iterate through. | ||
*/ | ||
public ShardIterator rankedActiveInitializingShardsIt(@Nullable ResponseCollectorService collector, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. down the road we really need to work on the number of objects being created in this process. It can totally be a followup but I think we can abstract it away quite nicely since it's all keyed by the node id and the set of nodes is static. We can use a bytesrefhash with parallel arrays in the future that also prevents all the boxing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I totally agree, it was even at the point where it was very elegantly implemented using streams, however the streams were too slow compared to their imperative counterparts, so it's definitely something I'd like to address in the future There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ++ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: the other impl is called |
||
@Nullable Map<String, Long> nodeSearchCounts) { | ||
final int seed = shuffler.nextSeed(); | ||
if (allInitializingShards.isEmpty()) { | ||
return new PlainShardIterator(shardId, rank(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts)); | ||
} | ||
|
||
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); | ||
List<ShardRouting> rankedActiveShards = rank(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts); | ||
ordered.addAll(rankedActiveShards); | ||
List<ShardRouting> rankedInitializingShards = rank(allInitializingShards, collector, nodeSearchCounts); | ||
ordered.addAll(rankedInitializingShards); | ||
return new PlainShardIterator(shardId, ordered); | ||
} | ||
|
||
private static Set<String> getAllNodeIds(final List<ShardRouting> shards) { | ||
final Set<String> nodeIds = new HashSet<>(); | ||
for (ShardRouting shard : shards) { | ||
nodeIds.add(shard.currentNodeId()); | ||
} | ||
return nodeIds; | ||
} | ||
|
||
private static Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> | ||
getNodeStats(final Set<String> nodeIds, final ResponseCollectorService collector) { | ||
|
||
final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = new HashMap<>(nodeIds.size()); | ||
for (String nodeId : nodeIds) { | ||
nodeStats.put(nodeId, collector.getNodeStatistics(nodeId)); | ||
} | ||
return nodeStats; | ||
} | ||
|
||
private static Map<String, Double> rankNodes(final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats, | ||
final Map<String, Long> nodeSearchCounts) { | ||
final Map<String, Double> nodeRanks = new HashMap<>(nodeStats.size()); | ||
for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) { | ||
entry.getValue().ifPresent(stats -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you unwrap the entry here too? |
||
nodeRanks.put(entry.getKey(), stats.rank(nodeSearchCounts.getOrDefault(entry.getKey(), 1L))); | ||
}); | ||
} | ||
return nodeRanks; | ||
} | ||
|
||
private static void adjustStats(final ResponseCollectorService collector, | ||
final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats, | ||
final String minNodeId, | ||
final ResponseCollectorService.ComputedNodeStats minStats) { | ||
if (minNodeId != null) { | ||
for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) { | ||
if (entry.getKey().equals(minNodeId) == false && entry.getValue().isPresent()) { | ||
final ResponseCollectorService.ComputedNodeStats stats = entry.getValue().get(); | ||
final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2; | ||
final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2; | ||
final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: the casts should not be necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They are required, without them you get There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh, I had not realized we stored those times as doubles |
||
collector.addNodeStatistics(stats.nodeId, updatedQueue, updatedResponse, updatedService); | ||
} | ||
} | ||
} | ||
} | ||
|
||
private static List<ShardRouting> rank(List<ShardRouting> shards, final ResponseCollectorService collector, | ||
final Map<String, Long> nodeSearchCounts) { | ||
if (collector == null || nodeSearchCounts == null || shards.size() <= 1) { | ||
return shards; | ||
} | ||
|
||
// Retrieve which nodes we can potentially send the query to | ||
final Set<String> nodeIds = getAllNodeIds(shards); | ||
final int nodeCount = nodeIds.size(); | ||
|
||
final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector); | ||
|
||
// Retrieve all the nodes the shards exist on | ||
final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts); | ||
|
||
String minNode = null; | ||
ResponseCollectorService.ComputedNodeStats minStats = null; | ||
// calculate the "winning" node and its stats (for adjusting other nodes later) | ||
for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) { | ||
if (entry.getValue().isPresent()) { | ||
ResponseCollectorService.ComputedNodeStats stats = entry.getValue().get(); | ||
double rank = stats.rank(nodeSearchCounts.getOrDefault(entry.getKey(), 1L)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd find it easier to read if you pulled entry.getKey and entry.getValue into their own variables with meaningful names |
||
if (minStats == null || rank < minStats.rank(nodeSearchCounts.getOrDefault(minStats.nodeId, 1L))) { | ||
minStats = stats; | ||
minNode = entry.getKey(); | ||
} | ||
} | ||
} | ||
|
||
// sort all shards based on the shard rank | ||
ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards); | ||
Collections.sort(sortedShards, new NodeRankComparator(nodeRanks)); | ||
|
||
// adjust the non-winner nodes' stats so they will get a chance to receive queries | ||
adjustStats(collector, nodeStats, minNode, minStats); | ||
|
||
return sortedShards; | ||
} | ||
|
||
private static class NodeRankComparator implements Comparator<ShardRouting> { | ||
private final Map<String, Double> nodeRanks; | ||
|
||
NodeRankComparator(Map<String, Double> nodeRanks) { | ||
this.nodeRanks = nodeRanks; | ||
} | ||
|
||
@Override | ||
public int compare(ShardRouting s1, ShardRouting s2) { | ||
if (s1.currentNodeId().equals(s2.currentNodeId())) { | ||
// these shards on the the same node | ||
return 0; | ||
} | ||
Double shard1rank = nodeRanks.get(s1.currentNodeId()); | ||
Double shard2rank = nodeRanks.get(s2.currentNodeId()); | ||
if (shard1rank != null && shard2rank != null) { | ||
if (shard1rank < shard2rank) { | ||
return -1; | ||
} else if (shard2rank < shard1rank) { | ||
return 1; | ||
} else { | ||
// Yahtzee! | ||
return 0; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or just |
||
} else { | ||
// One or both of the nodes don't have stats | ||
return 0; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm wondering that the fact that the order is not transitive could confuse sorting. For instance if you have s1 and s2 so that s1 < s2 and s3 which is null then s1 and s2 are both equal to s3 but not equal with each other. Maybe we should make nulls always compare less than non nulls so that the order is total? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's okay to keep the contract of treating situations where both nodes do not have stats as equal, I also expect it to be a very very tiny margin of requests since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually javadocs of Arrays.sort that that an IllegalArgumentException may be thrown if the comparator violates the Comparator contract and the Comparator javadocs say that it must implement a total ordering so I think it's important to make nulls compare consistently less than or greater than non-null values. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, it's not strictly nulls (since the nodes do exists, their There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doh they are nulls, I originally wrote it with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay I pushed 3d1dd2b for this |
||
} | ||
} | ||
|
||
/** | ||
* Returns true if no primaries are active or initializing for this shard | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,10 +28,12 @@ | |
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.component.AbstractComponent; | ||
import org.elasticsearch.common.settings.ClusterSettings; | ||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.index.IndexNotFoundException; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.index.shard.ShardNotFoundException; | ||
import org.elasticsearch.node.ResponseCollectorService; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
|
@@ -43,13 +45,24 @@ | |
|
||
public class OperationRouting extends AbstractComponent { | ||
|
||
public static final Setting<Boolean> USE_ADAPTIVE_REPLICA_SELECTION_SETTING = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this is false we should not collect any statistics in the SearchTransportService either no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should continue to collect the stats, especially since all of them are moving averages it's good to be able to turn ARS on and not have the numbers be wildly inaccurate. What do you think? I could go either way, though I think toggling the collection and and off is going to be more complex There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 to keep collecting |
||
Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", false, | ||
Setting.Property.Dynamic, Setting.Property.NodeScope); | ||
|
||
private String[] awarenessAttributes; | ||
private boolean useAdaptiveReplicaSelection; | ||
|
||
public OperationRouting(Settings settings, ClusterSettings clusterSettings) { | ||
super(settings); | ||
this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); | ||
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings); | ||
clusterSettings.addSettingsUpdateConsumer(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, | ||
this::setAwarenessAttributes); | ||
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); | ||
} | ||
|
||
private void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { | ||
this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection; | ||
} | ||
|
||
private void setAwarenessAttributes(String[] awarenessAttributes) { | ||
|
@@ -61,19 +74,33 @@ public ShardIterator indexShards(ClusterState clusterState, String index, String | |
} | ||
|
||
public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing, @Nullable String preference) { | ||
return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); | ||
return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, null, null); | ||
} | ||
|
||
public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) { | ||
final IndexShardRoutingTable indexShard = clusterState.getRoutingTable().shardRoutingTable(index, shardId); | ||
return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); | ||
return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, null, null); | ||
} | ||
|
||
public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState, | ||
String[] concreteIndices, | ||
@Nullable Map<String, Set<String>> routing, | ||
@Nullable String preference) { | ||
return searchShards(clusterState, concreteIndices, routing, preference, null, null); | ||
} | ||
|
||
public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map<String, Set<String>> routing, @Nullable String preference) { | ||
|
||
public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState, | ||
String[] concreteIndices, | ||
@Nullable Map<String, Set<String>> routing, | ||
@Nullable String preference, | ||
@Nullable ResponseCollectorService collectorService, | ||
@Nullable Map<String, Long> nodeCounts) { | ||
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing); | ||
final Set<ShardIterator> set = new HashSet<>(shards.size()); | ||
for (IndexShardRoutingTable shard : shards) { | ||
ShardIterator iterator = preferenceActiveShardIterator(shard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); | ||
ShardIterator iterator = preferenceActiveShardIterator(shard, | ||
clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, collectorService, nodeCounts); | ||
if (iterator != null) { | ||
set.add(iterator); | ||
} | ||
|
@@ -107,10 +134,17 @@ private Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterSt | |
return set; | ||
} | ||
|
||
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, DiscoveryNodes nodes, @Nullable String preference) { | ||
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, | ||
DiscoveryNodes nodes, @Nullable String preference, | ||
@Nullable ResponseCollectorService collectorService, | ||
@Nullable Map<String, Long> nodeCounts) { | ||
if (preference == null || preference.isEmpty()) { | ||
if (awarenessAttributes.length == 0) { | ||
return indexShard.activeInitializingShardsRandomIt(); | ||
if (useAdaptiveReplicaSelection) { | ||
return indexShard.rankedActiveInitializingShardsIt(collectorService, nodeCounts); | ||
} else { | ||
return indexShard.activeInitializingShardsRandomIt(); | ||
} | ||
} else { | ||
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); | ||
} | ||
|
@@ -141,7 +175,11 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index | |
// no more preference | ||
if (index == -1 || index == preference.length() - 1) { | ||
if (awarenessAttributes.length == 0) { | ||
return indexShard.activeInitializingShardsRandomIt(); | ||
if (useAdaptiveReplicaSelection) { | ||
return indexShard.rankedActiveInitializingShardsIt(collectorService, nodeCounts); | ||
} else { | ||
return indexShard.activeInitializingShardsRandomIt(); | ||
} | ||
} else { | ||
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is puzzeling to me, why are you useing the
SearchAction.NAME
it's the action that acts as a coordinator and we don't necessarily run this from within the cluster so the counts are expected to be 0 for almost all nodes? I wonder if that should be the shard actions here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right! I think instead it should be the prefix of "indices:data/read/search" so that all of these match it.
I'll make that change and then re-run the benchmarks to see if it affects things, it may mean we can lower
b=4
back down tob=3
, good catch!