From 71b102970d8f5a9f0de11b4905476e700b3663a4 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Tue, 17 Dec 2019 14:02:06 +0100 Subject: [PATCH] Optimize composite aggregation based on index sorting (#48399) Co-authored-by: Daniel Huang This is a spinoff of #48130 that generalizes the proposal to allow early termination with the composite aggregation when leading sources match a prefix or the entire index sort specification. In such case the composite aggregation can use the index sort natural order to early terminate the collection when it reaches a composite key that is greater than the bottom of the queue. The optimization is also applicable when a query other than match_all is provided. However the optimization is deactivated for sources that match the index sort in the following cases: * Multi-valued source, in such case early termination is not possible. * missing_bucket is set to true --- .../bucket/composite-aggregation.asciidoc | 125 +++++++++- .../CompositeAggregationBuilder.java | 3 +- .../bucket/composite/CompositeAggregator.java | 197 +++++++++++++--- .../CompositeValuesCollectorQueue.java | 60 +++-- .../CompositeValuesSourceBuilder.java | 2 +- .../CompositeValuesSourceConfig.java | 14 +- .../DateHistogramValuesSourceBuilder.java | 3 +- .../GeoTileGridValuesSourceBuilder.java | 3 +- .../HistogramValuesSourceBuilder.java | 3 +- .../bucket/composite/InternalComposite.java | 27 ++- .../bucket/composite/SortedDocsProducer.java | 3 +- .../composite/TermsValuesSourceBuilder.java | 2 +- .../bucket/terms/TermsAggregationBuilder.java | 5 + .../searchafter/SearchAfterBuilder.java | 3 +- .../composite/CompositeAggregatorTests.java | 222 ++++++++++++++---- .../CompositeValuesCollectorQueueTests.java | 73 ++++-- .../composite/InternalCompositeTests.java | 4 +- .../aggregations/AggregatorTestCase.java | 44 +++- 18 files changed, 662 insertions(+), 131 deletions(-) diff --git a/docs/reference/aggregations/bucket/composite-aggregation.asciidoc 
b/docs/reference/aggregations/bucket/composite-aggregation.asciidoc index bcc79fd989aaa..dd9aea6be3e90 100644 --- a/docs/reference/aggregations/bucket/composite-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/composite-aggregation.asciidoc @@ -116,6 +116,7 @@ Example: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -134,6 +135,7 @@ Like the `terms` aggregation it is also possible to use a script to create the v -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -168,6 +170,7 @@ Example: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -186,6 +189,7 @@ The values are built from a numeric field or a script that return numerical valu -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -218,6 +222,7 @@ is specified by date/time expression: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -247,6 +252,7 @@ the format specified with the format parameter: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -289,6 +295,7 @@ For example: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -311,6 +318,7 @@ in the composite buckets. 
-------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -340,6 +348,7 @@ For example: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -366,6 +375,7 @@ It is possible to include them in the response by setting `missing_bucket` to -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -391,7 +401,7 @@ first 10 composite buckets created from the values source. The response contains the values for each composite bucket in an array containing the values extracted from each value source. -==== After +==== Pagination If the number of composite buckets is too high (or unknown) to be returned in a single response it is possible to split the retrieval in multiple requests. @@ -405,6 +415,7 @@ For example: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -470,6 +481,7 @@ round of result can be retrieved with: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -487,6 +499,116 @@ GET /_search <1> Should restrict the aggregation to buckets that sort **after** the provided values. +==== Early termination + +For optimal performance the <<index-modules-index-sorting,index sort>> should be set on the index so that it matches +part or all of the source order in the composite aggregation. 
+For instance the following index sort: + +[source,console] +-------------------------------------------------- +PUT twitter +{ + "settings" : { + "index" : { + "sort.field" : ["username", "timestamp"], <1> + "sort.order" : ["asc", "desc"] <2> + } + }, + "mappings": { + "properties": { + "username": { + "type": "keyword", + "doc_values": true + }, + "timestamp": { + "type": "date" + } + } + } +} +-------------------------------------------------- + +<1> This index is sorted by `username` first then by `timestamp`. +<2> ... in ascending order for the `username` field and in descending order for the `timestamp` field. + +... could be used to optimize these composite aggregations: + +[source,console] +-------------------------------------------------- +GET /_search +{ + "size": 0, + "aggs" : { + "my_buckets": { + "composite" : { + "sources" : [ + { "user_name": { "terms" : { "field": "username" } } } <1> + ] + } + } + } +} +-------------------------------------------------- + +<1> `username` is a prefix of the index sort and the order matches (`asc`). + +[source,console] +-------------------------------------------------- +GET /_search +{ + "size": 0, + "aggs" : { + "my_buckets": { + "composite" : { + "sources" : [ + { "user_name": { "terms" : { "field": "username" } } }, <1> + { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } } <2> + ] + } + } + } +} +-------------------------------------------------- + +<1> `username` is a prefix of the index sort and the order matches (`asc`). +<2> `timestamp` also matches the prefix and the order matches (`desc`). + +In order to optimize the early termination it is advised to set `track_total_hits` in the request +to `false`. 
The number of total hits that match the request can be retrieved on the first request +and it would be costly to compute this number on every page: + +[source,console] +-------------------------------------------------- +GET /_search +{ + "size": 0, + "track_total_hits": false, + "aggs" : { + "my_buckets": { + "composite" : { + "sources" : [ + { "user_name": { "terms" : { "field": "username" } } }, + { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } } + ] + } + } + } +} +-------------------------------------------------- + +Note that the order of the sources is important: in the example above, switching the `user_name` and `date` sources +would deactivate the sort optimization since this configuration wouldn't match the index sort specification. +If the order of sources does not matter for your use case you can follow these simple guidelines: + + * Put the fields with the highest cardinality first. + * Make sure that the order of the field matches the order of the index sort. + * Put multi-valued fields last since they cannot be used for early termination. + +WARNING: <<index-modules-index-sorting,index sort>> can slow down indexing, so it is very important to test index sorting +with your specific use case and dataset to ensure that it matches your requirement. If it doesn't, note that `composite` +aggregations will also try to early terminate on non-sorted indices if the query matches all documents (`match_all` query). + ==== Sub-aggregations Like any `multi-bucket` aggregations the `composite` aggregation can hold sub-aggregations. 
@@ -499,6 +621,7 @@ per composite bucket: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java index b3712e231fded..243d1057bbf4a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java @@ -235,7 +235,8 @@ protected AggregatorFactory doBuild(QueryShardContext queryShardContext, Aggrega } else { afterKey = null; } - return new CompositeAggregationFactory(name, queryShardContext, parent, subfactoriesBuilder, metaData, size, configs, afterKey); + return new CompositeAggregationFactory(name, queryShardContext, parent, subfactoriesBuilder, metaData, size, + configs, afterKey); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java index 4effb22f30cb2..0e01615492939 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -20,18 +20,28 @@ package org.elasticsearch.search.aggregations.bucket.composite; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.DocValues; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.queries.SearchAfterSortedDocQuery; +import 
org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.DocIdSet; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; import org.apache.lucene.search.Weight; import org.apache.lucene.util.RoaringDocIdSet; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.index.IndexSortConfig; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; @@ -46,6 +56,8 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.searchafter.SearchAfterBuilder; +import org.elasticsearch.search.sort.SortAndFormats; import java.io.IOException; import java.util.ArrayList; @@ -60,11 +72,12 @@ final class CompositeAggregator extends BucketsAggregator { private final int size; - private final SortedDocsProducer sortedDocsProducer; private final List sourceNames; private final int[] reverseMuls; private final List formats; + private final CompositeKey rawAfterKey; + private final CompositeValuesSourceConfig[] sourceConfigs; private final SingleDimensionValuesSource[] sources; private final CompositeValuesCollectorQueue queue; @@ -73,6 +86,8 @@ final class CompositeAggregator extends BucketsAggregator { private RoaringDocIdSet.Builder docIdSetBuilder; private BucketCollector deferredCollectors; + private boolean earlyTerminated; + CompositeAggregator(String name, 
AggregatorFactories factories, SearchContext context, Aggregator parent, List pipelineAggregators, Map metaData, int size, CompositeValuesSourceConfig[] sourceConfigs, CompositeKey rawAfterKey) throws IOException { @@ -89,11 +104,12 @@ final class CompositeAggregator extends BucketsAggregator { " to: [" + bucketLimit + "] but was [" + size + "]. This limit can be set by changing the [" + MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", bucketLimit); } + this.sourceConfigs = sourceConfigs; for (int i = 0; i < sourceConfigs.length; i++) { this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(), sourceConfigs[i], size); } this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); - this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query()); + this.rawAfterKey = rawAfterKey; } @Override @@ -121,7 +137,6 @@ protected void doPostCollection() throws IOException { public InternalAggregation buildAggregation(long zeroBucket) throws IOException { assert zeroBucket == 0L; consumeBucketsAndMaybeBreak(queue.size()); - if (deferredCollectors != NO_OP_COLLECTOR) { // Replay all documents that contain at least one top bucket (collected during the first pass). runDeferredCollections(); @@ -138,13 +153,13 @@ public InternalAggregation buildAggregation(long zeroBucket) throws IOException } CompositeKey lastBucket = num > 0 ? 
buckets[num-1].getRawKey() : null; return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), lastBucket, reverseMuls, - pipelineAggregators(), metaData()); + earlyTerminated, pipelineAggregators(), metaData()); } @Override public InternalAggregation buildEmptyAggregation() { return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls, - pipelineAggregators(), metaData()); + false, pipelineAggregators(), metaData()); } private void finishLeaf() { @@ -156,58 +171,179 @@ private void finishLeaf() { } } + /** Return true if the provided field may have multiple values per document in the leaf **/ + private boolean isMaybeMultivalued(LeafReaderContext context, SortField sortField) throws IOException { + SortField.Type type = IndexSortConfig.getSortFieldType(sortField); + switch (type) { + case STRING: + final SortedSetDocValues v1 = context.reader().getSortedSetDocValues(sortField.getField()); + return v1 != null && DocValues.unwrapSingleton(v1) == null; + + case DOUBLE: + case FLOAT: + case LONG: + case INT: + final SortedNumericDocValues v2 = context.reader().getSortedNumericDocValues(sortField.getField()); + return v2 != null && DocValues.unwrapSingleton(v2) == null; + + default: + // we have no clue whether the field is multi-valued or not so we assume it is. + return true; + } + } + + /** + * Returns the {@link Sort} prefix that is eligible to index sort + * optimization and null if index sort is not applicable. 
+ */ + private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException { + Sort indexSort = context.reader().getMetaData().getSort(); + if (indexSort == null) { + return null; + } + List sortFields = new ArrayList<>(); + for (int i = 0; i < Math.min(indexSort.getSort().length, sources.length); i++) { // the index sort may have more fields than there are sources + CompositeValuesSourceConfig sourceConfig = sourceConfigs[i]; + SingleDimensionValuesSource source = sources[i]; + SortField indexSortField = indexSort.getSort()[i]; + if (source.fieldType == null + // TODO: can we handle missing bucket when using index sort optimization ? + || source.missingBucket + || indexSortField.getField().equals(source.fieldType.name()) == false + || isMaybeMultivalued(context, indexSortField) + || sourceConfig.hasScript()) { + break; + } + + if (indexSortField.getReverse() != (source.reverseMul == -1)) { + if (i == 0) { + // the leading index sort matches the leading source field but the order is reversed + // so we don't check the other sources. + return new Sort(indexSortField); + } + break; + } + sortFields.add(indexSortField); + } + return sortFields.isEmpty() ? null : new Sort(sortFields.toArray(new SortField[0])); + } + + /** + * Return the number of leading sources that match the index sort. + * + * @param indexSortPrefix The index sort prefix that matches the sources + * @return The length of the index sort prefix if the sort order matches + * or -1 if the leading index sort is in the reverse order of the + * leading source. A value of 0 indicates that the index sort is + * not applicable. 
+ */ + private int computeSortPrefixLen(Sort indexSortPrefix) { + if (indexSortPrefix == null) { + return 0; + } + if (indexSortPrefix.getSort()[0].getReverse() != (sources[0].reverseMul == -1)) { + assert indexSortPrefix.getSort().length == 1; + return -1; + } else { + return indexSortPrefix.getSort().length; + } + } + + private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) throws IOException { + DocValueFormat[] formats = new DocValueFormat[indexSortPrefix.getSort().length]; + for (int i = 0; i < formats.length; i++) { + formats[i] = sources[i].format; + } + FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(new SortAndFormats(indexSortPrefix, formats), + Arrays.copyOfRange(rawAfterKey.values(), 0, formats.length)); + if (indexSortPrefix.getSort().length < sources.length) { + // include all docs that belong to the partial bucket + fieldDoc.doc = 0; + } + BooleanQuery newQuery = new BooleanQuery.Builder() + .add(context.query(), BooleanClause.Occur.MUST) + .add(new SearchAfterSortedDocQuery(indexSortPrefix, fieldDoc), BooleanClause.Occur.FILTER) + .build(); + Weight weight = context.searcher().createWeight(context.searcher().rewrite(newQuery), ScoreMode.COMPLETE_NO_SCORES, 1f); + Scorer scorer = weight.scorer(ctx); + if (scorer != null) { + DocIdSetIterator docIt = scorer.iterator(); + final LeafBucketCollector inner = queue.getLeafCollector(ctx, + getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length)); + inner.setScorer(scorer); + while (docIt.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + inner.collect(docIt.docID()); + } + } + } + @Override protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { finishLeaf(); + boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; + + Sort indexSortPrefix = buildIndexSortPrefix(ctx); + int sortPrefixLen = computeSortPrefixLen(indexSortPrefix); + + SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0 ? 
+ sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) : null; if (sortedDocsProducer != null) { - /* - The producer will visit documents sorted by the leading source of the composite definition - and terminates when the leading source value is guaranteed to be greater than the lowest - composite bucket in the queue. - */ + // Visit documents sorted by the leading source of the composite definition and terminates + // when the leading source value is guaranteed to be greater than the lowest composite bucket + // in the queue. DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet); if (fillDocIdSet) { entries.add(new Entry(ctx, docIdSet)); } - - /* - We can bypass search entirely for this segment, all the processing has been done in the previous call. - Throwing this exception will terminate the execution of the search for this root aggregation, - see {@link org.apache.lucene.search.MultiCollector} for more details on how we handle early termination in aggregations. - */ + // We can bypass search entirely for this segment, the processing is done in the previous call. + // Throwing this exception will terminate the execution of the search for this root aggregation, + // see {@link MultiCollector} for more details on how we handle early termination in aggregations. 
+ earlyTerminated = true; throw new CollectionTerminatedException(); } else { if (fillDocIdSet) { currentLeaf = ctx; docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc()); } - final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder)); - return new LeafBucketCollector() { - @Override - public void collect(int doc, long zeroBucket) throws IOException { - assert zeroBucket == 0L; - inner.collect(doc); - } - }; + if (rawAfterKey != null && sortPrefixLen > 0) { + // We have an after key and index sort is applicable so we jump directly to the doc + // that is after the index sort prefix using the rawAfterKey and we start collecting + // document from there. + processLeafFromQuery(ctx, indexSortPrefix); + throw new CollectionTerminatedException(); + } else { + final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen)); + return new LeafBucketCollector() { + @Override + public void collect(int doc, long zeroBucket) throws IOException { + assert zeroBucket == 0L; + inner.collect(doc); + } + }; + } } } /** * The first pass selects the top composite buckets from all matching documents. 
*/ - private LeafBucketCollector getFirstPassCollector(RoaringDocIdSet.Builder builder) { + private LeafBucketCollector getFirstPassCollector(RoaringDocIdSet.Builder builder, int indexSortPrefix) { return new LeafBucketCollector() { int lastDoc = -1; @Override public void collect(int doc, long bucket) throws IOException { - int slot = queue.addIfCompetitive(); - if (slot != -1) { - if (builder != null && lastDoc != doc) { - builder.add(doc); - lastDoc = doc; + try { + if (queue.addIfCompetitive(indexSortPrefix)) { + if (builder != null && lastDoc != doc) { + builder.add(doc); + lastDoc = doc; + } } + } catch (CollectionTerminatedException exc) { + earlyTerminated = true; + throw exc; } } }; @@ -274,7 +410,6 @@ public void collect(int doc, long zeroBucket) throws IOException { private SingleDimensionValuesSource createValuesSource(BigArrays bigArrays, IndexReader reader, CompositeValuesSourceConfig config, int size) { - final int reverseMul = config.reverseMul(); if (config.valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) { ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) config.valuesSource(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index 58887d9e6a2dc..93511498e2258 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.bucket.composite; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.common.lease.Releasable; import 
org.elasticsearch.common.lease.Releasables; @@ -63,6 +64,7 @@ public int hashCode() { private final int maxSize; private final Map map; private final SingleDimensionValuesSource[] arrays; + private IntArray docCounts; private boolean afterKeyIsSet = false; @@ -153,7 +155,7 @@ int compare(int slot1, int slot2) { cmp = arrays[i].compare(slot1, slot2); } if (cmp != 0) { - return cmp; + return cmp > 0 ? i+1 : -(i+1); } } return 0; @@ -244,27 +246,57 @@ LeafBucketCollector getLeafCollector(Comparable forceLeadSourceValue, /** * Check if the current candidate should be added in the queue. - * @return The target slot of the candidate or -1 is the candidate is not competitive. + * @return true if the candidate is competitive (added or already in the queue). + */ + boolean addIfCompetitive() { + return addIfCompetitive(0); + } + + + /** + * Add or update the current composite key in the queue if the values are competitive. + * + * @param indexSortSourcePrefix 0 if the index sort is null or doesn't match any of the sources field, + * a value greater than 0 indicates the prefix len of the sources that match the index sort + * and a negative value indicates that the index sort match the source field but the order is reversed. + * @return true if the candidate is competitive (added or already in the queue). + * + * @throws CollectionTerminatedException if the current collection can be terminated early due to index sorting. 
*/ - int addIfCompetitive() { + boolean addIfCompetitive(int indexSortSourcePrefix) { // checks if the candidate key is competitive Integer topSlot = compareCurrent(); if (topSlot != null) { // this key is already in the top N, skip it docCounts.increment(topSlot, 1); - return topSlot; + return true; } - if (afterKeyIsSet && compareCurrentWithAfter() <= 0) { - // this key is greater than the top value collected in the previous round, skip it - return -1; + if (afterKeyIsSet) { + int cmp = compareCurrentWithAfter(); + if (cmp <= 0) { + if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) { + // the leading index sort is in the reverse order of the leading source + // so we can early terminate when we reach a document that is smaller + // than the after key (collected on a previous page). + throw new CollectionTerminatedException(); + } + // key was collected on a previous page, skip it (>= afterKey). + return false; + } } - if (size() >= maxSize - // the tree map is full, check if the candidate key should be kept - && compare(CANDIDATE_SLOT, top()) > 0) { - // the candidate key is not competitive, skip it - return -1; + if (size() >= maxSize) { + // the tree map is full, check if the candidate key should be kept + int cmp = compare(CANDIDATE_SLOT, top()); + if (cmp > 0) { + if (cmp <= indexSortSourcePrefix) { + // index sort guarantees that there is no key greater or equal than the + // current one in the subsequent documents so we can early terminate. + throw new CollectionTerminatedException(); + } + // the candidate key is not competitive, skip it. 
+ return false; + } } - // the candidate key is competitive final int newSlot; if (size() >= maxSize) { @@ -280,7 +312,7 @@ && compare(CANDIDATE_SLOT, top()) > 0) { copyCurrent(newSlot); map.put(new Slot(newSlot), newSlot); add(newSlot); - return newSlot; + return true; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java index 70687cf9efe0c..d113ded687f81 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java @@ -200,7 +200,7 @@ public ValueType valueType() { } /** - * If true an explicit `null bucket will represent documents with missing values. + * If true an explicit null bucket will represent documents with missing values. */ @SuppressWarnings("unchecked") public AB missingBucket(boolean missingBucket) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java index bf88285c190c8..5c9378d44eff4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java @@ -33,23 +33,28 @@ class CompositeValuesSourceConfig { private final DocValueFormat format; private final int reverseMul; private final boolean missingBucket; + private final boolean hasScript; /** * Creates a new {@link CompositeValuesSourceConfig}. + * * @param name The name of the source. * @param fieldType The field type or null if the source is a script. 
* @param vs The underlying {@link ValuesSource}. * @param format The {@link DocValueFormat} of this source. * @param order The sort order associated with this source. + * @param missingBucket If true an explicit null bucket will represent documents with missing values. + * @param hasScript true if the source contains a script that can change the value. */ CompositeValuesSourceConfig(String name, @Nullable MappedFieldType fieldType, ValuesSource vs, DocValueFormat format, - SortOrder order, boolean missingBucket) { + SortOrder order, boolean missingBucket, boolean hasScript) { this.name = name; this.fieldType = fieldType; this.vs = vs; this.format = format; this.reverseMul = order == SortOrder.ASC ? 1 : -1; this.missingBucket = missingBucket; + this.hasScript = hasScript; } /** @@ -88,6 +93,13 @@ boolean missingBucket() { return missingBucket; } + /** + * Returns true if the source contains a script that can change the value. + */ + boolean hasScript() { + return hasScript; + } + /** * The sort order for the values source (e.g. -1 for descending and 1 for ascending). */ diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java index f1c1f5502dfd4..564399d4c2647 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java @@ -228,7 +228,8 @@ protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardCon // is specified in the builder. final DocValueFormat docValueFormat = format() == null ? DocValueFormat.RAW : config.format(); final MappedFieldType fieldType = config.fieldContext() != null ? 
config.fieldContext().fieldType() : null; - return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(), missingBucket()); + return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(), + missingBucket(), config.script() != null); } else { throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java index 17a5b3c0e9993..b6f2b2788cd25 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java @@ -113,7 +113,8 @@ protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardCon // is specified in the builder. final MappedFieldType fieldType = config.fieldContext() != null ? 
config.fieldContext().fieldType() : null; CellIdSource cellIdSource = new CellIdSource(geoPoint, precision, GeoTileUtils::longEncode); - return new CompositeValuesSourceConfig(name, fieldType, cellIdSource, DocValueFormat.GEOTILE, order(), missingBucket()); + return new CompositeValuesSourceConfig(name, fieldType, cellIdSource, DocValueFormat.GEOTILE, order(), + missingBucket(), script() != null); } else { throw new IllegalArgumentException("invalid source, expected geo_point, got " + orig.getClass().getSimpleName()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java index daafa6f14418e..aa15d5a6947d9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java @@ -119,7 +119,8 @@ protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardCon ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig; final HistogramValuesSource vs = new HistogramValuesSource(numeric, interval); final MappedFieldType fieldType = config.fieldContext() != null ? 
config.fieldContext().fieldType() : null; - return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(), missingBucket()); + return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(), + missingBucket(), script() != null); } else { throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java index 243ae557bfa2c..d3d4c34953216 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.bucket.composite; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -53,8 +54,10 @@ public class InternalComposite private final List sourceNames; private final List formats; + private final boolean earlyTerminated; + InternalComposite(String name, int size, List sourceNames, List formats, - List buckets, CompositeKey afterKey, int[] reverseMuls, + List buckets, CompositeKey afterKey, int[] reverseMuls, boolean earlyTerminated, List pipelineAggregators, Map metaData) { super(name, pipelineAggregators, metaData); this.sourceNames = sourceNames; @@ -63,6 +66,7 @@ public class InternalComposite this.afterKey = afterKey; this.size = size; this.reverseMuls = reverseMuls; + this.earlyTerminated = earlyTerminated; } public InternalComposite(StreamInput in) throws IOException { @@ -75,7 +79,8 @@ public InternalComposite(StreamInput in) throws IOException { } 
this.reverseMuls = in.readIntArray(); this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls)); - this.afterKey = in.readBoolean() ? new CompositeKey(in) : null; + this.afterKey = in.readOptionalWriteable(CompositeKey::new); + this.earlyTerminated = in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readBoolean() : false; } @Override @@ -87,9 +92,9 @@ protected void doWriteTo(StreamOutput out) throws IOException { } out.writeIntArray(reverseMuls); out.writeList(buckets); - out.writeBoolean(afterKey != null); - if (afterKey != null) { - afterKey.writeTo(out); + out.writeOptionalWriteable(afterKey); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(earlyTerminated); } } @@ -111,7 +116,7 @@ public InternalComposite create(List newBuckets) { * to be able to retrieve the next page even if all buckets have been filtered. */ return new InternalComposite(name, size, sourceNames, formats, newBuckets, afterKey, - reverseMuls, pipelineAggregators(), getMetaData()); + reverseMuls, earlyTerminated, pipelineAggregators(), getMetaData()); } @Override @@ -137,6 +142,11 @@ public Map afterKey() { return null; } + // Visible for tests + boolean isTerminatedEarly() { + return earlyTerminated; + } + // Visible for tests int[] getReverseMuls() { return reverseMuls; @@ -145,8 +155,10 @@ int[] getReverseMuls() { @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { PriorityQueue pq = new PriorityQueue<>(aggregations.size()); + boolean earlyTerminated = false; for (InternalAggregation agg : aggregations) { InternalComposite sortedAgg = (InternalComposite) agg; + earlyTerminated |= sortedAgg.earlyTerminated; BucketIterator it = new BucketIterator(sortedAgg.buckets); if (it.next() != null) { pq.add(it); @@ -178,7 +190,8 @@ public InternalAggregation reduce(List aggregations, Reduce result.add(reduceBucket); } final CompositeKey lastKey = result.size() > 0 ? 
result.get(result.size()-1).getRawKey() : null; - return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls, pipelineAggregators(), metaData); + return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls, + earlyTerminated, pipelineAggregators(), metaData); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java index 63530a4eed6ed..01e2c0a3ae5b0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java @@ -66,8 +66,7 @@ protected boolean processBucket(CompositeValuesCollectorQueue queue, LeafReaderC @Override public void collect(int doc, long bucket) throws IOException { hasCollected[0] = true; - int slot = queue.addIfCompetitive(); - if (slot != -1) { + if (queue.addIfCompetitive()) { topCompositeCollected[0]++; if (adder != null && doc != lastDoc) { if (remainingBits == 0) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java index 8d02eb4b19d87..cd88a56614ec5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java @@ -85,6 +85,6 @@ protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardCon } else { format = config.format(); } - return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), missingBucket()); + return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), 
missingBucket(), script() != null); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java index 228a3ca7ee2b1..b7a01099597b1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; @@ -385,4 +386,8 @@ public String getType() { return NAME; } + @Override + protected AggregationBuilder doRewrite(QueryRewriteContext queryShardContext) throws IOException { + return super.doRewrite(queryShardContext); + } } diff --git a/server/src/main/java/org/elasticsearch/search/searchafter/SearchAfterBuilder.java b/server/src/main/java/org/elasticsearch/search/searchafter/SearchAfterBuilder.java index 304a639a8981a..6c3ac160bc661 100644 --- a/server/src/main/java/org/elasticsearch/search/searchafter/SearchAfterBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/searchafter/SearchAfterBuilder.java @@ -184,7 +184,8 @@ private static Object convertValueFromSortType(String fieldName, SortField.Type if (value instanceof Number) { return ((Number) value).longValue(); } - return Long.parseLong(value.toString()); + return format.parseLong(value.toString(), false, + () -> { throw new IllegalStateException("now() is not allowed in [search_after] key"); }); case FLOAT: if (value instanceof Number) { diff --git 
a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index 1520dfde8a116..601154234e792 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.bucket.composite; +import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.DoublePoint; import org.apache.lucene.document.Field; @@ -31,17 +32,28 @@ import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.SortedSetSortField; +import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.NumericUtils; +import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatters; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.ContentPath; import 
org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.GeoPointFieldMapper; @@ -63,6 +75,7 @@ import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.IndexSettingsModule; import org.junit.After; import org.junit.Before; @@ -82,12 +95,13 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -public class CompositeAggregatorTests extends AggregatorTestCase { +public class CompositeAggregatorTests extends AggregatorTestCase { private static MappedFieldType[] FIELD_TYPES; @Override @@ -109,6 +123,7 @@ public void setUp() throws Exception { DateFieldMapper.Builder builder = new DateFieldMapper.Builder("date"); builder.docValues(true); + builder.format("yyyy-MM-dd||epoch_millis"); DateFieldMapper fieldMapper = builder.build(new Mapper.BuilderContext(createIndexSettings().getSettings(), new ContentPath(0))); FIELD_TYPES[3] = fieldMapper.fieldType(); @@ -419,7 +434,7 @@ public void testWithKeywordMissingAfter() throws Exception { ); } - public void testWithKeywordDesc() throws Exception { + public void testWithKeywordDesc() throws Exception { final List>> dataset = new ArrayList<>(); dataset.addAll( Arrays.asList( @@ -485,19 +500,19 @@ public void testMultiValuedWithKeyword() throws Exception { return new CompositeAggregationBuilder("name", Collections.singletonList(terms)); }, (result) -> { - assertEquals(5, result.getBuckets().size()); - assertEquals("{keyword=z}", result.afterKey().toString()); - assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString()); - assertEquals(2L, result.getBuckets().get(0).getDocCount()); - 
assertEquals("{keyword=b}", result.getBuckets().get(1).getKeyAsString()); - assertEquals(2L, result.getBuckets().get(1).getDocCount()); - assertEquals("{keyword=c}", result.getBuckets().get(2).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(2).getDocCount()); - assertEquals("{keyword=d}", result.getBuckets().get(3).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(3).getDocCount()); - assertEquals("{keyword=z}", result.getBuckets().get(4).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(4).getDocCount()); - } + assertEquals(5, result.getBuckets().size()); + assertEquals("{keyword=z}", result.afterKey().toString()); + assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=b}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(1).getDocCount()); + assertEquals("{keyword=c}", result.getBuckets().get(2).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(2).getDocCount()); + assertEquals("{keyword=d}", result.getBuckets().get(3).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(3).getDocCount()); + assertEquals("{keyword=z}", result.getBuckets().get(4).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(4).getDocCount()); + } ); testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, @@ -589,10 +604,10 @@ public void testWithKeywordAndLong() throws Exception { ); testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, () -> new CompositeAggregationBuilder("name", - Arrays.asList( - new TermsValuesSourceBuilder("keyword").field("keyword"), - new TermsValuesSourceBuilder("long").field("long") - ) + Arrays.asList( + new TermsValuesSourceBuilder("keyword").field("keyword"), + new TermsValuesSourceBuilder("long").field("long") + ) ), (result) -> { assertEquals(4, 
result.getBuckets().size()); @@ -610,11 +625,11 @@ public void testWithKeywordAndLong() throws Exception { testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, () -> new CompositeAggregationBuilder("name", - Arrays.asList( - new TermsValuesSourceBuilder("keyword").field("keyword"), - new TermsValuesSourceBuilder("long").field("long") - ) - ).aggregateAfter(createAfterKey("keyword", "a", "long", 100L) + Arrays.asList( + new TermsValuesSourceBuilder("keyword").field("keyword"), + new TermsValuesSourceBuilder("long").field("long") + ) + ).aggregateAfter(createAfterKey("keyword", "a", "long", 100L) ), (result) -> { assertEquals(2, result.getBuckets().size()); @@ -942,7 +957,7 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception { new TermsValuesSourceBuilder("double").field("double") ) ).aggregateAfter(createAfterKey("keyword", "a", "long", 100L, "double", 0.4d)) - ,(result) -> { + , (result) -> { assertEquals(10, result.getBuckets().size()); assertEquals("{keyword=z, long=0, double=0.09}", result.afterKey().toString()); assertEquals("{keyword=b, long=100, double=0.4}", result.getBuckets().get(0).getKeyAsString()); @@ -1152,8 +1167,9 @@ public void testThatDateHistogramFailsFormatAfter() throws IOException { return new CompositeAggregationBuilder("name", Collections.singletonList(histo)) .aggregateAfter(createAfterKey("date", "now")); }, - (result) -> {} - )); + (result) -> { + } + )); assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class)); assertThat(exc.getCause().getMessage(), containsString("now() is not supported in [after] key")); @@ -1167,7 +1183,8 @@ public void testThatDateHistogramFailsFormatAfter() throws IOException { return new CompositeAggregationBuilder("name", Collections.singletonList(histo)) .aggregateAfter(createAfterKey("date", "1474329600000")); }, - (result) -> {} + (result) -> { + } )); assertThat(exc.getMessage(), containsString("failed to parse date field 
[1474329600000]")); assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future."); @@ -1486,7 +1503,7 @@ public void testWithKeywordAndDateHistogram() throws IOException { new DateHistogramValuesSourceBuilder("date_histo").field("date") .dateHistogramInterval(DateHistogramInterval.days(1)) ) - ).aggregateAfter(createAfterKey("keyword","c", "date_histo", 1474329600000L)) + ).aggregateAfter(createAfterKey("keyword", "c", "date_histo", 1474329600000L)) , (result) -> { assertEquals(4, result.getBuckets().size()); assertEquals("{keyword=z, date_histo=1474329600000}", result.afterKey().toString()); @@ -1668,7 +1685,7 @@ public void testDuplicateNames() { builders.add(new TermsValuesSourceBuilder("duplicate1").field("baz")); builders.add(new TermsValuesSourceBuilder("duplicate2").field("bar")); builders.add(new TermsValuesSourceBuilder("duplicate2").field("baz")); - new CompositeAggregationBuilder("foo", builders); + new CompositeAggregationBuilder("foo", builders); }); assertThat(e.getMessage(), equalTo("Composite source names must be unique, found duplicates: [duplicate2, duplicate1]")); } @@ -1705,7 +1722,7 @@ private , V extends Comparable> void testRandomTerms( List>> dataset = new ArrayList<>(); Set valuesSet = new HashSet<>(); - Map, AtomicLong> expectedDocCounts = new HashMap<> (); + Map, AtomicLong> expectedDocCounts = new HashMap<>(); for (int i = 0; i < numDocs; i++) { int numValues = randomIntBetween(1, 5); Set values = new HashSet<>(); @@ -1725,13 +1742,13 @@ private , V extends Comparable> void testRandomTerms( List> seen = new ArrayList<>(); AtomicBoolean finish = new AtomicBoolean(false); - int size = randomIntBetween(1, expected.size()); + int size = randomIntBetween(1, expected.size()); while (finish.get() == false) { testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(field)), dataset, () -> { Map afterKey = null; if (seen.size() > 0) { - afterKey = 
Collections.singletonMap(field, seen.get(seen.size()-1)); + afterKey = Collections.singletonMap(field, seen.get(seen.size() - 1)); } TermsValuesSourceBuilder source = new TermsValuesSourceBuilder(field).field(field); return new CompositeAggregationBuilder("name", Collections.singletonList(source)) @@ -1838,44 +1855,130 @@ public void testWithGeoPoint() throws Exception { ); } + public void testEarlyTermination() throws Exception { + final List>> dataset = new ArrayList<>(); + dataset.addAll( + Arrays.asList( + createDocument("keyword", "a", "long", 100L, "foo", "bar"), + createDocument("keyword", "c", "long", 100L, "foo", "bar"), + createDocument("keyword", "a", "long", 0L, "foo", "bar"), + createDocument("keyword", "d", "long", 10L, "foo", "bar"), + createDocument("keyword", "b", "long", 10L, "foo", "bar"), + createDocument("keyword", "c", "long", 10L, "foo", "bar"), + createDocument("keyword", "e", "long", 100L, "foo", "bar"), + createDocument("keyword", "e", "long", 10L, "foo", "bar") + ) + ); + + executeTestCase(true, false, new TermQuery(new Term("foo", "bar")), + dataset, + () -> + new CompositeAggregationBuilder("name", + Arrays.asList( + new TermsValuesSourceBuilder("keyword").field("keyword"), + new TermsValuesSourceBuilder("long").field("long") + )).aggregateAfter(createAfterKey("keyword", "b", "long", 10L)).size(2), + (result) -> { + assertEquals(2, result.getBuckets().size()); + assertEquals("{keyword=c, long=100}", result.afterKey().toString()); + assertEquals("{keyword=c, long=10}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=c, long=100}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(1).getDocCount()); + assertTrue(result.isTerminatedEarly()); + } + ); + + // source field and index sorting config have different order + executeTestCase(true, false, new TermQuery(new Term("foo", "bar")), + dataset, + () -> + new 
CompositeAggregationBuilder("name", + Arrays.asList( + // reverse source order + new TermsValuesSourceBuilder("keyword").field("keyword").order(SortOrder.DESC), + new TermsValuesSourceBuilder("long").field("long").order(SortOrder.DESC) + ) + ).aggregateAfter(createAfterKey("keyword", "c", "long", 10L)).size(2), + (result) -> { + assertEquals(2, result.getBuckets().size()); + assertEquals("{keyword=a, long=100}", result.afterKey().toString()); + assertEquals("{keyword=b, long=10}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=a, long=100}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(1).getDocCount()); + assertTrue(result.isTerminatedEarly()); + } + ); + } + private void testSearchCase(List queries, List>> dataset, Supplier create, Consumer verify) throws IOException { for (Query query : queries) { - executeTestCase(false, query, dataset, create, verify); - executeTestCase(true, query, dataset, create, verify); + executeTestCase(false, false, query, dataset, create, verify); + executeTestCase(false, true, query, dataset, create, verify); + executeTestCase(true, true, query, dataset, create, verify); } } - private void executeTestCase(boolean reduced, + private void executeTestCase(boolean useIndexSort, + boolean reduced, Query query, List>> dataset, Supplier create, Consumer verify) throws IOException { + Map types = + Arrays.stream(FIELD_TYPES).collect(Collectors.toMap(MappedFieldType::name, Function.identity())); + CompositeAggregationBuilder aggregationBuilder = create.get(); + Sort indexSort = useIndexSort ? 
buildIndexSort(aggregationBuilder.sources(), types) : null; + IndexSettings indexSettings = createIndexSettings(indexSort); try (Directory directory = newDirectory()) { - try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random())); + if (indexSort != null) { + config.setIndexSort(indexSort); + config.setCodec(TestUtil.getDefaultCodec()); + } + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) { Document document = new Document(); for (Map> fields : dataset) { addToDocument(document, fields); indexWriter.addDocument(document); document.clear(); } + if (reduced == false && randomBoolean()) { + indexWriter.forceMerge(1); + } } try (IndexReader indexReader = DirectoryReader.open(directory)) { IndexSearcher indexSearcher = new IndexSearcher(indexReader); - CompositeAggregationBuilder aggregationBuilder = create.get(); final InternalComposite composite; if (reduced) { - composite = searchAndReduce(indexSearcher, query, aggregationBuilder, FIELD_TYPES); + composite = searchAndReduce(indexSettings, indexSearcher, query, aggregationBuilder, FIELD_TYPES); } else { - composite = search(indexSearcher, query, aggregationBuilder, FIELD_TYPES); + composite = search(indexSettings, indexSearcher, query, aggregationBuilder, FIELD_TYPES); } verify.accept(composite); } } } + private static IndexSettings createIndexSettings(Sort sort) { + Settings.Builder builder = Settings.builder(); + if (sort != null) { + String[] fields = Arrays.stream(sort.getSort()) + .map(SortField::getField) + .toArray(String[]::new); + String[] orders = Arrays.stream(sort.getSort()) + .map((o) -> o.getReverse() ? 
"desc" : "asc") + .toArray(String[]::new); + builder.putList("index.sort.field", fields); + builder.putList("index.sort.order", orders); + } + return IndexSettingsModule.newIndexSettings(new Index("_index", "0"), builder.build()); + } + private void addToDocument(Document doc, Map> keys) { for (Map.Entry> entry : keys.entrySet()) { final String name = entry.getKey(); @@ -1935,4 +2038,43 @@ private static Map> createDocument(Object... fields) { private static long asLong(String dateTime) { return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli(); } + + private static Sort buildIndexSort(List> sources, Map fieldTypes) { + List sortFields = new ArrayList<>(); + for (CompositeValuesSourceBuilder source : sources) { + MappedFieldType type = fieldTypes.get(source.field()); + if (type instanceof KeywordFieldMapper.KeywordFieldType) { + sortFields.add(new SortedSetSortField(type.name(), false)); + } else if (type instanceof DateFieldMapper.DateFieldType) { + sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.LONG, false)); + } else if (type instanceof NumberFieldMapper.NumberFieldType) { + boolean comp = false; + switch (type.typeName()) { + case "byte": + case "short": + case "integer": + comp = true; + sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.INT, false)); + break; + + case "long": + sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.LONG, false)); + break; + + case "float": + case "double": + comp = true; + sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.DOUBLE, false)); + break; + + default: + break; + } + if (comp == false) { + break; + } + } + } + return sortFields.size() > 0 ? 
new Sort(sortFields.toArray(new SortField[0])) : null; + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java index 6516309de965f..ff893314e5988 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java @@ -29,10 +29,16 @@ import org.apache.lucene.index.DocValues; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.DocIdSet; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.SortedSetSortField; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -56,6 +62,7 @@ import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE; import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.LONG; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class CompositeValuesCollectorQueueTests extends AggregatorTestCase { static class ClassAndName { @@ -133,31 +140,47 @@ public void testRandom() throws IOException { } private void testRandomCase(ClassAndName... 
types) throws IOException { - testRandomCase(true, true, types); - testRandomCase(true, false, types); - testRandomCase(false, true, types); - testRandomCase(false, false, types); + for (int i = 0; i < types.length; i++) { + testRandomCase(true, true, i, types); + testRandomCase(true, false, i, types); + testRandomCase(false, true, i, types); + testRandomCase(false, false, i, types); + } } - private void testRandomCase(boolean forceMerge, boolean missingBucket, ClassAndName... types) throws IOException { + private void testRandomCase(boolean forceMerge, + boolean missingBucket, + int indexSortSourcePrefix, + ClassAndName... types) throws IOException { final BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE; int numDocs = randomIntBetween(50, 100); List[]> possibleValues = new ArrayList<>(); - for (ClassAndName type : types) { + SortField[] indexSortFields = indexSortSourcePrefix == 0 ? null : new SortField[indexSortSourcePrefix]; + for (int i = 0; i < types.length; i++) { + ClassAndName type = types[i]; final Comparable[] values; int numValues = randomIntBetween(1, numDocs * 2); values = new Comparable[numValues]; if (type.clazz == Long.class) { - for (int i = 0; i < numValues; i++) { - values[i] = randomLong(); + if (i < indexSortSourcePrefix) { + indexSortFields[i] = new SortedNumericSortField(type.fieldType.name(), SortField.Type.LONG); + } + for (int j = 0; j < numValues; j++) { + values[j] = randomLong(); } } else if (type.clazz == Double.class) { - for (int i = 0; i < numValues; i++) { - values[i] = randomDouble(); + if (i < indexSortSourcePrefix) { + indexSortFields[i] = new SortedNumericSortField(type.fieldType.name(), SortField.Type.DOUBLE); + } + for (int j = 0; j < numValues; j++) { + values[j] = randomDouble(); } } else if (type.clazz == BytesRef.class) { - for (int i = 0; i < numValues; i++) { - values[i] = new BytesRef(randomAlphaOfLengthBetween(5, 50)); + if (i < indexSortSourcePrefix) { + indexSortFields[i] = new 
SortedSetSortField(type.fieldType.name(), false); + } + for (int j = 0; j < numValues; j++) { + values[j] = new BytesRef(randomAlphaOfLengthBetween(5, 50)); } } else { assert (false); @@ -167,13 +190,17 @@ private void testRandomCase(boolean forceMerge, boolean missingBucket, ClassAndN Set keys = new HashSet<>(); try (Directory directory = newDirectory()) { + final IndexWriterConfig writerConfig = newIndexWriterConfig(); + if (indexSortFields != null) { + writerConfig.setIndexSort(new Sort(indexSortFields)); + } try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, new KeywordAnalyzer())) { for (int i = 0; i < numDocs; i++) { Document document = new Document(); List>> docValues = new ArrayList<>(); boolean hasAllField = true; for (int j = 0; j < types.length; j++) { - int numValues = randomIntBetween(0, 5); + int numValues = indexSortSourcePrefix-1 >= j ? 1 : randomIntBetween(0, 5); List> values = new ArrayList<>(); if (numValues == 0) { hasAllField = false; @@ -212,7 +239,7 @@ private void testRandomCase(boolean forceMerge, boolean missingBucket, ClassAndN } } IndexReader reader = DirectoryReader.open(directory); - int size = randomIntBetween(1, keys.size()); + int size = keys.size() > 1 ? 
randomIntBetween(1, keys.size()) : 1; SingleDimensionValuesSource[] sources = new SingleDimensionValuesSource[types.length]; for (int i = 0; i < types.length; i++) { final MappedFieldType fieldType = types[i].fieldType; @@ -276,21 +303,25 @@ private void testRandomCase(boolean forceMerge, boolean missingBucket, ClassAndN new CompositeValuesCollectorQueue(BigArrays.NON_RECYCLING_INSTANCE, sources, size, last); final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()); for (LeafReaderContext leafReaderContext : reader.leaves()) { - final LeafBucketCollector leafCollector = new LeafBucketCollector() { - @Override - public void collect(int doc, long bucket) throws IOException { - queue.addIfCompetitive(); - } - }; if (docsProducer != null && withProducer) { assertEquals(DocIdSet.EMPTY, docsProducer.processLeaf(new MatchAllDocsQuery(), queue, leafReaderContext, false)); } else { + final LeafBucketCollector leafCollector = new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + queue.addIfCompetitive(indexSortSourcePrefix); + } + }; final LeafBucketCollector queueCollector = queue.getLeafCollector(leafReaderContext, leafCollector); final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); for (int i = 0; i < leafReaderContext.reader().maxDoc(); i++) { if (liveDocs == null || liveDocs.get(i)) { - queueCollector.collect(i); + try { + queueCollector.collect(i); + } catch (CollectionTerminatedException exc) { + assertThat(indexSortSourcePrefix, greaterThan(0)); + } } } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/InternalCompositeTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/InternalCompositeTests.java index 98263d2ebb680..4edd694a3ac03 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/InternalCompositeTests.java +++ 
b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/InternalCompositeTests.java @@ -170,7 +170,7 @@ protected InternalComposite createTestInstance(String name, List o1.compareKey(o2)); CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size()-1).getRawKey() : null; - return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls, + return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls, randomBoolean(), Collections.emptyList(), metaData); } @@ -207,7 +207,7 @@ protected InternalComposite mutateInstance(InternalComposite instance) throws IO } CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size()-1).getRawKey() : null; return new InternalComposite(instance.getName(), instance.getSize(), sourceNames, formats, buckets, lastBucket, reverseMuls, - instance.pipelineAggregators(), metaData); + randomBoolean(), instance.pipelineAggregators(), metaData); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 221030cd8d157..7c02fb2481f4c 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -306,7 +306,15 @@ protected A search(IndexSe Query query, AggregationBuilder builder, MappedFieldType... fieldTypes) throws IOException { - return search(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + return search(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + } + + protected A search(IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + MappedFieldType... 
fieldTypes) throws IOException { + return search(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); } protected A search(IndexSearcher searcher, @@ -314,8 +322,17 @@ protected A search(IndexSe AggregationBuilder builder, int maxBucket, MappedFieldType... fieldTypes) throws IOException { + return search(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes); + } + + protected A search(IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + MappedFieldType... fieldTypes) throws IOException { MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket); - C a = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes); + C a = createAggregator(query, builder, searcher, indexSettings, bucketConsumer, fieldTypes); a.preCollection(); searcher.search(query, a); a.postCollection(); @@ -329,7 +346,23 @@ protected A searchAndReduc Query query, AggregationBuilder builder, MappedFieldType... fieldTypes) throws IOException { - return searchAndReduce(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + } + + protected A searchAndReduce(IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + MappedFieldType... fieldTypes) throws IOException { + return searchAndReduce(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + } + + protected A searchAndReduce(IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + MappedFieldType... 
fieldTypes) throws IOException { + return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes); } /** @@ -337,7 +370,8 @@ protected A searchAndReduc * builds an aggregator for each sub-searcher filtered by the provided {@link Query} and * returns the reduced {@link InternalAggregation}. */ - protected A searchAndReduce(IndexSearcher searcher, + protected A searchAndReduce(IndexSettings indexSettings, + IndexSearcher searcher, Query query, AggregationBuilder builder, int maxBucket, @@ -366,7 +400,7 @@ protected A searchAndReduc for (ShardSearcher subSearcher : subSearchers) { MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket); - C a = createAggregator(query, builder, subSearcher, shardBucketConsumer, fieldTypes); + C a = createAggregator(query, builder, subSearcher, indexSettings, shardBucketConsumer, fieldTypes); a.preCollection(); subSearcher.search(weight, a); a.postCollection();