
Decouple pipeline reductions from final agg reduction #45796

Merged · 11 commits · Dec 5, 2019
@@ -233,7 +233,7 @@ public Object getProperty(List<String> path) {
}

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
// merge stats across all shards
List<InternalAggregation> aggs = new ArrayList<>(aggregations);
aggs.removeIf(p -> ((InternalMatrixStats)p).stats == null);
@@ -487,7 +487,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
}
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
-InternalAggregations.reduce(aggregationsList, reduceContext);
+InternalAggregations.topLevelReduce(aggregationsList, reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
reducedCompletionSuggestions);
@@ -617,7 +617,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == bufferSize) {
if (hasAggs) {
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
-InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
+InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}
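Both call sites above now funnel through the same entry point; whether pipelines run at all is decided solely by the `ReduceContext`. A condensed sketch of the two call patterns, side by side, using only the fields visible in this class (nothing new is introduced):

```java
// Buffered/partial reduce (consumeInternal): performFinalReduce = false,
// so topLevelReduce() merges the aggregation trees and skips all pipelines.
ReduceContext partialContext = controller.reduceContextFunction.apply(false);
InternalAggregations partiallyReduced =
    InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), partialContext);

// Final reduce (reducedQueryPhase): performFinalReduce = true, so after the
// merge the pipeline tree is materialized exactly once, on the coordinator.
ReduceContext finalContext = reduceContextFunction.apply(true);
InternalAggregations fullyReduced =
    InternalAggregations.topLevelReduce(aggregationsList, finalContext);
```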
@@ -196,7 +196,7 @@ SearchResponse getMergedResponse(Clusters clusters) {
SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats);
setSuggestShardIndex(shards, groupedSuggestions);
Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
-InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true));
+InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContextFunction.apply(true));
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
//make failures ordering consistent between ordinary search and CCS by looking at the shard they come from
@@ -126,23 +126,25 @@ public String getName() {
return name;
}

+/**
+* Creates the output from all pipeline aggs that this aggregation is associated with. Should only
+* be called after all aggregations have been fully reduced
+*/
+public InternalAggregation materializePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
+assert reduceContext.isFinalReduce();
+for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
+reducedAggs = pipelineAggregator.materializePipeline(reducedAggs, reduceContext);
+}
+return reducedAggs;
+}

/**
* Reduces the given aggregations to a single one and returns it. In <b>most</b> cases, the assumption will be that all given
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
* try reusing an existing instance (typically the first in the given list) to save on redundant object
* construction.
*/
-public final InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
-InternalAggregation aggResult = doReduce(aggregations, reduceContext);
-if (reduceContext.isFinalReduce()) {
-for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
-aggResult = pipelineAggregator.reduce(aggResult, reduceContext);
-}
-}
-return aggResult;
-}

-public abstract InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
+public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);

/**
* Return true if this aggregation is mapped, and can lead a reduction. If this agg returns
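For aggregation implementors, the migration is mechanical: the `doReduce` override is renamed to `reduce` and stays a pure data merge, while pipeline handling moves into the separate `materializePipelines` pass. A minimal sketch of a subclass under the new contract (`InternalFoo` and its `value` field are hypothetical, for illustration only):

```java
// Hypothetical InternalAggregation subclass after this PR.
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
    double merged = 0;
    for (InternalAggregation aggregation : aggregations) {
        merged += ((InternalFoo) aggregation).value; // data merge only, no pipelines
    }
    // Pipelines attached to this agg now run later, via materializePipelines(),
    // and only when reduceContext.isFinalReduce() is true.
    return new InternalFoo(name, merged, pipelineAggregators(), metaData);
}
```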
@@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;

/**
* An internal implementation of {@link Aggregations}.
@@ -91,10 +92,46 @@ public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
return topLevelPipelineAggregators;
}

+@SuppressWarnings("unchecked")
+private List<InternalAggregation> getInternalAggregations() {
+return (List<InternalAggregation>) aggregations;
+}
+
+/**
+* Begin the reduction process. This should be the entry point for the "first" reduction, e.g. called by
+* SearchPhaseController or anywhere else that wants to initiate a reduction. It _should not_ be called
+* as an intermediate reduction step (e.g. in the middle of an aggregation tree).
+*
+* This method first reduces the aggregations, and if it is the final reduce, then materializes the pipeline
+* aggregations (both embedded parent/sibling as well as top-level sibling pipelines)
+*/
+public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
+InternalAggregations reduced = reduce(aggregationsList, context);
+if (reduced == null) {
+return null;
+}
+
+if (context.isFinalReduce()) {
+List<InternalAggregation> reducedInternalAggs = reduced.getInternalAggregations();
+reducedInternalAggs = reducedInternalAggs.stream()
+.map(agg -> agg.materializePipelines(agg, context)).collect(Collectors.toList());
+
+List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();
+for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
+InternalAggregation newAgg
+= pipelineAggregator.doMaterializePipelines(new InternalAggregations(reducedInternalAggs), context);
+reducedInternalAggs.add(newAgg);
+}
+return new InternalAggregations(reducedInternalAggs);
+}
+return reduced;
+}

/**
* Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first
* {@link InternalAggregations} object found in the list.
-* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
+* Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled
+* separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)}
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
if (aggregationsList.isEmpty()) {
@@ -123,13 +160,6 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
reducedAggregations.add(first.reduce(aggregations, context));
}

-if (context.isFinalReduce()) {
-for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
-InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context);
-reducedAggregations.add(newAgg);
-}
-return new InternalAggregations(reducedAggregations);
-}
return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators);
}
}
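Taken together, `topLevelReduce` turns the final reduction into a strict three-phase sequence. A condensed restatement of the control flow above (not standalone code: `getInternalAggregations()` is the private helper of this class, and `topLevelPipelineAggregators` comes from `aggregationsList.get(0)`):

```java
// Phase 1: pure data reduction; every InternalAggregation#reduce() call
// merges shard results and never touches a pipeline.
InternalAggregations reduced = InternalAggregations.reduce(aggregationsList, context);

if (context.isFinalReduce()) {
    // Phase 2: each top-level agg materializes its embedded parent/sibling
    // pipelines, recursing into buckets for multi-bucket aggs.
    List<InternalAggregation> aggs = reduced.getInternalAggregations().stream()
        .map(agg -> agg.materializePipelines(agg, context))
        .collect(Collectors.toList());
    // Phase 3: top-level sibling pipelines run last, against the fully
    // materialized tree, and their outputs are appended to the result.
    for (SiblingPipelineAggregator p : topLevelPipelineAggregators) {
        aggs.add(p.doMaterializePipelines(new InternalAggregations(aggs), context));
    }
}
```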
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;

public abstract class InternalMultiBucketAggregation<A extends InternalMultiBucketAggregation,
B extends InternalMultiBucketAggregation.InternalBucket>
@@ -73,7 +74,7 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException {
protected abstract B reduceBucket(List<B> buckets, ReduceContext context);

@Override
-public abstract List<? extends InternalBucket> getBuckets();
+public abstract List<B> getBuckets();

@Override
public Object getProperty(List<String> path) {
@@ -141,6 +142,28 @@ public static int countInnerBucket(Aggregation agg) {
return size;
}

+/**
+* Unlike {@link InternalAggregation#materializePipelines(InternalAggregation, ReduceContext)}, a multi-bucket
+* agg needs to first materialize the buckets (and their parent pipelines) before allowing sibling pipelines
+* to materialize
+*/
+@Override
+public final InternalAggregation materializePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
+assert reduceContext.isFinalReduce();
+List<B> materializedBuckets = materializeBuckets(reduceContext);
+return super.materializePipelines(create(materializedBuckets), reduceContext);
+}
+
+private List<B> materializeBuckets(ReduceContext reduceContext) {
+return getBuckets().stream().map(internalBucket -> {
+List<InternalAggregation> aggs = internalBucket.getAggregations().asList()
+.stream().map(aggregation
+-> ((InternalAggregation)aggregation).materializePipelines((InternalAggregation)aggregation, reduceContext))
+.collect(Collectors.toList());
+return createBucket(new InternalAggregations(aggs), internalBucket);
+}).collect(Collectors.toList());
+}

public abstract static class InternalBucket implements Bucket, Writeable {

public Object getProperty(String containingAggName, List<String> path) {
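The `materializePipelines` override above exists to get the ordering right for bucketed results: pipelines nested inside each bucket must finish before the multi-bucket agg's own pipelines see the buckets. A sketch of the resulting evaluation order for a hypothetical request (all aggregation names are illustrative):

```java
// terms "by_tag"
//   └── date_histogram "over_time"
//         └── derivative "rate"          (pipeline inside every by_tag bucket)
//
// by_tag.materializePipelines(...) now proceeds as:
//   1. materializeBuckets(): within each by_tag bucket, "over_time"
//      materializes its own pipelines, so every bucket's "rate" exists first;
//   2. super.materializePipelines(create(buckets), ...): pipelines attached
//      to "by_tag" itself run last, over the freshly rebuilt buckets.
```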
@@ -97,7 +97,7 @@ public InternalSingleBucketAggregation create(InternalAggregations subAggregatio
protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long docCount = 0L;
List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
for (InternalAggregation aggregation : aggregations) {
@@ -181,7 +181,7 @@ public InternalBucket getBucketByKey(String key) {
}

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
Map<String, List<InternalBucket>> bucketsMap = new HashMap<>();
for (InternalAggregation aggregation : aggregations) {
InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation;
@@ -143,7 +143,7 @@ int[] getReverseMuls() {
}

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size());
for (InternalAggregation agg : aggregations) {
InternalComposite sortedAgg = (InternalComposite) agg;
@@ -189,7 +189,7 @@ public InternalBucket getBucketByKey(String key) {
}

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<List<InternalBucket>> bucketsList = null;
for (InternalAggregation aggregation : aggregations) {
InternalFilters filters = (InternalFilters) aggregation;
@@ -81,7 +81,7 @@ public List<InternalGeoGridBucket> getBuckets() {
}

@Override
-public InternalGeoGrid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalGeoGrid reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
LongObjectPagedHashMap<List<InternalGeoGridBucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) {
InternalGeoGrid grid = (InternalGeoGrid) aggregation;
@@ -498,7 +498,7 @@ static int getAppropriateRounding(long minKey, long maxKey, int roundingIdx,
}

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);

if (reduceContext.isFinalReduce()) {
@@ -448,7 +448,7 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
}

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {
if (minDocCount == 0) {
@@ -421,7 +421,7 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
}

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {
if (minDocCount == 0) {
@@ -230,7 +230,7 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
}

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
reduceContext.consumeBucketsAndMaybeBreak(buckets.size());
long[] docCounts = new long[buckets.size()];
InternalAggregations[][] aggs = new InternalAggregations[buckets.size()][];
@@ -287,7 +287,7 @@ public B createBucket(InternalAggregations aggregations, B prototype) {

@SuppressWarnings("unchecked")
@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
reduceContext.consumeBucketsAndMaybeBreak(ranges.size());
List<B>[] rangeList = new List[ranges.size()];
for (int i = 0; i < rangeList.length; ++i) {
@@ -49,7 +49,7 @@ public String getWriteableName() {
}

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
return new UnmappedSampler(name, pipelineAggregators(), metaData);
}

@@ -126,7 +126,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
}

if (spare == null) {
-spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
+spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
}
spare.bucketOrd = bucketOrd;
copy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
@@ -105,6 +105,9 @@ public long getSubsetSize() {
return subsetSize;
}

+// TODO we should refactor to remove this, since buckets should be immutable after they are generated.
+// This can lead to confusing bugs if the bucket is re-created (via createBucket() or similar) without
+// the score
void updateScore(SignificanceHeuristic significanceHeuristic) {
score = significanceHeuristic.getScore(subsetDf, subsetSize, supersetDf, supersetSize);
}
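The constructor changes in the hunks below are motivated by the TODO above: significance scores are computed during reduce via `updateScore()`, but pipeline materialization re-creates buckets through `createBucket(...)`, and the old constructors silently dropped the score. A hedged sketch of the failure mode, using names from the surrounding diff:

```java
// Before this PR (sketch): the score vanished on bucket re-creation.
// Bucket copy = new Bucket(subsetDf, subsetSize, supersetDf, supersetSize,
//         term, newAggs, format);            // no score parameter
// copy.score == 0.0                           // previously computed score lost
//
// After: the score is threaded through every constructor/createBucket() call,
// e.g. new Bucket(..., format, prototype.score), so re-created buckets keep it.
```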
@@ -191,7 +194,7 @@ protected final void doWriteTo(StreamOutput out) throws IOException {
public abstract List<B> getBuckets();

@Override
-public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long globalSubsetSize = 0;
long globalSupersetSize = 0;
// Compute the overall result set size and the corpus size using the
@@ -42,14 +42,9 @@ static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
long term;

Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations,
-DocValueFormat format) {
+DocValueFormat format, double score) {
super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
this.term = term;
-}
-
-Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations,
-double score) {
-this(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, null);
this.score = score;
}

@@ -134,7 +129,7 @@ public SignificantLongTerms create(List<SignificantLongTerms.Bucket> buckets) {
@Override
public Bucket createBucket(InternalAggregations aggregations, SignificantLongTerms.Bucket prototype) {
return new Bucket(prototype.subsetDf, prototype.subsetSize, prototype.supersetDf, prototype.supersetSize, prototype.term,
-aggregations, prototype.format);
+aggregations, prototype.format, prototype.score);
}

@Override
@@ -151,6 +146,6 @@ protected Bucket[] createBucketsArray(int size) {
@Override
Bucket createBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize,
InternalAggregations aggregations, SignificantLongTerms.Bucket prototype) {
-return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, prototype.term, aggregations, format);
+return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, prototype.term, aggregations, format, prototype.score);
}
}
@@ -88,7 +88,7 @@ public SignificantLongTerms buildAggregation(long owningBucketOrdinal) throws IO
continue;
}
if (spare == null) {
-spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format);
+spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format, 0);
}
spare.term = bucketOrds.get(i);
spare.subsetDf = docCount;