Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always return the after_key in composite aggregation response #28358

Merged
merged 4 commits into from
Jan 25, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions docs/reference/aggregations/bucket/composite-aggregation.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ GET /_search
...
"aggregations": {
"my_buckets": {
"after_key": { <1>
"date": 1494288000000,
"product": "mad max"
},
"buckets": [
{
"key": {
Expand All @@ -403,7 +407,7 @@ GET /_search
"doc_count": 1
},
{
"key": { <1>
"key": {
"date": 1494288000000,
"product": "mad max"
},
Expand All @@ -420,7 +424,7 @@ GET /_search

The `after` parameter can be used to retrieve the composite buckets that are **after**
the last composite buckets returned in a previous round.
For the example below the last bucket is `"key": [1494288000000, "mad max"]` so the next
For the example below the last bucket can be found in `after_key` and the next
round of results can be retrieved with:

[source,js]
Expand Down Expand Up @@ -485,6 +489,10 @@ GET /_search
...
"aggregations": {
"my_buckets": {
"after_key": {
"date": 1494201600000,
"product": "rocky"
},
"buckets": [
{
"key": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ setup:
---
"Aggregate After Missing":
- skip:
version: " - 6.99.99"
reason: bug fixed in 7.0.0
version: " - 6.1.99"
reason: bug fixed in 6.2.0


- do:
Expand Down Expand Up @@ -295,3 +295,31 @@ setup:
- length: { aggregations.test.buckets: 1 }
- match: { aggregations.test.buckets.0.key.date: "2017-10-21" }
- match: { aggregations.test.buckets.0.doc_count: 1 }

---
"Composite aggregation with after_key in the response":
- skip:
version: " - 6.99.99"
reason: starting in 7.0.0 after_key is returned in the response

- do:
search:
index: test
body:
aggregations:
test:
composite:
sources: [
{
"keyword": {
"terms": {
"field": "keyword",
}
}
}
]

- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- length: { aggregations.test.after_key: 1 }
- match: { aggregations.test.after_key.keyword: "foo" }
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ static XContentBuilder bucketToXContent(CompositeAggregation.Bucket bucket,
}

static XContentBuilder toXContentFragment(CompositeAggregation aggregation, XContentBuilder builder, Params params) throws IOException {
if (aggregation.afterKey() != null) {
buildCompositeMap("after_key", aggregation.afterKey(), builder);
}
builder.startArray(CommonFields.BUCKETS.getPreferredName());
for (CompositeAggregation.Bucket bucket : aggregation.getBuckets()) {
bucketToXContent(bucket, builder, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,15 @@ public InternalAggregation buildAggregation(long zeroBucket) throws IOException
int docCount = bucketDocCount(slot);
buckets[pos++] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs);
}
return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), reverseMuls,
CompositeKey lastBucket = num > 0 ? buckets[num-1].getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), lastBucket, reverseMuls,
pipelineAggregators(), metaData());
}

@Override
public InternalAggregation buildEmptyAggregation() {
final int[] reverseMuls = getReverseMuls();
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), reverseMuls,
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls,
pipelineAggregators(), metaData());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,38 @@

package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Arrays;

/**
* A key that is composed of multiple {@link Comparable} values.
*/
class CompositeKey {
class CompositeKey implements Writeable {
private final Comparable<?>[] values;

// Wraps the given values directly; no defensive copy is made, so callers
// must not mutate the array after handing it to this constructor.
CompositeKey(Comparable<?>... values) {
    this.values = values;
}

// Deserialization constructor: reads a vint element count followed by that
// many generic values — the exact format produced by writeTo.
CompositeKey(StreamInput in) throws IOException {
    values = new Comparable<?>[in.readVInt()];
    for (int i = 0; i < values.length; i++) {
        values[i] = (Comparable<?>) in.readGenericValue();
    }
}

/**
 * Serializes this key as a vint element count followed by each component
 * written as a generic value, mirroring the {@code CompositeKey(StreamInput)}
 * constructor.
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeVInt(values.length);
    for (Comparable<?> value : values) {
        out.writeGenericValue(value);
    }
}

// Returns the backing array directly (no copy); callers must treat it as
// read-only.
Comparable<?>[] values() {
    return values;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -50,17 +49,19 @@ public class InternalComposite

private final int size;
private final List<InternalBucket> buckets;
private final CompositeKey afterKey;
private final int[] reverseMuls;
private final List<String> sourceNames;
private final List<DocValueFormat> formats;

InternalComposite(String name, int size, List<String> sourceNames, List<DocValueFormat> formats,
List<InternalBucket> buckets, int[] reverseMuls,
List<InternalBucket> buckets, CompositeKey afterKey, int[] reverseMuls,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.sourceNames = sourceNames;
this.formats = formats;
this.buckets = buckets;
this.afterKey = afterKey;
this.size = size;
this.reverseMuls = reverseMuls;
}
Expand All @@ -79,6 +80,11 @@ public InternalComposite(StreamInput in) throws IOException {
}
this.reverseMuls = in.readIntArray();
this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls));
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
this.afterKey = in.readBoolean() ? new CompositeKey(in) : null;
} else {
this.afterKey = buckets.size() > 0 ? buckets.get(buckets.size()-1).key : null;
}
}

@Override
Expand All @@ -92,6 +98,12 @@ protected void doWriteTo(StreamOutput out) throws IOException {
}
out.writeIntArray(reverseMuls);
out.writeList(buckets);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(afterKey != null);
if (afterKey != null) {
afterKey.writeTo(out);
}
}
}

@Override
Expand All @@ -105,8 +117,14 @@ public String getWriteableName() {
}

@Override
public InternalComposite create(List<InternalBucket> buckets) {
return new InternalComposite(name, size, sourceNames, formats, buckets, reverseMuls, pipelineAggregators(), getMetaData());
public InternalComposite create(List<InternalBucket> newBuckets) {
/**
* This is used by pipeline aggregations to filter/remove buckets so we
* keep the <code>afterKey</code> of the original aggregation in order
* to be able to retrieve the next page even if all buckets have been filtered.
*/
return new InternalComposite(name, size, sourceNames, formats, newBuckets, afterKey,
reverseMuls, pipelineAggregators(), getMetaData());
}

@Override
Expand All @@ -126,7 +144,10 @@ public List<InternalBucket> getBuckets() {

@Override
public Map<String, Object> afterKey() {
return buckets.size() > 0 ? buckets.get(buckets.size()-1).getKey() : null;
if (afterKey != null) {
return new ArrayMap(sourceNames, formats, afterKey.values());
}
return null;
}

// Visible for tests
Expand Down Expand Up @@ -169,20 +190,22 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(reduceBucket);
}
return new InternalComposite(name, size, sourceNames, formats, result, reverseMuls, pipelineAggregators(), metaData);
final CompositeKey lastKey = result.size() > 0 ? result.get(result.size()-1).getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls, pipelineAggregators(), metaData);
}

/**
 * Equality over the fields specific to this aggregation: the requested
 * size, the buckets, the {@code after_key} and the per-source sort
 * multipliers.
 */
@Override
protected boolean doEquals(Object obj) {
    InternalComposite that = (InternalComposite) obj;
    // Compare the primitive size directly; Objects.equals would autobox
    // both int values on every call for no benefit.
    return size == that.size
        && Objects.equals(buckets, that.buckets)
        && Objects.equals(afterKey, that.afterKey)
        && Arrays.equals(reverseMuls, that.reverseMuls);
}

@Override
protected int doHashCode() {
return Objects.hash(size, buckets, Arrays.hashCode(reverseMuls));
return Objects.hash(size, buckets, afterKey, Arrays.hashCode(reverseMuls));
}

private static class BucketIterator implements Comparable<BucketIterator> {
Expand Down Expand Up @@ -226,11 +249,7 @@ static class InternalBucket extends InternalMultiBucketAggregation.InternalBucke

@SuppressWarnings("unchecked")
InternalBucket(StreamInput in, List<String> sourceNames, List<DocValueFormat> formats, int[] reverseMuls) throws IOException {
final Comparable<?>[] values = new Comparable<?>[in.readVInt()];
for (int i = 0; i < values.length; i++) {
values[i] = (Comparable<?>) in.readGenericValue();
}
this.key = new CompositeKey(values);
this.key = new CompositeKey(in);
this.docCount = in.readVLong();
this.aggregations = InternalAggregations.readAggregations(in);
this.reverseMuls = reverseMuls;
Expand All @@ -240,10 +259,7 @@ static class InternalBucket extends InternalMultiBucketAggregation.InternalBucke

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(key.size());
for (int i = 0; i < key.size(); i++) {
out.writeGenericValue(key.get(i));
}
key.writeTo(out);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -33,15 +34,26 @@ public class ParsedComposite extends ParsedMultiBucketAggregation<ParsedComposit
new ObjectParser<>(ParsedComposite.class.getSimpleName(), true, ParsedComposite::new);

static {
PARSER.declareField(ParsedComposite::setAfterKey, (p, c) -> p.mapOrdered(), new ParseField("after_key"),
ObjectParser.ValueType.OBJECT);
declareMultiBucketAggregationFields(PARSER,
parser -> ParsedComposite.ParsedBucket.fromXContent(parser),
parser -> null
);
}

private Map<String, Object> afterKey;

// Parses a composite aggregation from the XContent response and restores
// its name (the name is carried by the enclosing response, not the body).
public static ParsedComposite fromXContent(XContentParser parser, String name) throws IOException {
    ParsedComposite aggregation = PARSER.parse(parser, null);
    aggregation.setName(name);
    if (aggregation.afterKey == null && aggregation.getBuckets().size() > 0) {
        /*
         * Bwc: responses from nodes that predate the after_key field do not
         * include it, so fall back to the key of the last returned bucket.
         */
        aggregation.setAfterKey(aggregation.getBuckets().get(aggregation.getBuckets().size()-1).key);
    }
    return aggregation;
}

Expand All @@ -57,9 +69,16 @@ public List<ParsedBucket> getBuckets() {

/**
 * Returns the {@code after_key} to use for requesting the next page,
 * falling back to the key of the last bucket when no explicit after_key
 * was present in the parsed response.
 */
@Override
public Map<String, Object> afterKey() {
    if (afterKey == null) {
        final int bucketCount = buckets.size();
        return bucketCount > 0 ? buckets.get(bucketCount - 1).getKey() : null;
    }
    return afterKey;
}

// Records the after_key parsed from the response (or derived from the last
// bucket for older versions); read back through afterKey().
private void setAfterKey(Map<String, Object> afterKey) {
    this.afterKey = afterKey;
}

@Override
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
return CompositeAggregation.toXContentFragment(this, builder, params);
Expand Down
Loading