Skip to content

Commit

Permalink
Add composite aggregator (elastic#26800)
Browse files Browse the repository at this point in the history
* This change adds a module called `aggs-composite` that defines a new aggregation named `composite`.
The `composite` aggregation is a multi-buckets aggregation that creates composite buckets made of multiple sources.
The sources for each bucket can be defined as:
  * A `terms` source, values are extracted from a field or a script.
  * A `date_histogram` source, values are extracted from a date field and rounded to the provided interval.
This aggregation can be used to retrieve all buckets of a deeply nested aggregation by flattening the nested aggregation in composite buckets.
A composite buckets is composed of one value per source and is built for each document as the combinations of values in the provided sources.
For instance the following aggregation:

````
"test_agg": {
  "terms": {
    "field": "field1"
  },
  "aggs": {
    "nested_test_agg":
      "terms": {
        "field": "field2"
      }
  }
}
````
... which retrieves the top N terms for `field1` and for each top term in `field1` the top N terms for `field2`, can be replaced by a `composite` aggregation in order to retrieve **all** the combinations of `field1`, `field2` in the matching documents:

````
"composite_agg": {
  "composite": {
    "sources": [
      {
	"field1": {
          "terms": {
              "field": "field1"
            }
        }
      },
      {
	"field2": {
          "terms": {
            "field": "field2"
          }
        }
      },
    }
  }
````

The response of the aggregation looks like this:

````
"aggregations": {
  "composite_agg": {
    "buckets": [
      {
        "key": {
          "field1": "alabama",
          "field2": "almanach"
        },
        "doc_count": 100
      },
      {
        "key": {
          "field1": "alabama",
          "field2": "calendar"
        },
        "doc_count": 1
      },
      {
        "key": {
          "field1": "arizona",
          "field2": "calendar"
        },
        "doc_count": 1
      }
    ]
  }
}
````

By default this aggregation returns 10 buckets sorted in ascending order of the composite key.
Pagination can be achieved by providing `after` values, the values of the composite key to aggregate after.
For instance the following aggregation will aggregate all composite keys that sorts after `arizona, calendar`:

````
"composite_agg": {
  "composite": {
    "after": {"field1": "alabama", "field2": "calendar"},
    "size": 100,
    "sources": [
      {
	"field1": {
          "terms": {
            "field": "field1"
          }
        }
      },
      {
	"field2": {
          "terms": {
            "field": "field2"
          }
	}
      }
    }
  }
````

This aggregation is optimized for indices that set an index sorting that match the composite source definition.
For instance the aggregation above could run faster on indices that defines an index sorting like this:

````
"settings": {
  "index.sort.field": ["field1", "field2"]
}
````

In this case the `composite` aggregation can early terminate on each segment.
This aggregation also accepts multi-valued field but disables early termination for these fields even if index sorting matches the sources definition.
This is mandatory because index sorting picks only one value per document to perform the sort.
  • Loading branch information
jimczi committed Nov 16, 2017
1 parent 406331d commit b0ca48a
Show file tree
Hide file tree
Showing 51 changed files with 5,419 additions and 30 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ subprojects {
"org.elasticsearch.plugin:parent-join-client:${version}": ':modules:parent-join',
"org.elasticsearch.plugin:aggs-matrix-stats-client:${version}": ':modules:aggs-matrix-stats',
"org.elasticsearch.plugin:percolator-client:${version}": ':modules:percolator',
"org.elasticsearch.plugin:aggs-composite-client:${version}": ':modules:aggs-composite',
]
if (indexCompatVersions[-1].snapshot) {
/* The last and second to last versions can be snapshots. Rather than use
Expand Down
3 changes: 2 additions & 1 deletion client/rest-high-level/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ dependencies {
compile "org.elasticsearch.client:elasticsearch-rest-client:${version}"
compile "org.elasticsearch.plugin:parent-join-client:${version}"
compile "org.elasticsearch.plugin:aggs-matrix-stats-client:${version}"

compile "org.elasticsearch.plugin:aggs-composite-client:${version}"

testCompile "org.elasticsearch.client:test:${version}"
testCompile "org.elasticsearch.test:framework:${version}"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregationBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -647,7 +648,7 @@ public void testDefaultNamedXContents() {

public void testProvidedNamedXContents() {
List<NamedXContentRegistry.Entry> namedXContents = RestHighLevelClient.getProvidedNamedXContents();
assertEquals(2, namedXContents.size());
assertEquals(3, namedXContents.size());
Map<Class<?>, Integer> categories = new HashMap<>();
List<String> names = new ArrayList<>();
for (NamedXContentRegistry.Entry namedXContent : namedXContents) {
Expand All @@ -658,9 +659,10 @@ public void testProvidedNamedXContents() {
}
}
assertEquals(1, categories.size());
assertEquals(Integer.valueOf(2), categories.get(Aggregation.class));
assertEquals(Integer.valueOf(3), categories.get(Aggregation.class));
assertTrue(names.contains(ChildrenAggregationBuilder.NAME));
assertTrue(names.contains(MatrixStatsAggregationBuilder.NAME));
assertTrue(names.contains(CompositeAggregationBuilder.NAME));
}

private static class TrackingActionListener implements ActionListener<Integer> {
Expand Down
1 change: 1 addition & 0 deletions client/transport/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies {
compile "org.elasticsearch.plugin:lang-mustache-client:${version}"
compile "org.elasticsearch.plugin:percolator-client:${version}"
compile "org.elasticsearch.plugin:parent-join-client:${version}"
compile "org.elasticsearch.plugin:aggs-composite-client:${version}"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testCompile "junit:junit:${versions.junit}"
testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.percolator.PercolatorPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.search.aggregations.composite.CompositeAggregationPlugin;
import org.elasticsearch.transport.Netty4Plugin;

import java.util.Arrays;
Expand All @@ -44,6 +45,7 @@
* {@link PercolatorPlugin},
* {@link MustachePlugin},
* {@link ParentJoinPlugin}
* {@link CompositeAggregationPlugin}
* plugins for the client. These plugins are all the required modules for Elasticsearch.
*/
@SuppressWarnings({"unchecked","varargs"})
Expand Down Expand Up @@ -88,7 +90,8 @@ private static void setSystemPropertyIfUnset(final String key, final String valu
ReindexPlugin.class,
PercolatorPlugin.class,
MustachePlugin.class,
ParentJoinPlugin.class));
ParentJoinPlugin.class,
CompositeAggregationPlugin.class));

/**
* Creates a new transport client with pre-installed plugins.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.search.aggregations.composite.CompositeAggregationPlugin;
import org.junit.Test;

import java.util.Arrays;
Expand All @@ -52,7 +53,8 @@ public void testPluginInstalled() {
@Test
public void testInstallPluginTwice() {
for (Class<? extends Plugin> plugin :
Arrays.asList(ParentJoinPlugin.class, ReindexPlugin.class, PercolatorPlugin.class, MustachePlugin.class)) {
Arrays.asList(ParentJoinPlugin.class, ReindexPlugin.class, PercolatorPlugin.class,
MustachePlugin.class, CompositeAggregationPlugin.class)) {
try {
new PreBuiltTransportClient(Settings.EMPTY, plugin);
fail("exception expected");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ static class FieldSortSpec {
SortField.Type.FLOAT
);

static SortField.Type getSortFieldType(SortField sortField) {
public static SortField.Type getSortFieldType(SortField sortField) {
if (sortField instanceof SortedSetSortField) {
return SortField.Type.STRING;
} else if (sortField instanceof SortedNumericSortField) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,12 @@ protected static <B extends ParsedBucket> B parseXContent(final XContentParser p
bucket.setDocCount(parser.longValue());
}
} else if (token == XContentParser.Token.START_OBJECT) {
XContentParserUtils.parseTypedKeysObject(parser, Aggregation.TYPED_KEYS_DELIMITER, Aggregation.class,
if (CommonFields.KEY.getPreferredName().equals(currentFieldName)) {
keyConsumer.accept(parser, bucket);
} else {
XContentParserUtils.parseTypedKeysObject(parser, Aggregation.TYPED_KEYS_DELIMITER, Aggregation.class,
aggregations::add);
}
}
}
bucket.setAggregations(new Aggregations(aggregations));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public String toString() {
}
};

static SortOrder readFromStream(StreamInput in) throws IOException {
public static SortOrder readFromStream(StreamInput in) throws IOException {
return in.readEnum(SortOrder.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivativeTests;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -157,7 +158,7 @@ public void init() throws Exception {
if (aggsTest instanceof InternalMultiBucketAggregationTestCase) {
// Lower down the number of buckets generated by multi bucket aggregation tests in
// order to avoid too many aggregations to be created.
((InternalMultiBucketAggregationTestCase) aggsTest).maxNumberOfBuckets = 3;
((InternalMultiBucketAggregationTestCase) aggsTest).setMaxNumberOfBuckets(3);
}
aggsTest.setUp();
}
Expand Down Expand Up @@ -266,9 +267,13 @@ private static InternalAggregations createTestInstance(final int minNumAggs, fin
if (testCase instanceof InternalMultiBucketAggregationTestCase) {
InternalMultiBucketAggregationTestCase multiBucketAggTestCase = (InternalMultiBucketAggregationTestCase) testCase;
if (currentDepth < maxDepth) {
multiBucketAggTestCase.subAggregationsSupplier = () -> createTestInstance(0, currentDepth + 1, maxDepth);
multiBucketAggTestCase.setSubAggregationsSupplier(
() -> createTestInstance(0, currentDepth + 1, maxDepth)
);
} else {
multiBucketAggTestCase.subAggregationsSupplier = () -> InternalAggregations.EMPTY;
multiBucketAggTestCase.setSubAggregationsSupplier(
() -> InternalAggregations.EMPTY
);
}
} else if (testCase instanceof InternalSingleBucketAggregationTestCase) {
InternalSingleBucketAggregationTestCase singleBucketAggTestCase = (InternalSingleBucketAggregationTestCase) testCase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilters.InternalBucket;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.common.geo.GeoHashUtils;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid.Bucket;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
Expand Down Expand Up @@ -109,7 +109,7 @@ protected void assertReduced(InternalGeoHashGrid reduced, List<InternalGeoHashGr
protected Class<? extends ParsedMultiBucketAggregation> implementationClass() {
return ParsedGeoHashGrid.class;
}

@Override
protected InternalGeoHashGrid mutateInstance(InternalGeoHashGrid instance) {
String name = instance.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.joda.time.DateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.elasticsearch.search.aggregations.bucket.significant;

import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.ChiSquare;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.GND;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
package org.elasticsearch.search.aggregations.bucket.terms;

import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.junit.Before;
import org.elasticsearch.test.InternalAggregationTestCase;

import java.util.HashMap;
import java.util.List;
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/aggregations/bucket.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,5 @@ include::bucket/significanttext-aggregation.asciidoc[]

include::bucket/terms-aggregation.asciidoc[]

include::bucket/composite-aggregation.asciidoc[]

Loading

0 comments on commit b0ca48a

Please sign in to comment.