Skip to content

Commit

Permalink
Create weights lazily in filter and filters aggregation (#26983)
Browse files Browse the repository at this point in the history
Previous to this change the weights for the filter and filters aggregation were created in the `Filter(s)AggregatorFactory` which meant that they were created regardless of whether the aggregator actually collects any documents. This meant that for filters that are expensive to initialise, requests would not be quick when the query of the request was (or effectively was) a `match_none` query.

This change maintains a single Weight instance for each filter across parent buckets but passes a weight supplier to the aggregator instances which will create the weight on first call and then return that instance for subsequent calls.
  • Loading branch information
colings86 committed Oct 13, 2017
1 parent fe2d4a0 commit 3aea002
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/**
* Aggregate all docs that match a filter.
*/
public class FilterAggregator extends SingleBucketAggregator {

private final Weight filter;
private final Supplier<Weight> filter;

public FilterAggregator(String name,
Weight filter,
Supplier<Weight> filter,
AggregatorFactories factories,
SearchContext context,
Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Expand All @@ -56,7 +57,7 @@ public FilterAggregator(String name,
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
// no need to provide deleted docs to the filter
final Bits bits = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filter.scorerSupplier(ctx));
final Bits bits = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filter.get().scorerSupplier(ctx));
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Weight;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregationInitializationException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
Expand All @@ -35,20 +36,40 @@

public class FilterAggregatorFactory extends AggregatorFactory<FilterAggregatorFactory> {

final Weight weight;
private Weight weight;
private Query filter;

public FilterAggregatorFactory(String name, QueryBuilder filterBuilder, SearchContext context,
AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
super(name, context, parent, subFactoriesBuilder, metaData);
IndexSearcher contextSearcher = context.searcher();
Query filter = filterBuilder.toFilter(context.getQueryShardContext());
weight = contextSearcher.createNormalizedWeight(filter, false);
filter = filterBuilder.toFilter(context.getQueryShardContext());
}

/**
* Returns the {@link Weight} for this filter aggregation, creating it if
* necessary. This is done lazily so that the {@link Weight} is only created
* if the aggregation collects documents reducing the overhead of the
* aggregation in teh case where no documents are collected.
*
* Note that as aggregations are initialsed and executed in a serial manner,
* no concurrency considerations are necessary here.
*/
public Weight getWeight() {
if (weight == null) {
IndexSearcher contextSearcher = context.searcher();
try {
weight = contextSearcher.createNormalizedWeight(filter, false);
} catch (IOException e) {
throw new AggregationInitializationException("Failed to initialse filter", e);
}
}
return weight;
}

@Override
public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new FilterAggregator(name, weight, factories, context, parent, pipelineAggregators, metaData);
return new FilterAggregator(name, () -> this.getWeight(), factories, context, parent, pipelineAggregators, metaData);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

/**
*
Expand Down Expand Up @@ -118,13 +119,13 @@ public boolean equals(Object obj) {
}

private final String[] keys;
private Weight[] filters;
private Supplier<Weight[]> filters;
private final boolean keyed;
private final boolean showOtherBucket;
private final String otherBucketKey;
private final int totalNumKeys;

public FiltersAggregator(String name, AggregatorFactories factories, String[] keys, Weight[] filters, boolean keyed, String otherBucketKey,
public FiltersAggregator(String name, AggregatorFactories factories, String[] keys, Supplier<Weight[]> filters, boolean keyed, String otherBucketKey,
SearchContext context,
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
Expand All @@ -145,6 +146,7 @@ public FiltersAggregator(String name, AggregatorFactories factories, String[] ke
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
// no need to provide deleted docs to the filter
Weight[] filters = this.filters.get();
final Bits[] bits = new Bits[filters.length];
for (int i = 0; i < filters.length; ++i) {
bits[i] = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filters[i].scorerSupplier(ctx));
Expand All @@ -168,7 +170,7 @@ public void collect(int doc, long bucket) throws IOException {

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
List<InternalFilters.InternalBucket> buckets = new ArrayList<>(filters.length);
List<InternalFilters.InternalBucket> buckets = new ArrayList<>(keys.length);
for (int i = 0; i < keys.length; i++) {
long bucketOrd = bucketOrd(owningBucketOrdinal, i);
InternalFilters.InternalBucket bucket = new InternalFilters.InternalBucket(keys[i], bucketDocCount(bucketOrd), bucketAggregations(bucketOrd), keyed);
Expand All @@ -187,7 +189,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
@Override
public InternalAggregation buildEmptyAggregation() {
InternalAggregations subAggs = buildEmptySubAggregations();
List<InternalFilters.InternalBucket> buckets = new ArrayList<>(filters.length);
List<InternalFilters.InternalBucket> buckets = new ArrayList<>(keys.length);
for (int i = 0; i < keys.length; i++) {
InternalFilters.InternalBucket bucket = new InternalFilters.InternalBucket(keys[i], 0, subAggs, keyed);
buckets.add(bucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Weight;
import org.elasticsearch.search.aggregations.AggregationInitializationException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
Expand All @@ -36,7 +37,8 @@
public class FiltersAggregatorFactory extends AggregatorFactory<FiltersAggregatorFactory> {

private final String[] keys;
final Weight[] weights;
private final Query[] filters;
private Weight[] weights;
private final boolean keyed;
private final boolean otherBucket;
private final String otherBucketKey;
Expand All @@ -48,21 +50,43 @@ public FiltersAggregatorFactory(String name, List<KeyedFilter> filters, boolean
this.keyed = keyed;
this.otherBucket = otherBucket;
this.otherBucketKey = otherBucketKey;
IndexSearcher contextSearcher = context.searcher();
weights = new Weight[filters.size()];
keys = new String[filters.size()];
this.filters = new Query[filters.size()];
for (int i = 0; i < filters.size(); ++i) {
KeyedFilter keyedFilter = filters.get(i);
this.keys[i] = keyedFilter.key();
Query filter = keyedFilter.filter().toFilter(context.getQueryShardContext());
this.weights[i] = contextSearcher.createNormalizedWeight(filter, false);
this.filters[i] = keyedFilter.filter().toFilter(context.getQueryShardContext());
}
}

/**
* Returns the {@link Weight}s for this filter aggregation, creating it if
* necessary. This is done lazily so that the {@link Weight}s are only
* created if the aggregation collects documents reducing the overhead of
* the aggregation in the case where no documents are collected.
*
* Note that as aggregations are initialsed and executed in a serial manner,
* no concurrency considerations are necessary here.
*/
public Weight[] getWeights() {
if (weights == null) {
try {
IndexSearcher contextSearcher = context.searcher();
weights = new Weight[filters.length];
for (int i = 0; i < filters.length; ++i) {
this.weights[i] = contextSearcher.createNormalizedWeight(filters[i], false);
}
} catch (IOException e) {
throw new AggregationInitializationException("Failed to initialse filters for aggregation [" + name() + "]", e);
}
}
return weights;
}

@Override
public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new FiltersAggregator(name, factories, keys, weights, keyed, otherBucket ? otherBucketKey : null, context, parent,
return new FiltersAggregator(name, factories, keys, () -> getWeights(), keyed, otherBucket ? otherBucketKey : null, context, parent,
pipelineAggregators, metaData);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.nested;


import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.hamcrest.Matchers;
import org.junit.Before;

Expand Down Expand Up @@ -121,7 +118,7 @@ public void testParsedAsFilter() throws IOException {
AggregatorFactory<?> factory = createAggregatorFactory(builder, indexSearcher, fieldType);
assertThat(factory, Matchers.instanceOf(FilterAggregatorFactory.class));
FilterAggregatorFactory filterFactory = (FilterAggregatorFactory) factory;
Query parsedQuery = filterFactory.weight.getQuery();
Query parsedQuery = filterFactory.getWeight().getQuery();
assertThat(parsedQuery, Matchers.instanceOf(BooleanQuery.class));
assertEquals(2, ((BooleanQuery) parsedQuery).clauses().size());
// means the bool query has been parsed as a filter, if it was a query minShouldMatch would
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void testParsedAsFilter() throws IOException {
AggregatorFactory<?> factory = createAggregatorFactory(builder, indexSearcher, fieldType);
assertThat(factory, Matchers.instanceOf(FiltersAggregatorFactory.class));
FiltersAggregatorFactory filtersFactory = (FiltersAggregatorFactory) factory;
Query parsedQuery = filtersFactory.weights[0].getQuery();
Query parsedQuery = filtersFactory.getWeights()[0].getQuery();
assertThat(parsedQuery, Matchers.instanceOf(BooleanQuery.class));
assertEquals(2, ((BooleanQuery) parsedQuery).clauses().size());
// means the bool query has been parsed as a filter, if it was a query minShouldMatch would
Expand Down

0 comments on commit 3aea002

Please sign in to comment.