Move Aggregator#buildTopLevel() to search worker thread. #98715

Merged · 20 commits · Sep 19, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion docs/reference/search/profile.asciidoc
@@ -740,7 +740,7 @@ The API returns the following result:
"time_in_nanos": 22577
},
{
"name": "BucketCollectorWrapper: [BucketCollectorWrapper[bucketCollector=[my_scoped_agg, my_global_agg]]]",
"name": "AggregatorCollector: [my_scoped_agg, my_global_agg]",
"reason": "aggregation",
"time_in_nanos": 867617
}
org/elasticsearch/search/aggregations/AggregationPhase.java
@@ -7,16 +7,13 @@
*/
package org.elasticsearch.search.aggregations;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.search.aggregations.support.TimeSeriesIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.QueryPhase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

@@ -31,36 +28,30 @@ public static void preProcess(SearchContext context) {
if (context.aggregations() == null) {
return;
}
final Supplier<Collector> collectorSupplier;
final Supplier<AggregatorCollector> collectorSupplier;
if (context.aggregations().isInSortOrderExecutionRequired()) {
executeInSortOrder(context, newBucketCollector(context));
collectorSupplier = () -> BucketCollector.NO_OP_COLLECTOR;
AggregatorCollector collector = newAggregatorCollector(context);
executeInSortOrder(context, collector.bucketCollector);
collectorSupplier = () -> new AggregatorCollector(collector.aggregators, BucketCollector.NO_OP_BUCKET_COLLECTOR);
} else {
collectorSupplier = () -> newBucketCollector(context).asCollector();
collectorSupplier = () -> newAggregatorCollector(context);
}
context.aggregations().registerAggsCollectorManager(new CollectorManager<>() {
@Override
public Collector newCollector() {
return collectorSupplier.get();
}

@Override
public Void reduce(Collection<Collector> collectors) {
// we cannot run post-collection method here because we need to do it after the optional timeout
// has been removed from the index searcher. Therefore, we delay this processing to the
// AggregationPhase#execute method.
return null;
}
});
context.aggregations()
.registerAggsCollectorManager(
new AggregatorCollectorManager(
collectorSupplier,
internalAggregations -> context.queryResult().aggregations(internalAggregations),
() -> context.aggregations().getAggregationReduceContextBuilder().forPartialReduction()
)
);
}

private static BucketCollector newBucketCollector(SearchContext context) {
private static AggregatorCollector newAggregatorCollector(SearchContext context) {
try {
Aggregator[] aggregators = context.aggregations().factories().createTopLevelAggregators();
context.aggregations().aggregators(aggregators);
BucketCollector bucketCollector = MultiBucketCollector.wrap(true, List.of(aggregators));
bucketCollector.preCollection();
return bucketCollector;
return new AggregatorCollector(aggregators, bucketCollector);
} catch (IOException e) {
throw new AggregationInitializationException("Could not initialize aggregators", e);
}
@@ -96,48 +87,4 @@ private static List<Runnable> getCancellationChecks(SearchContext context) {

return cancellationChecks;
}

public static void execute(SearchContext context) {
if (context.aggregations() == null) {
context.queryResult().aggregations(null);
return;
}

if (context.queryResult().hasAggs()) {
// no need to compute the aggs twice, they should be computed on a per context basis
return;
}

final List<InternalAggregations> internalAggregations = new ArrayList<>(context.aggregations().aggregators().size());
for (Aggregator[] aggregators : context.aggregations().aggregators()) {
final List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
for (Aggregator aggregator : aggregators) {
try {
aggregations.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
// release the aggregator to claim the used bytes as we don't need it anymore
aggregator.releaseAggregations();
}
internalAggregations.add(InternalAggregations.from(aggregations));
}

if (internalAggregations.size() > 1) {
// we execute this search using more than one slice. In order to keep memory requirements
// low, we do a partial reduction here.
context.queryResult()
.aggregations(
InternalAggregations.topLevelReduce(
internalAggregations,
context.aggregations().getAggregationReduceContextBuilder().forPartialReduction()
)
);
} else {
context.queryResult().aggregations(internalAggregations.get(0));
}

// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
}
}
org/elasticsearch/search/aggregations/AggregatorCollector.java (new file)
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.search.internal.TwoPhaseCollector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Collector that controls the life cycle of an aggregation document collection. */
public class AggregatorCollector implements TwoPhaseCollector {
final Aggregator[] aggregators;
final BucketCollector bucketCollector;
final List<InternalAggregation> internalAggregations;

public AggregatorCollector(Aggregator[] aggregators, BucketCollector bucketCollector) {
this.aggregators = aggregators;
this.bucketCollector = bucketCollector;
this.internalAggregations = new ArrayList<>(aggregators.length);
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
return bucketCollector.getLeafCollector(new AggregationExecutionContext(context, null, null, null));
}

@Override
public ScoreMode scoreMode() {
return bucketCollector.scoreMode();
}

@Override
public void doPostCollection() throws IOException {
bucketCollector.postCollection();
for (Aggregator aggregator : aggregators) {
internalAggregations.add(aggregator.buildTopLevel());
// release the aggregator to claim the used bytes as we don't need it anymore
aggregator.releaseAggregations();
}
}

@Override
public String toString() {
String[] aggNames = new String[aggregators.length];
for (int i = 0; i < aggregators.length; i++) {
aggNames[i] = aggregators[i].name();
}
return Arrays.toString(aggNames);
}
}
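
As a rough, compile-only sketch of the life cycle this class implements — collect first, then build each aggregator's result in an explicit post-collection step on the same thread that did the collecting — the following uses only plain Lucene types. The class name and the `Supplier` stand-ins for `Aggregator#buildTopLevel()` are hypothetical; this is not Elasticsearch code.

```java
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SimpleCollector;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class PostCollectionSketchCollector extends SimpleCollector {

    // Stand-ins for Aggregator#buildTopLevel(); each supplier builds one final result.
    private final List<Supplier<String>> resultBuilders;
    // Populated only in the post-collection step, never during collection.
    private final List<String> results = new ArrayList<>();
    private long collected;

    PostCollectionSketchCollector(List<Supplier<String>> resultBuilders) {
        this.resultBuilders = resultBuilders;
    }

    @Override
    public void collect(int doc) {
        collected++; // a real collector would feed the document to its aggregators here
    }

    @Override
    public ScoreMode scoreMode() {
        return ScoreMode.COMPLETE_NO_SCORES;
    }

    /** Analogue of doPostCollection(): build results after collection, on the collecting thread. */
    void doPostCollection() {
        for (Supplier<String> builder : resultBuilders) {
            results.add(builder.get());
        }
    }

    long collected() {
        return collected;
    }

    List<String> results() {
        return List.copyOf(results);
    }
}
```

Here the implicit contract is that whoever runs the search calls doPostCollection() once collection finishes; in the real class that hand-off goes through the TwoPhaseCollector interface it implements, and the built InternalAggregation objects are what AggregatorCollectorManager later reduces.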
org/elasticsearch/search/aggregations/AggregatorCollectorManager.java (new file)
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations;

import org.apache.lucene.search.CollectorManager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Collector manager that produces {@link AggregatorCollector} and merges them during the reduce phase. */
public class AggregatorCollectorManager implements CollectorManager<AggregatorCollector, Void> {

private final Supplier<AggregatorCollector> collectorSupplier;
private final Consumer<InternalAggregations> internalAggregationsConsumer;
private final Supplier<AggregationReduceContext> reduceContextSupplier;

public AggregatorCollectorManager(
Supplier<AggregatorCollector> collectorSupplier,
Consumer<InternalAggregations> internalAggregationsConsumer,
Supplier<AggregationReduceContext> reduceContextSupplier
) {
this.collectorSupplier = collectorSupplier;
this.internalAggregationsConsumer = internalAggregationsConsumer;
this.reduceContextSupplier = reduceContextSupplier;
}

@Override
public AggregatorCollector newCollector() throws IOException {
return collectorSupplier.get();
}

@Override
public Void reduce(Collection<AggregatorCollector> collectors) throws IOException {
if (collectors.size() > 1) {
// we execute this search using more than one slice. In order to keep memory requirements
// low, we do a partial reduction here.
final List<InternalAggregations> internalAggregations = new ArrayList<>(collectors.size());
collectors.forEach(c -> internalAggregations.add(InternalAggregations.from(c.internalAggregations)));
internalAggregationsConsumer.accept(InternalAggregations.topLevelReduce(internalAggregations, reduceContextSupplier.get()));
} else if (collectors.size() == 1) {
internalAggregationsConsumer.accept(InternalAggregations.from(collectors.iterator().next().internalAggregations));
}
return null;
}
}
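
For illustration, here is a minimal, self-contained sketch (made-up class names, no Elasticsearch types) of the CollectorManager contract this class follows: newCollector() is invoked once per slice, slices are collected on the searcher's executor threads, and reduce() merges the per-slice partial results. In the real AggregatorCollectorManager the partials are per-slice InternalAggregations, partially reduced when more than one slice was used.

```java
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SimpleCollector;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SliceCollectorManagerSketch {

    /** Per-slice collector: accumulates a partial result (here, a document count). */
    static final class CountingCollector extends SimpleCollector {
        long count;

        @Override
        public void collect(int doc) {
            count++;
        }

        @Override
        public ScoreMode scoreMode() {
            return ScoreMode.COMPLETE_NO_SCORES;
        }
    }

    /** Manager: one collector per slice; reduce() merges the partial per-slice results. */
    static final class CountingCollectorManager implements CollectorManager<CountingCollector, Long> {
        @Override
        public CountingCollector newCollector() {
            return new CountingCollector();
        }

        @Override
        public Long reduce(Collection<CountingCollector> collectors) {
            long total = 0;
            for (CountingCollector c : collectors) {
                total += c.count;
            }
            return total;
        }
    }

    public static void main(String[] args) throws IOException {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try (Directory dir = new ByteBuffersDirectory()) {
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
                for (int i = 0; i < 10; i++) {
                    writer.addDocument(new Document());
                }
            }
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                // With an executor, slices are collected on worker threads; the manager's
                // reduce() then merges the per-slice results on the calling thread.
                IndexSearcher searcher = new IndexSearcher(reader, workers);
                long hits = searcher.search(new MatchAllDocsQuery(), new CountingCollectorManager());
                System.out.println("hits: " + hits); // 10
            }
        } finally {
            workers.shutdown();
        }
    }
}
```

In the diff above, the equivalent of this reduce() additionally performs a partial reduction via InternalAggregations.topLevelReduce when more than one slice was collected, to keep memory requirements low.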
org/elasticsearch/search/aggregations/SearchContextAggregations.java
@@ -7,11 +7,8 @@
*/
package org.elasticsearch.search.aggregations;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
@@ -21,8 +18,7 @@ public class SearchContextAggregations {

private final AggregatorFactories factories;
private final Supplier<AggregationReduceContext.Builder> toAggregationReduceContextBuilder;
private final List<Aggregator[]> aggregators;
private CollectorManager<Collector, Void> aggCollectorManager;
private CollectorManager<AggregatorCollector, Void> aggCollectorManager;

/**
* Creates a new aggregation context with the parsed aggregator factories
@@ -33,37 +29,23 @@ public SearchContextAggregations(
) {
this.factories = factories;
this.toAggregationReduceContextBuilder = toAggregationReduceContextBuilder;
this.aggregators = new ArrayList<>();
}

public AggregatorFactories factories() {
return factories;
}

public List<Aggregator[]> aggregators() {
return aggregators;
}

/**
* Registers all the created aggregators (top level aggregators) for the search execution context.
*
* @param aggregators The top level aggregators of the search execution.
*/
public void aggregators(Aggregator[] aggregators) {
this.aggregators.add(aggregators);
}

/**
* Registers the collector to be run for the aggregations phase
*/
public void registerAggsCollectorManager(CollectorManager<Collector, Void> aggCollectorManager) {
public void registerAggsCollectorManager(CollectorManager<AggregatorCollector, Void> aggCollectorManager) {
this.aggCollectorManager = aggCollectorManager;
}

/**
* Returns the collector to be run for the aggregations phase
*/
public CollectorManager<Collector, Void> getAggsCollectorManager() {
public CollectorManager<AggregatorCollector, Void> getAggsCollectorManager() {
return aggCollectorManager;
}
