Skip to content

Commit

Permalink
Do not add unmatched shard routing
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Sep 9, 2020
1 parent 44f14d3 commit 0a99df6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -581,14 +579,8 @@ static List<SearchShardIterator> getRemoteShardsIteratorFromPointInTime(Map<Stri
final String clusterAlias = entry.getKey();
final SearchContextIdForNode perNode = searchContextId.shards().get(shardId);
assert clusterAlias.equals(perNode.getClusterAlias()) : clusterAlias + " != " + perNode.getClusterAlias();
final List<String> nodeIds = new ArrayList<>();
nodeIds.add(perNode.getNode()); // always search with the matching node first
for (ShardRouting shard : group.getShards()) {
if (perNode.getNode().equals(shard.currentNodeId()) == false) {
nodeIds.add(shard.currentNodeId());
}
}
SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, nodeIds,
final List<String> targetNodes = List.of(perNode.getNode());
SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, targetNodes,
remoteClusterIndices.get(clusterAlias), perNode.getSearchContextId(), searchContextKeepAlive);
remoteShardIterators.add(shardIterator);
}
Expand Down Expand Up @@ -627,7 +619,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
aliasFilter = searchContext.aliasFilter();
indexRoutings = Map.of();
concreteLocalIndices = localIndices == null ? new String[0] : localIndices.indices();
localShardIterators = getLocalLocalShardsIteratorFromPointInTime(clusterState, localIndices,
localShardIterators = getLocalLocalShardsIteratorFromPointInTime(localIndices,
searchRequest.getLocalClusterAlias(), searchContext, searchRequest.pointInTimeBuilder().getKeepAlive());
} else {
final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider);
Expand Down Expand Up @@ -922,29 +914,16 @@ static Map<String, OriginalIndices> getIndicesFromSearchContexts(SearchContextId
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OriginalIndices(e.getValue().toArray(String[]::new), indicesOptions)));
}

static List<SearchShardIterator> getLocalLocalShardsIteratorFromPointInTime(ClusterState clusterState,
OriginalIndices originalIndices,
String localClusterAlias,
SearchContextId searchContext,
TimeValue keepAlive) {
static List<SearchShardIterator> getLocalLocalShardsIteratorFromPointInTime(OriginalIndices originalIndices, String localClusterAlias,
SearchContextId searchContext, TimeValue keepAlive) {
final List<SearchShardIterator> iterators = new ArrayList<>(searchContext.shards().size());
for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) {
final SearchContextIdForNode perNode = entry.getValue();
if (Strings.isEmpty(perNode.getClusterAlias())) {
final ShardId shardId = entry.getKey();
final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
final List<String> matchingNodeFirst = new ArrayList<>(shards.size());
final String nodeId = perNode.getNode();
// always search the matching node first even when its shard was relocated to another node
// because the point in time should keep the corresponding search context open.
matchingNodeFirst.add(nodeId);
for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(nodeId) == false) {
matchingNodeFirst.add(shard.currentNodeId());
}
}
iterators.add(new SearchShardIterator(localClusterAlias, shardId, matchingNodeFirst, originalIndices,
perNode.getSearchContextId(), keepAlive));
final List<String> targetNodes = List.of(perNode.getNode());
iterators.add(new SearchShardIterator(
localClusterAlias, shardId, targetNodes, originalIndices, perNode.getSearchContextId(), keepAlive));
}
}
return iterators;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState
return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
}

public static ShardIterator getShards(ClusterState clusterState, ShardId shardId) {
final IndexShardRoutingTable shard = clusterState.routingTable().shardRoutingTable(shardId);
return shard.activeInitializingShardsRandomIt();
}

private static final Map<String, Set<String>> EMPTY_ROUTING = Collections.emptyMap();

private Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterState, String[] concreteIndices,
Expand Down

0 comments on commit 0a99df6

Please sign in to comment.