From bbeda3c59f133e9ccabaea5cebebc4d0a5fa6df8 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 29 Jun 2017 17:07:37 -0600 Subject: [PATCH 01/22] Implement adaptive replica selection This implements the selection algorithm described in the C3 paper for determining which copy of the data a query should be routed to. By using the service time EWMA, response time EWMA, and queue size EWMA we calculate the score of a node by piggybacking these metrics with each search request. Since Elasticsearch lacks the "broadcast to every copy" behavior that Cassandra has (as mentioned in the C3 paper) to update metrics after a node has been highly weighted, this implementation adjusts a node's response stats using the average of the its own and the "best" node's metrics. This is so that a long GC or other activity that may cause a node's rank to increase dramatically does not permanently keep a node from having requests routed to it, instead it will eventually lower its score back to the realm where it is a potential candidate for new queries. This feature is off by default and can be turned on with the dynamic setting `cluster.routing.use_adaptive_replica_selection`. Relates to #24915, however instead of `b=3` I used `b=4` (after benchmarking) --- .../search/SearchExecutionStatsCollector.java | 2 +- .../action/search/SearchTransportService.java | 8 + .../action/search/TransportSearchAction.java | 4 +- .../routing/IndexShardRoutingTable.java | 142 ++++++++++++++++++ .../cluster/routing/OperationRouting.java | 52 ++++++- .../common/settings/ClusterSettings.java | 4 +- .../common/util/concurrent/EsExecutors.java | 4 - .../QueueResizingEsThreadPoolExecutor.java | 4 +- .../node/ResponseCollectorService.java | 73 +++++++-- .../search/query/QueryPhase.java | 2 +- .../transport/TransportService.java | 16 ++ ...ueueResizingEsThreadPoolExecutorTests.java | 12 +- .../node/ResponseCollectorServiceTests.java | 4 +- .../search/query/QueryPhaseTests.java | 2 +- 14 files changed, 292 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java index 72c3d5eaab6d2..0ffad5aa4065b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java @@ -61,7 +61,7 @@ public void onResponse(SearchPhaseResult response) { final int queueSize = queryResult.nodeQueueSize(); final long responseDuration = System.nanoTime() - startNanos; // EWMA/queue size may be -1 if the query node doesn't support capturing it - if (serviceTimeEWMA > 0 && queueSize > 0) { + if (serviceTimeEWMA > 0 && queueSize >= 0) { collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index f4816bb2a21f4..ab23286c4f775 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -57,6 +57,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Map; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -193,6 +194,13 @@ public RemoteClusterService getRemoteClusterService() { return transportService.getRemoteClusterService(); } + /** + * Return a map of nodeId to pending number of requests for the given action name + */ + public Map getPendingRequests(final String actionName) { + return transportService.getPendingRequests(actionName); + } + static class ScrollFreeContextRequest extends TransportRequest { private long id; diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index e1ae2a5a66866..7a993b519bab8 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterState; @@ -284,8 +285,9 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea for (int i = 0; i < indices.length; i++) { concreteIndices[i] = indices[i].getName(); } + Map nodeSearchCounts = searchTransportService.getPendingRequests(SearchAction.NAME); GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, - concreteIndices, routingMap, searchRequest.preference()); + concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 8ed06736b6bb3..653aef69dda3a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -29,18 +29,24 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.ResponseCollectorService; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; +import java.util.OptionalDouble; import java.util.Set; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -261,6 +267,142 @@ public ShardIterator activeInitializingShardsIt(int seed) { return new PlainShardIterator(shardId, ordered); } + /** + * Returns an iterator over active and initializing shards, ordered by the adaptive replica + * selection forumla. Making sure though that its random within the active shards of the same + * (or missing) rank, and initializing shards are the last to iterate through. + */ + public ShardIterator rankedActiveInitializingShardsIt(@Nullable ResponseCollectorService collector, + @Nullable Map nodeSearchCounts) { + final int seed = shuffler.nextSeed(); + if (allInitializingShards.isEmpty()) { + return new PlainShardIterator(shardId, rank(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts)); + } + + ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); + List rankedActiveShards = rank(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts); + ordered.addAll(rankedActiveShards); + List rankedInitializingShards = rank(allInitializingShards, collector, nodeSearchCounts); + ordered.addAll(rankedInitializingShards); + return new PlainShardIterator(shardId, ordered); + } + + private static Set getAllNodeIds(final List shards) { + final Set nodeIds = new HashSet<>(); + for (ShardRouting shard : shards) { + nodeIds.add(shard.currentNodeId()); + } + return nodeIds; + } + + private static Map> + getNodeStats(final Set nodeIds, final ResponseCollectorService collector) { + + final Map> nodeStats = new HashMap<>(nodeIds.size()); + for (String nodeId : nodeIds) { + nodeStats.put(nodeId, collector.getNodeStatistics(nodeId)); + } + return nodeStats; + } + + private static Map rankNodes(final Map> nodeStats, + final Map nodeSearchCounts) { + final Map nodeRanks = new HashMap<>(nodeStats.size()); + for (Map.Entry> entry : nodeStats.entrySet()) { + entry.getValue().ifPresent(stats -> { + nodeRanks.put(entry.getKey(), stats.rank(nodeSearchCounts.getOrDefault(entry.getKey(), 1L))); + }); + } + return nodeRanks; + } + + private static void adjustStats(final ResponseCollectorService collector, + final Map> nodeStats, + final String minNodeId, + final ResponseCollectorService.ComputedNodeStats minStats) { + if (minNodeId != null) { + for (Map.Entry> entry : nodeStats.entrySet()) { + if (entry.getKey().equals(minNodeId) == false && entry.getValue().isPresent()) { + final ResponseCollectorService.ComputedNodeStats stats = entry.getValue().get(); + final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2; + final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2; + final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2; + collector.addNodeStatistics(stats.nodeId, updatedQueue, updatedResponse, updatedService); + } + } + } + } + + private static List rank(List shards, final ResponseCollectorService collector, + final Map nodeSearchCounts) { + if (collector == null || nodeSearchCounts == null || shards.size() <= 1) { + return shards; + } + + // Retrieve which nodes we can potentially send the query to + final Set nodeIds = getAllNodeIds(shards); + final int nodeCount = nodeIds.size(); + + final Map> nodeStats = getNodeStats(nodeIds, collector); + + // Retrieve all the nodes the shards exist on + final Map nodeRanks = rankNodes(nodeStats, nodeSearchCounts); + + String minNode = null; + ResponseCollectorService.ComputedNodeStats minStats = null; + // calculate the "winning" node and its stats (for adjusting other nodes later) + for (Map.Entry> entry : nodeStats.entrySet()) { + if (entry.getValue().isPresent()) { + ResponseCollectorService.ComputedNodeStats stats = entry.getValue().get(); + double rank = stats.rank(nodeSearchCounts.getOrDefault(entry.getKey(), 1L)); + if (minStats == null || rank < minStats.rank(nodeSearchCounts.getOrDefault(minStats.nodeId, 1L))) { + minStats = stats; + minNode = entry.getKey(); + } + } + } + + // sort all shards based on the shard rank + ArrayList sortedShards = new ArrayList<>(shards); + Collections.sort(sortedShards, new NodeRankComparator(nodeRanks)); + + // adjust the non-winner nodes' stats so they will get a chance to receive queries + adjustStats(collector, nodeStats, minNode, minStats); + + return sortedShards; + } + + private static class NodeRankComparator implements Comparator { + private final Map nodeRanks; + + NodeRankComparator(Map nodeRanks) { + this.nodeRanks = nodeRanks; + } + + @Override + public int compare(ShardRouting s1, ShardRouting s2) { + if (s1.currentNodeId().equals(s2.currentNodeId())) { + // these shards on the the same node + return 0; + } + Double shard1rank = nodeRanks.get(s1.currentNodeId()); + Double shard2rank = nodeRanks.get(s2.currentNodeId()); + if (shard1rank != null && shard2rank != null) { + if (shard1rank < shard2rank) { + return -1; + } else if (shard2rank < shard1rank) { + return 1; + } else { + // Yahtzee! + return 0; + } + } else { + // One or both of the nodes don't have stats + return 0; + } + } + } + /** * Returns true if no primaries are active or initializing for this shard */ diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 8a88ee1751a14..0d67394c09974 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -28,10 +28,12 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.node.ResponseCollectorService; import java.util.ArrayList; import java.util.Arrays; @@ -43,13 +45,24 @@ public class OperationRouting extends AbstractComponent { + public static final Setting USE_ADAPTIVE_REPLICA_SELECTION_SETTING = + Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", false, + Setting.Property.Dynamic, Setting.Property.NodeScope); + private String[] awarenessAttributes; + private boolean useAdaptiveReplicaSelection; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { super(settings); this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); + this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes); + clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); + } + + private void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { + this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection; } private void setAwarenessAttributes(String[] awarenessAttributes) { @@ -61,19 +74,33 @@ public ShardIterator indexShards(ClusterState clusterState, String index, String } public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing, @Nullable String preference) { - return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); + return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, null, null); } public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) { final IndexShardRoutingTable indexShard = clusterState.getRoutingTable().shardRoutingTable(index, shardId); - return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); + return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, null, null); + } + + public GroupShardsIterator searchShards(ClusterState clusterState, + String[] concreteIndices, + @Nullable Map> routing, + @Nullable String preference) { + return searchShards(clusterState, concreteIndices, routing, preference, null, null); } - public GroupShardsIterator searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map> routing, @Nullable String preference) { + + public GroupShardsIterator searchShards(ClusterState clusterState, + String[] concreteIndices, + @Nullable Map> routing, + @Nullable String preference, + @Nullable ResponseCollectorService collectorService, + @Nullable Map nodeCounts) { final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); final Set set = new HashSet<>(shards.size()); for (IndexShardRoutingTable shard : shards) { - ShardIterator iterator = preferenceActiveShardIterator(shard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); + ShardIterator iterator = preferenceActiveShardIterator(shard, + clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, collectorService, nodeCounts); if (iterator != null) { set.add(iterator); } @@ -107,10 +134,17 @@ private Set computeTargetedShards(ClusterState clusterSt return set; } - private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, DiscoveryNodes nodes, @Nullable String preference) { + private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, + DiscoveryNodes nodes, @Nullable String preference, + @Nullable ResponseCollectorService collectorService, + @Nullable Map nodeCounts) { if (preference == null || preference.isEmpty()) { if (awarenessAttributes.length == 0) { - return indexShard.activeInitializingShardsRandomIt(); + if (useAdaptiveReplicaSelection) { + return indexShard.rankedActiveInitializingShardsIt(collectorService, nodeCounts); + } else { + return indexShard.activeInitializingShardsRandomIt(); + } } else { return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); } @@ -141,7 +175,11 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index // no more preference if (index == -1 || index == preference.length() - 1) { if (awarenessAttributes.length == 0) { - return indexShard.activeInitializingShardsRandomIt(); + if (useAdaptiveReplicaSelection) { + return indexShard.rankedActiveInitializingShardsIt(collectorService, nodeCounts); + } else { + return indexShard.activeInitializingShardsRandomIt(); + } } else { return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); } diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index e0665ae2c8223..048d05dddfc30 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; @@ -402,6 +403,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING, ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING, FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, - Node.BREAKER_TYPE_KEY + Node.BREAKER_TYPE_KEY, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING ))); } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index b37a6e14f02b3..45d9a208284f5 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -92,10 +92,6 @@ public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapa public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize, int maxQueueSize, int frameSize, TimeValue targetedResponseTime, ThreadFactory threadFactory, ThreadContext contextHolder) { - if (initialQueueCapacity == minQueueSize && initialQueueCapacity == maxQueueSize) { - return newFixed(name, size, initialQueueCapacity, threadFactory, contextHolder); - } - if (initialQueueCapacity <= 0) { throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " + initialQueueCapacity); diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java index 1f694d73fa709..2d1be51824efe 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java @@ -79,9 +79,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto this.minQueueSize = minQueueSize; this.maxQueueSize = maxQueueSize; this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); - // We choose to start the EWMA with the targeted response time, reasoning that it is a - // better start point for a realistic task execution time than starting at 0 - this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, targetedResponseTimeNanos); + this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size", name, QUEUE_ADJUSTMENT_AMOUNT); } diff --git a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java index 1afbd3b299755..26957b21f79bc 100644 --- a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java +++ b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -31,7 +31,9 @@ import java.util.Collections; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -65,13 +67,11 @@ void removeNode(String nodeId) { } public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos) { - NodeStatistics nodeStats = nodeIdToStats.get(nodeId); nodeIdToStats.compute(nodeId, (id, ns) -> { if (ns == null) { ExponentiallyWeightedMovingAverage queueEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, queueSize); ExponentiallyWeightedMovingAverage responseEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, responseTimeNanos); - NodeStatistics newStats = new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos); - return newStats; + return new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos); } else { ns.queueSize.addValue((double) queueSize); ns.responseTime.addValue((double) responseTimeNanos); @@ -82,39 +82,92 @@ public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNan } public Map getAllNodeStatistics() { + final int clientNum = nodeIdToStats.size(); // Transform the mutable object internally used for accounting into the computed version Map nodeStats = new HashMap<>(nodeIdToStats.size()); nodeIdToStats.forEach((k, v) -> { - nodeStats.put(k, new ComputedNodeStats(v)); + nodeStats.put(k, new ComputedNodeStats(clientNum, v)); }); return nodeStats; } + /** + * Optionally return a {@code NodeStatistics} for the given nodeid, if + * response information exists for the given node. Returns an empty + * {@code Optional} if the node was not found. + */ + public Optional getNodeStatistics(final String nodeId) { + final int clientNum = nodeIdToStats.size(); + return Optional.ofNullable(nodeIdToStats.get(nodeId)).map(ns -> new ComputedNodeStats(clientNum, ns)); + } + /** * Struct-like class encapsulating a point-in-time snapshot of a particular * node's statistics. This includes the EWMA of queue size, response time, * and service time. */ public static class ComputedNodeStats { + // We store timestamps with nanosecond precision, however, the + // formula specifies milliseconds, therefore we need to convert + // the values so the times don't unduely weight the formula + private final double FACTOR = 1000000.0; + private final int clientNum; + + private double cachedRank = 0; + public final String nodeId; - public final double queueSize; + public final int queueSize; public final double responseTime; public final double serviceTime; - ComputedNodeStats(NodeStatistics nodeStats) { + ComputedNodeStats(int clientNum, NodeStatistics nodeStats) { + this.clientNum = clientNum; this.nodeId = nodeStats.nodeId; - this.queueSize = nodeStats.queueSize.getAverage(); + this.queueSize = (int) nodeStats.queueSize.getAverage(); this.responseTime = nodeStats.responseTime.getAverage(); this.serviceTime = nodeStats.serviceTime; } + private double innerRank(long outstandingRequests) { + // this is a placeholder value, the concurrency compensation is + // defined as the number of outstanding requests from the client + // to the node times the number of clients in the system + double concurrencyCompensation = outstandingRequests * clientNum; + + // Cubic queue adjustment. The paper goes for 3 but we need queue to + // weigh heavier, so this increases to 4. + int queueAdjustmentFactor = 4; + + // EWMA of queue size + double qBar = queueSize; + double qHatS = 1 + concurrencyCompensation + qBar; + + // EWMA of response time + double rS = responseTime / FACTOR; + // EWMA of service time + double muBarS = serviceTime / FACTOR; + + // The final formula + double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS); + return rank; + } + + public double rank(long outstandingRequests) { + if (cachedRank == 0) { + cachedRank = innerRank(outstandingRequests); + } + return cachedRank; + } + @Override public String toString() { StringBuilder sb = new StringBuilder("ComputedNodeStats["); sb.append(nodeId).append("]("); - sb.append("queue: ").append(queueSize); - sb.append(", response time: ").append(responseTime); - sb.append(", service time: ").append(serviceTime); + sb.append("nodes: ").append(clientNum); + sb.append(", queue: ").append(queueSize); + sb.append(", response time: ").append(String.format(Locale.ROOT, "%.1f", responseTime)); + sb.append(", service time: ").append(String.format(Locale.ROOT, "%.1f", serviceTime)); + sb.append(", rank: ").append(String.format(Locale.ROOT, "%.1f", rank(1))); sb.append(")"); return sb.toString(); } diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 9fdaae098b8de..500612974c851 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -283,7 +283,7 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher ctx.postProcess(result, shouldCollect); } EsThreadPoolExecutor executor = (EsThreadPoolExecutor) - searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);; + searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); if (executor instanceof QueueResizingEsThreadPoolExecutor) { QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 13034355366cf..b0517f3c0b389 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -56,6 +56,7 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -560,6 +561,21 @@ public void sendChildRequest(final Transport.Conne } + /** + * Return a map of nodeId to pending number of requests for the given action name + */ + public Map getPendingRequests(final String actionName) { + Map nodeCounts = new HashMap<>(); + for (Map.Entry entry : clientHandlers.entrySet()) { + RequestHolder reqHolder = entry.getValue(); + if (actionName.equals(reqHolder.action())) { + String nodeId = reqHolder.connection().getNode().getId(); + nodeCounts.put(nodeId, nodeCounts.getOrDefault(nodeId, 0L) + 1); + } + } + return nodeCounts; + } + private void sendRequestInternal(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java index 5365e1bb90931..125cb572ea54d 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java @@ -198,26 +198,26 @@ public void testExecutionEWMACalculation() throws Exception { executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(1000000L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(0L)); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(700030L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(30L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(490050L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(51L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(343065L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(65L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(240175L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(75L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(168153L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(83L)); }); executor.shutdown(); diff --git a/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java index 2ff3d15624707..767dae2de1e37 100644 --- a/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java +++ b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java @@ -67,7 +67,7 @@ public void testNodeStats() throws Exception { collector.addNodeStatistics("node1", 1, 100, 10); Map nodeStats = collector.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); - assertThat(nodeStats.get("node1").queueSize, equalTo(1.0)); + assertThat(nodeStats.get("node1").queueSize, equalTo(1)); assertThat(nodeStats.get("node1").responseTime, equalTo(100.0)); assertThat(nodeStats.get("node1").serviceTime, equalTo(10.0)); } @@ -113,7 +113,7 @@ public void testConcurrentAddingAndRemoving() throws Exception { logger.info("--> got stats: {}", nodeStats); for (String nodeId : nodes) { if (nodeStats.containsKey(nodeId)) { - assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0.0)); + assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0)); assertThat(nodeStats.get(nodeId).responseTime, greaterThan(0.0)); assertThat(nodeStats.get(nodeId).serviceTime, greaterThan(0.0)); } diff --git a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index feca42e5495b3..4128c4a6aa68d 100644 --- a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -226,7 +226,7 @@ public void testQueryCapturesThreadPoolStats() throws Exception { QueryPhase.execute(context, contextSearcher, checkCancelled -> {}, null); QuerySearchResult results = context.queryResult(); - assertThat(results.serviceTimeEWMA(), greaterThan(0L)); + assertThat(results.serviceTimeEWMA(), greaterThanOrEqualTo(0L)); assertThat(results.nodeQueueSize(), greaterThanOrEqualTo(0)); reader.close(); dir.close(); From adc6f43f1bbb2e9d7efad9546a059f3512104544 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 10 Aug 2017 16:19:30 -0600 Subject: [PATCH 02/22] Randomly use adaptive replica selection for internal test cluster --- .../main/java/org/elasticsearch/test/InternalTestCluster.java | 1 + 1 file changed, 1 insertion(+) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 458f77e3ed324..f196374c6236d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -336,6 +336,7 @@ public InternalTestCluster(long clusterSeed, Path baseDir, builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b"); // Some tests make use of scripting quite a bit, so increase the limit for integration tests builder.put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000); + builder.put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), random.nextBoolean()); if (TEST_NIGHTLY) { builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomNumbers.randomIntBetween(random, 5, 10)); builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomNumbers.randomIntBetween(random, 5, 10)); From 62fe5998501eafd04eabbd91f8161349f714d858 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Fri, 11 Aug 2017 10:45:29 -0600 Subject: [PATCH 03/22] Use an action name *prefix* for retrieving pending requests --- .../elasticsearch/action/search/SearchTransportService.java | 6 +++--- .../org/elasticsearch/node/ResponseCollectorService.java | 6 +++--- .../java/org/elasticsearch/transport/TransportService.java | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index ab23286c4f775..8ac8593a51d9d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -195,10 +195,10 @@ public RemoteClusterService getRemoteClusterService() { } /** - * Return a map of nodeId to pending number of requests for the given action name + * Return a map of nodeId to pending number of requests for the given action name prefix */ - public Map getPendingRequests(final String actionName) { - return transportService.getPendingRequests(actionName); + public Map getPendingRequests(final String actionNamePrefix) { + return transportService.getPendingRequests(actionNamePrefix); } static class ScrollFreeContextRequest extends TransportRequest { diff --git a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java index 26957b21f79bc..0766bc36dd237 100644 --- a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java +++ b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -134,9 +134,9 @@ private double innerRank(long outstandingRequests) { // to the node times the number of clients in the system double concurrencyCompensation = outstandingRequests * clientNum; - // Cubic queue adjustment. The paper goes for 3 but we need queue to - // weigh heavier, so this increases to 4. - int queueAdjustmentFactor = 4; + // Cubic queue adjustment factor. The paper chose 3 though we could + // potentially make this configurable if desired. + int queueAdjustmentFactor = 3; // EWMA of queue size double qBar = queueSize; diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index b0517f3c0b389..325b67fb4676c 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -562,13 +562,13 @@ public void sendChildRequest(final Transport.Conne } /** - * Return a map of nodeId to pending number of requests for the given action name + * Return a map of nodeId to pending number of requests for the given action name prefix */ - public Map getPendingRequests(final String actionName) { + public Map getPendingRequests(final String actionNamePrefix) { Map nodeCounts = new HashMap<>(); for (Map.Entry entry : clientHandlers.entrySet()) { RequestHolder reqHolder = entry.getValue(); - if (actionName.equals(reqHolder.action())) { + if (reqHolder.action().startsWith(actionNamePrefix)) { String nodeId = reqHolder.connection().getNode().getId(); nodeCounts.put(nodeId, nodeCounts.getOrDefault(nodeId, 0L) + 1); } From 34d38e40b033ac986295627feca3050a6f4ed99c Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 14 Aug 2017 15:58:43 -0600 Subject: [PATCH 04/22] Add unit test for replica selection --- .../cluster/routing/OperationRouting.java | 2 +- .../routing/OperationRoutingTests.java | 88 +++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 0d67394c09974..528a1c1c9c4a4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -61,7 +61,7 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) { clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); } - private void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { + void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection; } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java index be7ebd4a4c298..498edee12f90a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java @@ -28,8 +28,10 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -490,4 +492,90 @@ public void testThatOnlyNodesSupportNodeIds() throws InterruptedException, IOExc } } + public void testAdaptiveReplicaSelection() throws Exception { + final int numIndices = 1; + final int numShards = 1; + final int numReplicas = 2; + final String[] indexNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indexNames[i] = "test" + i; + } + ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas); + final int numRepeatedSearches = 4; + OperationRouting opRouting = new OperationRouting(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + opRouting.setUseAdaptiveReplicaSelection(true); + List searchedShards = new ArrayList<>(numShards); + Set selectedNodes = new HashSet<>(numShards); + TestThreadPool threadPool = new TestThreadPool("testThatOnlyNodesSupportNodeIds"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + ResponseCollectorService collector = new ResponseCollectorService(Settings.EMPTY, clusterService); + Map outstandingRequests = new HashMap<>(); + GroupShardsIterator groupIterator = opRouting.searchShards(state, + indexNames, null, null, collector, outstandingRequests); + + assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); + + // Test that the shards use a round-robin pattern when there are no stats + assertThat(groupIterator.get(0).size(), equalTo(numReplicas + 1)); + ShardRouting firstChoice = groupIterator.get(0).nextOrNull(); + assertNotNull(firstChoice); + searchedShards.add(firstChoice); + selectedNodes.add(firstChoice.currentNodeId()); + + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + + assertThat(groupIterator.size(), equalTo(numIndices * numShards)); + ShardRouting secondChoice = groupIterator.get(0).nextOrNull(); + assertNotNull(secondChoice); + searchedShards.add(secondChoice); + selectedNodes.add(secondChoice.currentNodeId()); + + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + + assertThat(groupIterator.size(), equalTo(numIndices * numShards)); + ShardRouting thirdChoice = groupIterator.get(0).nextOrNull(); + assertNotNull(thirdChoice); + searchedShards.add(thirdChoice); + selectedNodes.add(thirdChoice.currentNodeId()); + + // All three shards should have been separate, because there are no stats yet so they're all ranked equally. + assertThat(searchedShards.size(), equalTo(3)); + + // Now let's start adding node metrics, since that will affect which node is chosen + collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); + collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(50).nanos()); + collector.addNodeStatistics("node_2", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(200).nanos()); + outstandingRequests.put("node_0", 1L); + outstandingRequests.put("node_1", 1L); + outstandingRequests.put("node_2", 1L); + + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); + // node 1 should be the lowest ranked node to start + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + // node 1 starts getting more loaded... + collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + shardChoice = groupIterator.get(0).nextOrNull(); + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + // and more loaded... + collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(200).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + shardChoice = groupIterator.get(0).nextOrNull(); + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + // and even more + collector.addNodeStatistics("node_1", 4, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + shardChoice = groupIterator.get(0).nextOrNull(); + // finally, node 2 is choosen instead + assertThat(shardChoice.currentNodeId(), equalTo("node_2")); + + IOUtils.close(clusterService); + terminate(threadPool); + } + } From e16754d9351aac5f8a6f8db753b2f63ce02a91ba Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 14 Aug 2017 17:32:46 -0600 Subject: [PATCH 05/22] don't use adaptive replica selection in SearchPreferenceIT --- .../search/preference/SearchPreferenceIT.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java b/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java index 9163ee572cfc2..6478446a1a254 100644 --- a/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java +++ b/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; @@ -49,6 +50,13 @@ @ESIntegTestCase.ClusterScope(minNumDataNodes = 2) public class SearchPreferenceIT extends ESIntegTestCase { + + @Override + public Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), false).build(); + } + // see #2896 public void testStopOneNodePreferenceWithRedState() throws InterruptedException, IOException { assertAcked(prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", cluster().numDataNodes()+2).put("index.number_of_replicas", 0))); From c7122f860a8abcc3af8b10c61703c0e4d7aff1be Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 21 Aug 2017 14:42:12 -0600 Subject: [PATCH 06/22] Track client connections in a SearchTransportService instead of TransportService --- .../action/search/SearchTransportService.java | 54 +++++++++++++++---- .../transport/TransportService.java | 15 ------ 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 8ac8593a51d9d..3ecd1804f5f9d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -50,6 +51,7 @@ import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TaskAwareTransportRequestHandler; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -57,6 +59,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collections; import java.util.Map; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -81,6 +84,7 @@ public class SearchTransportService extends AbstractComponent { private final TransportService transportService; private final BiFunction responseWrapper; + private final Map clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); public SearchTransportService(Settings settings, TransportService transportService, BiFunction responseWrapper) { @@ -132,7 +136,7 @@ public void sendClearAllScrollContexts(Transport.Connection connection, final Ac public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, DfsSearchResult::new)); + new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId())); } public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, @@ -144,25 +148,26 @@ public void sendExecuteQuery(Transport.Connection connection, final ShardSearchT final ActionListener handler = responseWrapper.apply(connection, listener); transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(handler, supplier)); + new ConnectionCountingHandler<>(handler, supplier, clientConnections, connection.getNode().getId())); } public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); + new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId())); } public void sendExecuteScrollQuery(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_SCROLL_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new)); + new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId())); } public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new)); + new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, + clientConnections, connection.getNode().getId())); } public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task, @@ -178,16 +183,17 @@ public void sendExecuteFetchScroll(Transport.Connection connection, final ShardF private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, action, request, task, - new ActionListenerResponseHandler<>(listener, FetchSearchResult::new)); + new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId())); } /** * Used by {@link TransportSearchAction} to send the expand queries (field collapsing). */ void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task, - final ActionListener listener) { - transportService.sendChildRequest(transportService.getConnection(transportService.getLocalNode()), MultiSearchAction.NAME, request, - task, new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new)); + final ActionListener listener) { + final Transport.Connection connection = transportService.getConnection(transportService.getLocalNode()); + transportService.sendChildRequest(connection, MultiSearchAction.NAME, request, task, + new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId())); } public RemoteClusterService getRemoteClusterService() { @@ -198,7 +204,7 @@ public RemoteClusterService getRemoteClusterService() { * Return a map of nodeId to pending number of requests for the given action name prefix */ public Map getPendingRequests(final String actionNamePrefix) { - return transportService.getPendingRequests(actionNamePrefix); + return Collections.unmodifiableMap(clientConnections); } static class ScrollFreeContextRequest extends TransportRequest { @@ -486,4 +492,32 @@ Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return transportService.getRemoteClusterService().getConnection(node, clusterAlias); } } + + final class ConnectionCountingHandler extends ActionListenerResponseHandler { + private final Map clientConnections; + private final String nodeId; + + ConnectionCountingHandler(final ActionListener listener, final Supplier responseSupplier, + final Map clientConnections, final String nodeId) { + super(listener, responseSupplier); + this.clientConnections = clientConnections; + this.nodeId = nodeId; + // Increment the number of connections for this node by one + clientConnections.compute(nodeId, (id, conns) -> conns == null ? 1 : conns + 1); + } + + @Override + public void handleResponse(Response response) { + super.handleResponse(response); + // Decrement the number of connections or remove it entirely if there are no more connections + clientConnections.computeIfPresent(nodeId, (id, conns) -> conns == 1 ? null : conns - 1); + } + + @Override + public void handleException(TransportException e) { + super.handleException(e); + // Decrement the number of connections or remove it entirely if there are no more connections + clientConnections.computeIfPresent(nodeId, (id, conns) -> conns == 1 ? null : conns - 1); + } + } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 325b67fb4676c..4bf06374004c1 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -561,21 +561,6 @@ public void sendChildRequest(final Transport.Conne } - /** - * Return a map of nodeId to pending number of requests for the given action name prefix - */ - public Map getPendingRequests(final String actionNamePrefix) { - Map nodeCounts = new HashMap<>(); - for (Map.Entry entry : clientHandlers.entrySet()) { - RequestHolder reqHolder = entry.getValue(); - if (reqHolder.action().startsWith(actionNamePrefix)) { - String nodeId = reqHolder.connection().getNode().getId(); - nodeCounts.put(nodeId, nodeCounts.getOrDefault(nodeId, 0L) + 1); - } - } - return nodeCounts; - } - private void sendRequestInternal(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, From 2ba499e09ebd3f5f4994a26efd94ada882b069ef Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 22 Aug 2017 11:06:05 -0600 Subject: [PATCH 07/22] Bind `entry` pieces in local variables --- .../cluster/routing/IndexShardRoutingTable.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 653aef69dda3a..c2ce811be8ac3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -352,12 +352,14 @@ private static List rank(List shards, final Response ResponseCollectorService.ComputedNodeStats minStats = null; // calculate the "winning" node and its stats (for adjusting other nodes later) for (Map.Entry> entry : nodeStats.entrySet()) { - if (entry.getValue().isPresent()) { - ResponseCollectorService.ComputedNodeStats stats = entry.getValue().get(); - double rank = stats.rank(nodeSearchCounts.getOrDefault(entry.getKey(), 1L)); + final String nodeId = entry.getKey(); + final Optional maybeNodeStats = entry.getValue(); + if (maybeNodeStats.isPresent()) { + ResponseCollectorService.ComputedNodeStats stats = maybeNodeStats.get(); + double rank = stats.rank(nodeSearchCounts.getOrDefault(nodeId, 1L)); if (minStats == null || rank < minStats.rank(nodeSearchCounts.getOrDefault(minStats.nodeId, 1L))) { minStats = stats; - minNode = entry.getKey(); + minNode = nodeId; } } } From 4f6024301ca6c31bb87221db29a0389d6f10748c Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 22 Aug 2017 11:13:54 -0600 Subject: [PATCH 08/22] Add javadoc link to C3 paper and javadocs for stat adjustments --- .../cluster/routing/IndexShardRoutingTable.java | 11 +++++++++++ .../elasticsearch/node/ResponseCollectorService.java | 4 ++++ 2 files changed, 15 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index c2ce811be8ac3..4a21af0114456 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -316,6 +316,17 @@ private static Map rankNodes(final Map> nodeStats, final String minNodeId, diff --git a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java index 0766bc36dd237..3d8f7a5d791d6 100644 --- a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java +++ b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -128,6 +128,10 @@ public static class ComputedNodeStats { this.serviceTime = nodeStats.serviceTime; } + /** + * Rank this copy of the data, according to the adaptive replica selection formula from the C3 paper + * {@link https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf} + */ private double innerRank(long outstandingRequests) { // this is a placeholder value, the concurrency compensation is // defined as the number of outstanding requests from the client From b3dc3d92fba425b8c8249eb7db0cf44de46685bc Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 22 Aug 2017 11:15:53 -0600 Subject: [PATCH 09/22] Bind entry's key and value to local variables --- .../cluster/routing/IndexShardRoutingTable.java | 8 +++++--- .../org/elasticsearch/node/ResponseCollectorService.java | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 4a21af0114456..1e2b12c48593b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -333,12 +333,14 @@ private static void adjustStats(final ResponseCollectorService collector, final ResponseCollectorService.ComputedNodeStats minStats) { if (minNodeId != null) { for (Map.Entry> entry : nodeStats.entrySet()) { - if (entry.getKey().equals(minNodeId) == false && entry.getValue().isPresent()) { - final ResponseCollectorService.ComputedNodeStats stats = entry.getValue().get(); + final String nodeId = entry.getKey(); + final Optional maybeStats = entry.getValue(); + if (nodeId.equals(minNodeId) == false && maybeStats.isPresent()) { + final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get(); final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2; final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2; final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2; - collector.addNodeStatistics(stats.nodeId, updatedQueue, updatedResponse, updatedService); + collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService); } } } diff --git a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java index 3d8f7a5d791d6..6fea0e2e1c0a2 100644 --- a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java +++ b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -130,7 +130,7 @@ public static class ComputedNodeStats { /** * Rank this copy of the data, according to the adaptive replica selection formula from the C3 paper - * {@link https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf} + * https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf */ private double innerRank(long outstandingRequests) { // this is a placeholder value, the concurrency compensation is From a41c75a3a7dcdf9ed2a9899f7170a2b8691ea211 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 22 Aug 2017 15:51:01 -0600 Subject: [PATCH 10/22] Remove unneeded actionNamePrefix parameter --- .../elasticsearch/action/search/SearchTransportService.java | 4 ++-- .../elasticsearch/action/search/TransportSearchAction.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 3ecd1804f5f9d..1f8e24810b789 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -201,9 +201,9 @@ public RemoteClusterService getRemoteClusterService() { } /** - * Return a map of nodeId to pending number of requests for the given action name prefix + * Return a map of nodeId to pending number of search requests */ - public Map getPendingRequests(final String actionNamePrefix) { + public Map getPendingSearchRequests() { return Collections.unmodifiableMap(clientConnections); } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 7a993b519bab8..edc68e1e55c1a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; -import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterState; @@ -285,7 +284,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea for (int i = 0; i < indices.length; i++) { concreteIndices[i] = indices[i].getName(); } - Map nodeSearchCounts = searchTransportService.getPendingRequests(SearchAction.NAME); + Map nodeSearchCounts = searchTransportService.getPendingSearchRequests(); GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices, From 546f5fb2eb30c24cea21666fb306c54e38299096 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 22 Aug 2017 15:52:08 -0600 Subject: [PATCH 11/22] Use conns.longValue() instead of cached Long --- .../elasticsearch/action/search/SearchTransportService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 1f8e24810b789..58c2d39014f6f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -510,14 +510,14 @@ final class ConnectionCountingHandler extend public void handleResponse(Response response) { super.handleResponse(response); // Decrement the number of connections or remove it entirely if there are no more connections - clientConnections.computeIfPresent(nodeId, (id, conns) -> conns == 1 ? null : conns - 1); + clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); } @Override public void handleException(TransportException e) { super.handleException(e); // Decrement the number of connections or remove it entirely if there are no more connections - clientConnections.computeIfPresent(nodeId, (id, conns) -> conns == 1 ? null : conns - 1); + clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); } } } From 91f7a126f24b7c9dc324f125ce5e28e21949885e Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 22 Aug 2017 15:52:41 -0600 Subject: [PATCH 12/22] Add comments about removing entries from the map --- .../org/elasticsearch/action/search/SearchTransportService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 58c2d39014f6f..4d2fe4668dbf6 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -510,6 +510,7 @@ final class ConnectionCountingHandler extend public void handleResponse(Response response) { super.handleResponse(response); // Decrement the number of connections or remove it entirely if there are no more connections + // We need to remove the entry here so we don't leak when nodes go away forever clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); } @@ -517,6 +518,7 @@ public void handleResponse(Response response) { public void handleException(TransportException e) { super.handleException(e); // Decrement the number of connections or remove it entirely if there are no more connections + // We need to remove the entry here so we don't leak when nodes go away forever clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); } } From 3ddc0acfecfbba75ab78ef018b70d5c2c4adf116 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 22 Aug 2017 15:54:55 -0600 Subject: [PATCH 13/22] Pull out bindings for `entry` in IndexShardRoutingTable --- .../cluster/routing/IndexShardRoutingTable.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 1e2b12c48593b..0f40f9d5228b7 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -309,8 +309,10 @@ private static Map rankNodes(final Map nodeSearchCounts) { final Map nodeRanks = new HashMap<>(nodeStats.size()); for (Map.Entry> entry : nodeStats.entrySet()) { - entry.getValue().ifPresent(stats -> { - nodeRanks.put(entry.getKey(), stats.rank(nodeSearchCounts.getOrDefault(entry.getKey(), 1L))); + Optional maybeStats = entry.getValue(); + maybeStats.ifPresent(stats -> { + final String nodeId = entry.getKey(); + nodeRanks.put(nodeId, stats.rank(nodeSearchCounts.getOrDefault(nodeId, 1L))); }); } return nodeRanks; From a451763713fa228de48b0097d762acd7b3d6046a Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 22 Aug 2017 15:58:31 -0600 Subject: [PATCH 14/22] Use .compareTo instead of manually comparing --- .../cluster/routing/IndexShardRoutingTable.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 0f40f9d5228b7..aa80676005c2c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -405,16 +405,9 @@ public int compare(ShardRouting s1, ShardRouting s2) { Double shard1rank = nodeRanks.get(s1.currentNodeId()); Double shard2rank = nodeRanks.get(s2.currentNodeId()); if (shard1rank != null && shard2rank != null) { - if (shard1rank < shard2rank) { - return -1; - } else if (shard2rank < shard1rank) { - return 1; - } else { - // Yahtzee! - return 0; - } + return shard1rank.compareTo(shard2rank); } else { - // One or both of the nodes don't have stats + // One or both of the nodes don't have stats, treat them as equal return 0; } } From 867424942dd246be4edd8a9805137d52dfb593e0 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 23 Aug 2017 12:14:41 -0600 Subject: [PATCH 15/22] add assert for connections not being null and gte to 1 --- .../action/search/SearchTransportService.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 4d2fe4668dbf6..696750629b40f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -511,6 +511,7 @@ public void handleResponse(Response response) { super.handleResponse(response); // Decrement the number of connections or remove it entirely if there are no more connections // We need to remove the entry here so we don't leak when nodes go away forever + assert assertNodePresent(); clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); } @@ -519,7 +520,19 @@ public void handleException(TransportException e) { super.handleException(e); // Decrement the number of connections or remove it entirely if there are no more connections // We need to remove the entry here so we don't leak when nodes go away forever + assert assertNodePresent(); clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); } + + private boolean assertNodePresent() { + clientConnections.compute(nodeId, (id, conns) -> { + assert conns != null : "number of connections for " + id + " is null, but should be an integer"; + assert conns >= 1 : "number of connections for " + id + " should be >= 1 but was " + conns; + return conns; + }); + // Always return true, there is additional asserting here, the boolean is just so this + // can be skipped when assertions are not enabled + return true; + } } } From fc59d532f40305de592ae0404d59cfc3c9e8bae3 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 23 Aug 2017 14:33:47 -0600 Subject: [PATCH 16/22] Copy map for pending search connections instead of "live" map --- .../elasticsearch/action/search/SearchTransportService.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 696750629b40f..7abf7aaceff45 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -60,6 +60,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -201,10 +202,11 @@ public RemoteClusterService getRemoteClusterService() { } /** - * Return a map of nodeId to pending number of search requests + * Return a map of nodeId to pending number of search requests. + * This is a snapshot of the current pending search and not a live map. */ public Map getPendingSearchRequests() { - return Collections.unmodifiableMap(clientConnections); + return new HashMap<>(clientConnections); } static class ScrollFreeContextRequest extends TransportRequest { From e945a5dc216be60ae0605cdfe2fa65979d73ca28 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 29 Aug 2017 15:15:52 -0600 Subject: [PATCH 17/22] Increase the number of pending search requests used for calculating rank when chosen When a node gets chosen, this increases the number of search counts for the winning node so that it will not be as likely to be chosen again for non-concurrent search requests. --- .../cluster/routing/IndexShardRoutingTable.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index aa80676005c2c..76c4632eeef64 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -385,6 +385,13 @@ private static List rank(List shards, final Response // adjust the non-winner nodes' stats so they will get a chance to receive queries adjustStats(collector, nodeStats, minNode, minStats); + if (minNode != null) { + // Increase the number of searches for the "winning" node by one. + // Note that this doesn't actually affect the "real" counts, instead + // it only affects the captured node search counts, which is + // captured once for each query in TransportSearchAction + nodeSearchCounts.compute(minNode, (id, conns) -> conns == null ? 1 : conns + 1); + } return sortedShards; } From 6df44f48feb4f2ff0f20a02783c29bf1c6357f7a Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 30 Aug 2017 08:39:41 -0600 Subject: [PATCH 18/22] Remove unused HashMap import --- .../main/java/org/elasticsearch/transport/TransportService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 670907f6ebb05..a68e319bb2c11 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -58,7 +58,6 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; From 62747bf0dc7fff235c90d268ad9b6a2883cf6cb7 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 30 Aug 2017 08:59:56 -0600 Subject: [PATCH 19/22] Rename rank -> rankShardsAndUpdateStats --- .../cluster/routing/IndexShardRoutingTable.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 76c4632eeef64..d89a6641408e3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -276,13 +276,16 @@ public ShardIterator rankedActiveInitializingShardsIt(@Nullable ResponseCollecto @Nullable Map nodeSearchCounts) { final int seed = shuffler.nextSeed(); if (allInitializingShards.isEmpty()) { - return new PlainShardIterator(shardId, rank(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts)); + return new PlainShardIterator(shardId, + rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts)); } ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); - List rankedActiveShards = rank(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts); + List rankedActiveShards = + rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts); ordered.addAll(rankedActiveShards); - List rankedInitializingShards = rank(allInitializingShards, collector, nodeSearchCounts); + List rankedInitializingShards = + rankShardsAndUpdateStats(allInitializingShards, collector, nodeSearchCounts); ordered.addAll(rankedInitializingShards); return new PlainShardIterator(shardId, ordered); } @@ -348,8 +351,8 @@ private static void adjustStats(final ResponseCollectorService collector, } } - private static List rank(List shards, final ResponseCollectorService collector, - final Map nodeSearchCounts) { + private static List rankShardsAndUpdateStats(List shards, final ResponseCollectorService collector, + final Map nodeSearchCounts) { if (collector == null || nodeSearchCounts == null || shards.size() <= 1) { return shards; } From e289581f78dfdb80c2551287530670a0c68f1a5a Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 30 Aug 2017 09:12:07 -0600 Subject: [PATCH 20/22] Rename rankedActiveInitializingShardsIt -> activeInitializingShardsRankedIt --- .../elasticsearch/cluster/routing/IndexShardRoutingTable.java | 2 +- .../org/elasticsearch/cluster/routing/OperationRouting.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index d89a6641408e3..1f68af1f01aa1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -272,7 +272,7 @@ public ShardIterator activeInitializingShardsIt(int seed) { * selection forumla. Making sure though that its random within the active shards of the same * (or missing) rank, and initializing shards are the last to iterate through. */ - public ShardIterator rankedActiveInitializingShardsIt(@Nullable ResponseCollectorService collector, + public ShardIterator activeInitializingShardsRankedIt(@Nullable ResponseCollectorService collector, @Nullable Map nodeSearchCounts) { final int seed = shuffler.nextSeed(); if (allInitializingShards.isEmpty()) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 528a1c1c9c4a4..4b3f254c9f5fc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -141,7 +141,7 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index if (preference == null || preference.isEmpty()) { if (awarenessAttributes.length == 0) { if (useAdaptiveReplicaSelection) { - return indexShard.rankedActiveInitializingShardsIt(collectorService, nodeCounts); + return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); } else { return indexShard.activeInitializingShardsRandomIt(); } @@ -176,7 +176,7 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index if (index == -1 || index == preference.length() - 1) { if (awarenessAttributes.length == 0) { if (useAdaptiveReplicaSelection) { - return indexShard.rankedActiveInitializingShardsIt(collectorService, nodeCounts); + return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); } else { return indexShard.activeInitializingShardsRandomIt(); } From dcae338e52068f51f9e5de4bacf2cff407eedb7d Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 30 Aug 2017 09:43:03 -0600 Subject: [PATCH 21/22] Instead of precalculating winning node, use "winning" shard from ranked list --- .../routing/IndexShardRoutingTable.java | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 1f68af1f01aa1..0021d1550c3e5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -366,34 +366,27 @@ private static List rankShardsAndUpdateStats(List sh // Retrieve all the nodes the shards exist on final Map nodeRanks = rankNodes(nodeStats, nodeSearchCounts); - String minNode = null; - ResponseCollectorService.ComputedNodeStats minStats = null; - // calculate the "winning" node and its stats (for adjusting other nodes later) - for (Map.Entry> entry : nodeStats.entrySet()) { - final String nodeId = entry.getKey(); - final Optional maybeNodeStats = entry.getValue(); - if (maybeNodeStats.isPresent()) { - ResponseCollectorService.ComputedNodeStats stats = maybeNodeStats.get(); - double rank = stats.rank(nodeSearchCounts.getOrDefault(nodeId, 1L)); - if (minStats == null || rank < minStats.rank(nodeSearchCounts.getOrDefault(minStats.nodeId, 1L))) { - minStats = stats; - minNode = nodeId; - } - } - } - // sort all shards based on the shard rank ArrayList sortedShards = new ArrayList<>(shards); Collections.sort(sortedShards, new NodeRankComparator(nodeRanks)); // adjust the non-winner nodes' stats so they will get a chance to receive queries - adjustStats(collector, nodeStats, minNode, minStats); - if (minNode != null) { - // Increase the number of searches for the "winning" node by one. - // Note that this doesn't actually affect the "real" counts, instead - // it only affects the captured node search counts, which is - // captured once for each query in TransportSearchAction - nodeSearchCounts.compute(minNode, (id, conns) -> conns == null ? 1 : conns + 1); + if (sortedShards.size() > 1) { + ShardRouting minShard = sortedShards.get(0); + // If the winning shard is not started we are ranking initializing + // shards, don't bother to do adjustments + if (minShard.started()) { + String minNodeId = minShard.currentNodeId(); + Optional maybeMinStats = nodeStats.get(minNodeId); + if (maybeMinStats.isPresent()) { + adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get()); + // Increase the number of searches for the "winning" node by one. + // Note that this doesn't actually affect the "real" counts, instead + // it only affects the captured node search counts, which is + // captured once for each query in TransportSearchAction + nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1); + } + } } return sortedShards; From 3d1dd2be7c8e59605e295f02f83c2187149eef94 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 30 Aug 2017 15:27:20 -0600 Subject: [PATCH 22/22] Sort null ranked nodes before nodes that have a rank --- .../routing/IndexShardRoutingTable.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 0021d1550c3e5..4376980eca8a9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -407,11 +407,21 @@ public int compare(ShardRouting s1, ShardRouting s2) { } Double shard1rank = nodeRanks.get(s1.currentNodeId()); Double shard2rank = nodeRanks.get(s2.currentNodeId()); - if (shard1rank != null && shard2rank != null) { - return shard1rank.compareTo(shard2rank); + if (shard1rank != null) { + if (shard2rank != null) { + return shard1rank.compareTo(shard2rank); + } else { + // place non-nulls after null values + return 1; + } } else { - // One or both of the nodes don't have stats, treat them as equal - return 0; + if (shard2rank != null) { + // place nulls before non-null values + return -1; + } else { + // Both nodes do not have stats, they are equal + return 0; + } } } }