Add missing_bucket option in the composite agg #29465

Merged: 8 commits, May 30, 2018
28 changes: 28 additions & 0 deletions docs/reference/aggregations/bucket/composite-aggregation.asciidoc
@@ -348,6 +348,34 @@ GET /_search
\... will sort the composite bucket in descending order when comparing values from the `date_histogram` source
and in ascending order when comparing values from the `terms` source.

====== Missing bucket

By default, documents without a value for a given source are ignored.
It is possible to include them in the response by setting `missing_bucket` to
`true` (defaults to `false`):

[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "product_name": { "terms" : { "field": "product", "missing_bucket": true } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE

In the example above, the source `product_name` will emit an explicit `null` value
for documents without a value for the field `product`.
The `order` specified in the source dictates whether the `null` values should rank
first (ascending order, `asc`) or last (descending order, `desc`).
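
To make this concrete, a response to the request above might look like the sketch
below, assuming the default ascending `terms` order (the counts and `after_key`
are illustrative, not taken from a real dataset):

[source,js]
--------------------------------------------------
{
  ...
  "aggregations": {
    "my_buckets": {
      "after_key": { "product_name": "mad max" },
      "buckets": [
        {
          "key": { "product_name": null },
          "doc_count": 10
        },
        {
          "key": { "product_name": "mad max" },
          "doc_count": 8
        }
      ]
    }
  }
}
--------------------------------------------------
// NOTCONSOLE

The `null` bucket ranks first here because the `terms` source defaults to ascending order.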

Contributor:
I think somewhere in the docs we need to say that the `missing` option is deprecated and will be removed in favour of this.

Contributor Author:
Yes, I plan to add the deprecation in the docs during the backport to 6.x, since the deprecation is not for master. After the backport to 6.x I'll remove the `missing` option and add a note in the breaking changes.

==== Size

The `size` parameter can be set to define how many composite buckets should be returned.
@@ -323,3 +323,32 @@ setup:
- length: { aggregations.test.buckets: 2 }
- length: { aggregations.test.after_key: 1 }
- match: { aggregations.test.after_key.keyword: "foo" }

---
"Composite aggregation and array size":
  - skip:
      version: " - 6.99.99"
      reason: starting in 7.0 the composite sources do not allocate arrays eagerly.

  - do:
      search:
        index: test
        body:
          aggregations:
            test:
              composite:
                size: 1000000000
                sources: [
                  {
                    "keyword": {
                      "terms": {
                        "field": "keyword"
                      }
                    }
                  }
                ]

  - match: {hits.total: 6}
  - length: { aggregations.test.buckets: 2 }
  - length: { aggregations.test.after_key: 1 }
  - match: { aggregations.test.after_key.keyword: "foo" }
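
The lazy allocation this test exercises is implemented in the values sources below:
instead of eagerly allocating `size` slots, they start small and grow through
`BigArrays`. A condensed, illustrative sketch of the pattern (hypothetical class,
not the actual source):

```java
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;

// Sketch of the lazy-allocation pattern: the initial array is capped at 100
// slots regardless of the requested composite size, and grows only when
// higher slots are actually written.
class LazySlotsSketch {
    private final BigArrays bigArrays;
    private ObjectArray<BytesRef> values;

    LazySlotsSketch(BigArrays bigArrays, int size) {
        this.bigArrays = bigArrays;
        this.values = bigArrays.newObjectArray(Math.min(size, 100)); // small up-front allocation
    }

    void write(int slot, BytesRef value) {
        values = bigArrays.grow(values, slot + 1); // grow on demand
        values.set(slot, value);
    }
}
```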
@@ -24,49 +24,93 @@
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.StringFieldType;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.LeafBucketCollector;

import java.io.IOException;
import java.util.function.LongConsumer;

/**
* A {@link SingleDimensionValuesSource} for binary source ({@link BytesRef}).
*/
class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
private final LongConsumer breakerConsumer;
private final CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc;
private final BytesRef[] values;
private ObjectArray<BytesRef> values;
private ObjectArray<BytesRefBuilder> valueBuilders;
private BytesRef currentValue;

BinaryValuesSource(MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
DocValueFormat format, Object missing, int size, int reverseMul) {
super(format, fieldType, missing, size, reverseMul);
BinaryValuesSource(BigArrays bigArrays, LongConsumer breakerConsumer,
Contributor:
It looks like `breakerConsumer` isn't used here, but is used in `DoubleValuesSource`. Was that intentional?

Contributor Author:
In fact this is the only place where the breaker consumer is needed (I removed it from the other values sources). It is used to account for the `BytesRef` values held in the `ObjectArray` in the circuit breaker.

Contributor Author:
Same here

MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) {
super(bigArrays, format, fieldType, missingBucket, missing, size, reverseMul);
this.breakerConsumer = breakerConsumer;
this.docValuesFunc = docValuesFunc;
this.values = new BytesRef[size];
this.values = bigArrays.newObjectArray(Math.min(size, 100));
this.valueBuilders = bigArrays.newObjectArray(Math.min(size, 100));
}

@Override
public void copyCurrent(int slot) {
values[slot] = BytesRef.deepCopyOf(currentValue);
void copyCurrent(int slot) {
values = bigArrays.grow(values, slot+1);
valueBuilders = bigArrays.grow(valueBuilders, slot+1);
BytesRefBuilder builder = valueBuilders.get(slot);
int byteSize = builder == null ? 0 : builder.bytes().length;
if (builder == null) {
builder = new BytesRefBuilder();
valueBuilders.set(slot, builder);
}
if (missingBucket && currentValue == null) {
values.set(slot, null);
} else {
assert currentValue != null;
builder.copyBytes(currentValue);
breakerConsumer.accept(builder.bytes().length - byteSize);
values.set(slot, builder.get());
}
}

@Override
public int compare(int from, int to) {
return compareValues(values[from], values[to]);
int compare(int from, int to) {
if (missingBucket) {
if (values.get(from) == null) {
return values.get(to) == null ? 0 : -1 * reverseMul;
} else if (values.get(to) == null) {
return reverseMul;
}
}
return compareValues(values.get(from), values.get(to));
}

@Override
int compareCurrent(int slot) {
return compareValues(currentValue, values[slot]);
if (missingBucket) {
if (currentValue == null) {
return values.get(slot) == null ? 0 : -1 * reverseMul;
} else if (values.get(slot) == null) {
return reverseMul;
}
}
return compareValues(currentValue, values.get(slot));
}

@Override
int compareCurrentWithAfter() {
if (missingBucket) {
if (currentValue == null) {
return afterValue == null ? 0 : -1 * reverseMul;
} else if (afterValue == null) {
return reverseMul;
}
}
return compareValues(currentValue, afterValue);
}

@@ -76,7 +120,9 @@ int compareValues(BytesRef v1, BytesRef v2) {

@Override
void setAfter(Comparable<?> value) {
if (value.getClass() == String.class) {
if (missingBucket && value == null) {
afterValue = null;
} else if (value.getClass() == String.class) {
afterValue = format.parseBytesRef(value.toString());
} else {
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
@@ -85,7 +131,7 @@ void setAfter(Comparable<?> value) {

@Override
BytesRef toComparable(int slot) {
return values[slot];
return values.get(slot);
}

@Override
@@ -100,6 +146,9 @@ public void collect(int doc, long bucket) throws IOException {
currentValue = dvs.nextValue();
next.collect(doc, bucket);
}
} else if (missingBucket) {
currentValue = null;
next.collect(doc, bucket);
}
}
};
@@ -130,5 +179,7 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer
}

@Override
public void close() {}
public void close() {
Releasables.close(values, valueBuilders);
}
}
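
The three compare methods above share one convention for missing values. A minimal
standalone illustration (hypothetical helper, not part of the class; `reverseMul`
is 1 for `asc` and -1 for `desc`):

```java
import org.apache.lucene.util.BytesRef;

class NullOrderingSketch {
    // A missing (null) value ranks before any concrete value in ascending
    // order and after it in descending order, mirroring compare,
    // compareCurrent and compareCurrentWithAfter above.
    static int compareNullable(BytesRef v1, BytesRef v2, int reverseMul) {
        if (v1 == null) {
            return v2 == null ? 0 : -1 * reverseMul; // null sorts first when ascending
        } else if (v2 == null) {
            return reverseMul;                       // non-null sorts after null when ascending
        }
        return v1.compareTo(v2) * reverseMul;
    }
}
```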
@@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

/**
* A bit array that is implemented using a growing {@link LongArray}
* created from {@link BigArrays}.
* The underlying long array grows lazily based on the biggest index
* that needs to be set.
*/
final class BitArray implements Releasable {
Contributor:
I wonder if `RoaringDocIdSet` could be reused here instead of a custom bit array class? I'm thinking it'd provide better compression in the case when missing keys are sparse, and similar compression when missing keys are dense.

Although it seems to require that IDs are added in monotonically increasing order, and I'm not sure if composite follows that pattern.

Contributor Author:
It is used to mark slots with missing values, not doc IDs, so the size remains small (capped by the requested size of the composite agg), and the values are mutable (we reuse slots if a new competitive composite bucket is found and the queue is full), so we need a fixed bit set.
It also uses `BigArrays` to create the underlying `LongArray`, so the memory it uses is accounted for in the circuit breaker.

Contributor:
👍

private final BigArrays bigArrays;
private LongArray bits;

BitArray(BigArrays bigArrays, int initialSize) {
this.bigArrays = bigArrays;
this.bits = bigArrays.newLongArray(initialSize, true);
}

public void set(int index) {
fill(index, true);
}

public void clear(int index) {
fill(index, false);
}

public boolean get(int index) {
int wordNum = index >> 6;
long bitmask = 1L << index;
return (bits.get(wordNum) & bitmask) != 0;
}

private void fill(int index, boolean bit) {
int wordNum = index >> 6;
bits = bigArrays.grow(bits, wordNum + 1);
long bitmask = 1L << index;
long value = bit ? bits.get(wordNum) | bitmask : bits.get(wordNum) & ~bitmask;
bits.set(wordNum, value);
}

@Override
public void close() {
Releasables.close(bits);
}
}
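
A hypothetical usage sketch (it would need to live in the same package, since
`BitArray` is package-private; the non-recycling `BigArrays` instance is used
only to keep the example self-contained, whereas the aggregator supplies a
breaker-aware instance so allocations count against the circuit breaker):

```java
import org.elasticsearch.common.util.BigArrays;

class BitArrayUsageSketch {
    static void demo() {
        // Mark which composite-queue slots currently hold a missing value.
        // The backing LongArray grows lazily as higher slots are touched.
        try (BitArray missingSlots = new BitArray(BigArrays.NON_RECYCLING_INSTANCE, 1)) {
            missingSlots.set(3);                 // slot 3 holds a missing value
            assert missingSlots.get(3);
            missingSlots.clear(3);               // slot reused for a concrete value
            assert missingSlots.get(3) == false;
        }
    }
}
```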
@@ -19,7 +19,6 @@

package org.elasticsearch.search.aggregations.bucket.composite;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;

@@ -66,11 +65,7 @@ static XContentBuilder toXContentFragment(CompositeAggregation aggregation, XCon
static void buildCompositeMap(String fieldName, Map<String, Object> composite, XContentBuilder builder) throws IOException {
builder.startObject(fieldName);
for (Map.Entry<String, Object> entry : composite.entrySet()) {
if (entry.getValue().getClass() == BytesRef.class) {
builder.field(entry.getKey(), ((BytesRef) entry.getValue()).utf8ToString());
} else {
builder.field(entry.getKey(), entry.getValue());
}
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
}
@@ -170,7 +170,9 @@ protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<
throw new IllegalArgumentException("Missing value for [after." + sources.get(i).name() + "]");
}
Object obj = after.get(sourceName);
if (obj instanceof Comparable) {
if (configs[i].missingBucket() && obj == null) {
values[i] = null;
} else if (obj instanceof Comparable) {
values[i] = (Comparable<?>) obj;
} else {
throw new IllegalArgumentException("Invalid value for [after." + sources.get(i).name() +
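
With this change, a `null` after key becomes a valid paging cursor whenever the
corresponding source sets `missing_bucket`. An illustrative request (hand-written,
not part of the PR):

[source,js]
--------------------------------------------------
GET /_search
{
  "aggs" : {
    "my_buckets": {
      "composite" : {
        "sources" : [
          { "product_name": { "terms" : { "field": "product", "missing_bucket": true } } }
        ],
        "after": { "product_name": null }
      }
    }
  }
}
--------------------------------------------------
// NOTCONSOLE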