diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index b3562cc002b5b..0bd7afa7f61c2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -39,6 +39,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -564,6 +565,7 @@ public InternalSearchResponse buildResponse(SearchHits hits) { * iff the buffer is exhausted. */ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults { + private final SearchShardTarget[] processedShards; private final InternalAggregations[] aggsBuffer; private final TopDocs[] topDocsBuffer; private final boolean hasAggs; @@ -600,6 +602,7 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search } this.controller = controller; this.progressListener = progressListener; + this.processedShards = new SearchShardTarget[expectedResultSize]; // no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time. this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0]; this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0]; @@ -636,7 +639,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { numReducePhases++; index = 1; if (hasAggs) { - progressListener.notifyPartialReduce(progressListener.searchShards(results.asList()), + progressListener.notifyPartialReduce(progressListener.searchShards(processedShards), topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases); } } @@ -650,6 +653,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex()); topDocsBuffer[i] = topDocs.topDocs; } + processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget(); } private synchronized List getRemainingAggs() { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java index c5b3d35159491..87146719a0f52 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java @@ -25,8 +25,10 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.InternalAggregations; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -169,6 +171,13 @@ final List searchShards(List results) .collect(Collectors.toUnmodifiableList()); } + final List searchShards(SearchShardTarget[] results) { + return Arrays.stream(results) + .filter(Objects::nonNull) + .map(e -> new SearchShard(e.getClusterAlias(), e.getShardId())) + .collect(Collectors.toUnmodifiableList()); + } + final List searchShards(GroupShardsIterator its) { return StreamSupport.stream(its.spliterator(), false) .map(e -> new SearchShard(e.getClusterAlias(), e.shardId()))