Add composite aggregator #26800
Conversation
Force-pushed d9b7241 to 6a1b54c.
In general this change looks good to me. I left some questions and remarks. I'll do another review round later.
This PR is quite large and it would be great if someone else can also take a look at it. Maybe @colings86?
```java
long ord;
while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) {
    values[0] = ord;
    next.collect(doc, 0L);
```
Change `0L` into `bucket`?
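i.e., presumably the loop would forward the incoming bucket ordinal, something like this (sketch against the quoted hunk):

```java
long ord;
while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) {
    values[0] = ord;
    next.collect(doc, bucket); // forward the incoming bucket ordinal rather than hard-coding 0L
}
```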
```java
        throw new CollectionTerminatedException();
    }
    // just skip this key for now
    return ;
```
nit: remove whitespace?
```java
    }
}

private LeafBucketCollector getFirstPassCollector() {
```
I think this method and the next method can just return `LeafCollector`? The bucket ord isn't used in these implementations. Also, these collectors are not directly used by the aggs framework, but are wrapped by a `LeafBucketCollector` instance in `CompositeValuesSource.java`.
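For reference, a minimal sketch of the wrapping being described (the `wrap` adapter itself is hypothetical, names taken from the quoted code):

```java
import java.io.IOException;
import org.apache.lucene.search.LeafCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollector;

// Adapting a plain Lucene LeafCollector to the aggs framework. The bucket
// ordinal is unused here, hence the suggestion to return LeafCollector
// from these methods.
static LeafBucketCollector wrap(LeafCollector inner) {
    return new LeafBucketCollector() {
        @Override
        public void collect(int doc, long bucket) throws IOException {
            assert bucket == 0L; // this aggregator only collects the root bucket
            inner.collect(doc);
        }
    };
}
```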
```java
final LeafBucketCollector collector = array.getLeafCollector(context.ctx, getSecondPassCollector(context.subCollector));
int docID;
while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
    collector.collect(docID, 0L);
```
Maybe use `collector.collect(docID);` instead?
```java
/**
 * A wrapper for {@link ValuesSource} that can record and compare values produced during a collection.
 */
abstract class CompositeValuesSource<VS extends ValuesSource, T extends Comparable<T>> {
```
There is some comparing and ordering done here. I wonder if we can incorporate or extend Lucene's `PriorityQueue` class here?
I am using this class to create slots that can be referenced in a priority queue. It's mainly a comparator plus a composite array that can update the values. It is equivalent to a `FieldComparator`, but for buckets.
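Roughly this shape, if it helps (method names hypothetical, not the actual API):

```java
// Values are recorded into numbered slots so a priority queue can order
// buckets by comparing slots, the way a FieldComparator orders hits.
abstract class SlotValues<T extends Comparable<T>> {
    /** Record the value of the current document into the given slot. */
    abstract void copyCurrent(int slot);

    /** Compare the values recorded in two slots. */
    abstract int compare(int slotA, int slotB);
}
```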
```java
import java.util.Map;
import java.util.TreeMap;

class CompositeAggregator extends BucketsAggregator {
```
make class final?
```java
public class CompositeKey {
    final Comparable<?>[] values;

    public CompositeKey(Comparable<?>... values) {
```
Can the constructor be package protected?
It can be used to build a composite aggregation, so I think it's useful to keep it public for now. It adds the ability to start a composite aggregation from any point in the Java client.
```java
return new LeafBucketCollector() {
    @Override
    public void collect(int doc, long bucket) throws IOException {
        if (dvs.advanceExact(doc)) {
```
The bucket variable is always zero here, right? If that is the case then maybe we should have assertions for this in these anonymous `LeafBucketCollector` classes?
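e.g. an assertion added to the quoted anonymous class, along these lines:

```java
@Override
public void collect(int doc, long bucket) throws IOException {
    assert bucket == 0L; // suggested assertion: only the root bucket is ever collected
    if (dvs.advanceExact(doc)) {
        // existing collection logic
    }
}
```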
```java
private final List<InternalBucket> buckets;
private final int[] reverseMuls;

public InternalComposite(String name, int size, List<InternalBucket> buckets, int[] reverseMuls,
```
Can these constructors be package protected?
```java
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
import static org.hamcrest.Matchers.hasSize;

public class CompositeAggregationBuilderTests extends ESTestCase {
```
Maybe add a dedicated test case for each builder (extending from `AbstractStreamableTestCase`)?
This is a test for the `CompositeAggregationBuilder`, so it randomizes the different value sources that can be used inside. Splitting the test for each source would defeat the purpose, since the idea is to compose an aggregation from different value sources.
I took a first look, will need to review again later.

One thing that I am a little wary of is that we seem to be recreating a lot of the `ValuesSource` classes here. I am wondering if we can reuse more of the existing `ValuesSource` builder and parser code to avoid having it in two places? We will want the `sources` to feel very much like an array of `ValuesSource` configs, so as far as possible it would be good to use the same code to parse them as we have for the regular aggs.
```java
 */
CompositeKey afterKey();

static XContentBuilder bucketToXContentFragment(CompositeAggregation.Bucket bucket,
```
Nit: can this build the object rather than just the fragment? It looks like it just gets wrapped in an object below.
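Something like this, perhaps (method name hypothetical):

```java
// Variant that emits the enclosing object itself instead of leaving the
// wrapping to the caller.
static XContentBuilder bucketToXContent(CompositeAggregation.Bucket bucket,
                                        XContentBuilder builder,
                                        ToXContent.Params params) throws IOException {
    builder.startObject();
    bucketToXContentFragment(bucket, builder, params);
    return builder.endObject();
}
```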
```java
            inner.collect(doc);
        }
    };
}
```
I think it's worth adding some Javadocs here to explain how the collection works and what is happening in the first pass and the second pass.
++
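For example, the class-level Javadoc could sketch the two passes along these lines (wording mine, based on this thread):

```java
/**
 * Collection runs in two passes: the first pass visits every matching
 * document and keeps the "best" composite keys in a priority queue; the
 * second pass replays the documents that belong to the surviving keys so
 * that sub-aggregations only run for buckets in the final response.
 */
```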
```java
/**
 * A {@link ValuesSource} builder for {@link CompositeAggregationBuilder}
 */
public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSourceBuilder<AB>> implements Writeable, ToXContentFragment {
```
This looks very much like the `ValuesSourceBuilder` that already exists. Are we not able to use that here?
I tried, but this values source builder is not an `AbstractAggregationBuilder`. It is used solely to access the values, not to build a complete aggregator. I also need to access the values in a specific way in order to be able to build the combinations of multiple value sources, so my usage is really an edge case.
@martijnvg @colings86 thanks for reviewing
This is looking good. I wonder if …

Then the result would look like this: …
This seems more human readable, but maybe using a hashmap instead of an array to represent the values is less useful for consumers of this API like Kibana. I'm easy. The only other thing I'd suggest is to add an introductory paragraph at the beginning of the docs explaining why you would use the …
```
experimental[]

A multi-bucket aggregation that creates composite buckets from different sources.
The buckets are build from the combinations of the values extracted/created for each document and each
```
build -> built
```
// CONSOLE

WARNING: The optimization takes effect only if the fields used for sorting are single-valued and follow
the same order than the aggregation (`desc` or `asc`).
```
than -> as
@jimczi I left a few more comments
```java
/**
 * Returns a new {@link TermsValuesSourceBuilder}.
 */
public TermsValuesSourceBuilder termsSource() {
```
These methods could be static? In fact, do we need these methods at all, since they just create the source builders directly anyway?
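i.e. either drop them or make them static, something like this (constructor signature assumed):

```java
// Plain static factory; callers could equally just call the constructor.
public static TermsValuesSourceBuilder termsSource(String name) {
    return new TermsValuesSourceBuilder(name);
}
```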
```java
private CompositeKey afterKey;
private int size = 10;

public CompositeAggregationBuilder(String name) {
```
Should we add a parameter for the sources here, since they are required? That way you can't create an invalid instance of this builder. We could also then validate the length of the after key in the setter rather than when `doBuild()` is called.
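i.e. something like this (field and type names assumed):

```java
// Sketch of the suggested constructor: sources become a required argument
// and are validated up front instead of in doBuild().
public CompositeAggregationBuilder(String name, List<CompositeValuesSourceBuilder<?>> sources) {
    super(name);
    if (sources == null || sources.isEmpty()) {
        throw new IllegalArgumentException("[sources] must not be empty");
    }
    this.sources = sources;
}
```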
```java
    return vs;
}

int reverseMul() {
```
Could you add a Javadoc for this? It took me a while to work out what this actually was, as it wasn't obvious from where it's used.
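Something along these lines would have saved me the digging (wording mine):

```java
/**
 * A multiplier applied to raw comparison results: 1 when this source sorts
 * ascending, -1 when it sorts descending, so a single ascending comparator
 * can serve both sort directions.
 */
int reverseMul() {
    return reverseMul; // per-source field, assumed to be set from the order option
}
```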
```java
    tzRoundingBuilder = Rounding.builder(TimeValue.timeValueMillis(interval));
}
Rounding rounding = tzRoundingBuilder.build();
return rounding;
```
Should we have timezone support here?
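e.g. along these lines, assuming the same `Rounding.Builder#timeZone` hook that `date_histogram` uses applies here, and a hypothetical `timeZone` field on this source:

```java
Rounding.Builder tzRoundingBuilder = Rounding.builder(TimeValue.timeValueMillis(interval));
if (timeZone != null) {
    tzRoundingBuilder.timeZone(timeZone); // timeZone: org.joda.time.DateTimeZone, assumed field
}
Rounding rounding = tzRoundingBuilder.build();
return rounding;
```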
++ to some kind of ability to name sources. It'll be a lot easier for applications to use if they can refer to names rather than positions. Ditto to the …
Two questions! :)

If a composite key doesn't have any documents, is it returned in the results with …?

My understanding is that if index sorting is not enabled, paging through the results with … requires reevaluating all documents on every request, right?
The composite buckets are created from existing values only.

Your understanding is correct. We need to reevaluate all documents on every request, but the memory consumption (dictated by …)
Thanks @jimczi!

I'm not sure... probably not. I was mostly just curious, not sure it's really needed. The main reason we went to …

Good to know, thanks for the explanation. I'll run some tests here locally to get a feel for the impact. :)
Does this mean that pipeline aggs are not supported under the …?
@clintongormley it'll mean that pipeline aggregations like derivative, moving_average and cumulative sum, which only work with histogram or date_histogram aggregations, will not work. But we wouldn't be able to make them work across pages anyway, since that would need to maintain state across requests. Pipeline aggregations like bucket_selector and bucket_script I think will still work.
```diff
@@ -546,7 +547,8 @@ private void parse(ParseContext parseContext, Token token, XContentParser parser
                 }
             }
         } else {
-            throw new ElasticsearchParseException("failed to parse expected text or object got" + token.name());
+            throw new ParsingException(parser.getTokenLocation(), "failed to parse expected text or object got " + token.name());
```
This change looks unrelated? I think it was addressed in a different PR.
Force-pushed fa5ade8 to 626dff7.
Force-pushed 626dff7 to fb02640.
I pushed another iteration to address the reviews.
@martijnvg can you take another look?
Left two small comments. LGTM otherwise!
```java
    return Arrays.hashCode(values);
}

static String formatValue(Object value, DocValueFormat formatter) {
```
Remove? It does not seem to be used.
```diff
@@ -182,7 +182,7 @@ public TypeParser() {
     protected FormatDateTimeFormatter dateTimeFormatter;
     protected DateMathParser dateMathParser;

-    DateFieldType() {
+    public DateFieldType() {
```
I prefer to keep this package protected. I think in `CompositeAggregatorTests#setUp()` we should do this instead:

```java
DateFieldMapper.Builder builder = new DateFieldMapper.Builder("date");
builder.docValues(true);
DateFieldMapper fieldMapper =
    builder.build(new Mapper.BuilderContext(createIndexSettings().getSettings(), new ContentPath(0)));
FIELD_TYPES[3] = fieldMapper.fieldType();
```
Exclude the "key" field from random modifications in tests; the composite agg uses an array of objects for the bucket key and the values are checked. Relates #26800

This change adds a module called `aggs-composite` that defines a new aggregation named `composite`.

The `composite` aggregation is a multi-buckets aggregation that creates composite buckets made of multiple sources. The sources for each bucket can be defined as:

* A `terms` source: values are extracted from a field or a script.
* A `date_histogram` source: values are extracted from a date field and rounded to the provided interval.

This aggregation can be used to retrieve all buckets of a deeply nested aggregation by flattening the nested aggregation in composite buckets.
A composite bucket is composed of one value per source and is built for each document as the combination of values in the provided sources.
For instance the following aggregation:

````
"test_agg": {
  "terms": {
    "field": "field1"
  },
  "aggs": {
    "nested_test_agg": {
      "terms": {
        "field": "field2"
      }
    }
  }
}
````

... which retrieves the top N terms for `field1` and, for each top term in `field1`, the top N terms for `field2`, can be replaced by a `composite` aggregation in order to retrieve **all** the combinations of `field1`, `field2` in the matching documents:

````
"composite_agg": {
  "composite": {
    "sources": [
      { "field1": { "terms": { "field": "field1" } } },
      { "field2": { "terms": { "field": "field2" } } }
    ]
  }
}
````

The response of the aggregation looks like this:

````
"aggregations": {
  "composite_agg": {
    "buckets": [
      {
        "key": { "field1": "alabama", "field2": "almanach" },
        "doc_count": 100
      },
      {
        "key": { "field1": "alabama", "field2": "calendar" },
        "doc_count": 1
      },
      {
        "key": { "field1": "arizona", "field2": "calendar" },
        "doc_count": 1
      }
    ]
  }
}
````

By default this aggregation returns 10 buckets sorted in ascending order of the composite key.
Pagination can be achieved by providing `after` values, the values of the composite key to aggregate after.
For instance the following aggregation will aggregate all composite keys that sort after `arizona, calendar`:

````
"composite_agg": {
  "composite": {
    "after": { "field1": "alabama", "field2": "calendar" },
    "size": 100,
    "sources": [
      { "field1": { "terms": { "field": "field1" } } },
      { "field2": { "terms": { "field": "field2" } } }
    ]
  }
}
````

This aggregation is optimized for indices that set an index sorting that matches the composite source definition.
For instance the aggregation above could run faster on indices that define an index sorting like this:

````
"settings": {
  "index.sort.field": ["field1", "field2"]
}
````

In this case the `composite` aggregation can early terminate on each segment.
This aggregation also accepts multi-valued fields but disables early termination for these fields, even if index sorting matches the sources definition. This is mandatory because index sorting picks only one value per document to perform the sort.

For sorted indices, we could jump directly to the documents that sort after the provided `after` values in each segment in order to speed up the collection, but that can be done in a follow-up.

This aggregation also accepts any sub-aggregations and returns the result inside each composite bucket like any other multi-bucket agg: …

Documentation is missing, which is why this is still a WIP.