Optimise terms aggregations for single value fields (elastic#107930)
This commit optimises terms aggregations for single-value fields. When the doc values of a segment can be unwrapped to a single-valued view, the affected aggregators now collect through that view instead of iterating the multi-value APIs.
iverase authored Apr 30, 2024
1 parent a037e33 commit cca98af
Showing 7 changed files with 242 additions and 115 deletions.
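Every touched aggregator applies the same pattern: unwrap the segment's multi-value doc-values view once per leaf and, when the field is single-valued, collect through the cheaper singleton API. The sketch below is a minimal reconstruction of that dispatch for numeric values, not code from the PR; LongSink and DocCollector are invented stand-ins for the aggregator's bucket-collection plumbing.

import java.io.IOException;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;

class SingleValueDispatchSketch {

    /** Invented stand-in for the aggregator's per-value bucket logic. */
    interface LongSink {
        void accept(long value) throws IOException;
    }

    /** Invented stand-in for Elasticsearch's LeafBucketCollector. */
    interface DocCollector {
        void collect(int doc) throws IOException;
    }

    /** Unwrap once per segment; choose the cheaper path for single-valued fields. */
    static DocCollector collector(SortedNumericDocValues values, LongSink sink) {
        final NumericDocValues singleton = DocValues.unwrapSingleton(values);
        if (singleton != null) {
            // Single-valued field: one advanceExact and one longValue per doc, no loop.
            return doc -> {
                if (singleton.advanceExact(doc)) {
                    sink.accept(singleton.longValue());
                }
            };
        }
        // Multi-valued field: iterate the docValueCount() values in order.
        return doc -> {
            if (values.advanceExact(doc)) {
                for (int i = 0; i < values.docValueCount(); i++) {
                    sink.accept(values.nextValue());
                }
            }
        };
    }
}

DocValues.unwrapSingleton returns null unless the instance was created by DocValues.singleton, so multi-valued segments keep the existing loop and behaviour is unchanged.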
5 changes: 5 additions & 0 deletions docs/changelog/107930.yaml
@@ -0,0 +1,5 @@
+pr: 107930
+summary: Optimise terms aggregations for single value fields
+area: Aggregations
+type: enhancement
+issues: []
server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java
@@ -8,6 +8,8 @@
 
 package org.elasticsearch.search.aggregations.bucket.countedterms;
 
+import org.apache.lucene.index.DocValues;
+import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.core.Releasables;
@@ -63,27 +65,47 @@ class CountedTermsAggregator extends TermsAggregator {
 
     @Override
     public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
-        SortedSetDocValues ords = valuesSource.ordinalsValues(aggCtx.getLeafReaderContext());
+        final SortedSetDocValues ords = valuesSource.ordinalsValues(aggCtx.getLeafReaderContext());
+        final SortedDocValues singleton = DocValues.unwrapSingleton(ords);
+        return singleton != null ? getLeafCollector(singleton, sub) : getLeafCollector(ords, sub);
+    }
+
+    private LeafBucketCollector getLeafCollector(SortedSetDocValues ords, LeafBucketCollector sub) {
         return new LeafBucketCollectorBase(sub, ords) {
 
             @Override
             public void collect(int doc, long owningBucketOrd) throws IOException {
-                if (ords.advanceExact(doc) == false) {
-                    return;
-                }
-                for (long ord = ords.nextOrd(); ord != NO_MORE_ORDS; ord = ords.nextOrd()) {
-                    long bucketOrdinal = bucketOrds.add(owningBucketOrd, ords.lookupOrd(ord));
-                    if (bucketOrdinal < 0) { // already seen
-                        bucketOrdinal = -1 - bucketOrdinal;
-                        collectExistingBucket(sub, doc, bucketOrdinal);
-                    } else {
-                        collectBucket(sub, doc, bucketOrdinal);
+                if (ords.advanceExact(doc)) {
+                    for (long ord = ords.nextOrd(); ord != NO_MORE_ORDS; ord = ords.nextOrd()) {
+                        collectOrdinal(bucketOrds.add(owningBucketOrd, ords.lookupOrd(ord)), doc, sub);
                     }
                 }
            }
        };
    }
 
+    private LeafBucketCollector getLeafCollector(SortedDocValues ords, LeafBucketCollector sub) {
+        return new LeafBucketCollectorBase(sub, ords) {
+
+            @Override
+            public void collect(int doc, long owningBucketOrd) throws IOException {
+                if (ords.advanceExact(doc)) {
+                    collectOrdinal(bucketOrds.add(owningBucketOrd, ords.lookupOrd(ords.ordValue())), doc, sub);
+                }
+
+            }
+        };
+    }
+
+    private void collectOrdinal(long bucketOrdinal, int doc, LeafBucketCollector sub) throws IOException {
+        if (bucketOrdinal < 0) { // already seen
+            bucketOrdinal = -1 - bucketOrdinal;
+            collectExistingBucket(sub, doc, bucketOrdinal);
+        } else {
+            collectBucket(sub, doc, bucketOrdinal);
+        }
+    }
+
     @Override
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         StringTerms.Bucket[][] topBucketsPerOrd = new StringTerms.Bucket[owningBucketOrds.length][];
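For ordinal-based sources such as CountedTermsAggregator above, the singleton view reads exactly one ordinal per document with ordValue() instead of looping nextOrd() until NO_MORE_ORDS. A hedged sketch of the two read paths (the sink parameter is illustrative, not from the PR):

import java.io.IOException;
import java.util.function.Consumer;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;

class OrdinalReadSketch {

    /** Pass every term of `doc` to the sink, preferring the single-valued view. */
    static void readTerms(SortedSetDocValues ords, int doc, Consumer<BytesRef> sink) throws IOException {
        final SortedDocValues singleton = DocValues.unwrapSingleton(ords);
        if (singleton != null) {
            if (singleton.advanceExact(doc)) {
                // One ordinal per doc: ordValue(), no NO_MORE_ORDS termination check.
                sink.accept(singleton.lookupOrd(singleton.ordValue()));
            }
        } else if (ords.advanceExact(doc)) {
            for (long ord = ords.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = ords.nextOrd()) {
                sink.accept(ords.lookupOrd(ord));
            }
        }
        // Note: lookupOrd may reuse the returned BytesRef; copy it before storing.
    }
}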
server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java
@@ -8,9 +8,11 @@
 
 package org.elasticsearch.search.aggregations.bucket.prefix;
 
+import org.apache.lucene.index.BinaryDocValues;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.fielddata.FieldData;
 import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
 import org.elasticsearch.search.aggregations.AggregationErrors;
 import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@@ -97,56 +99,62 @@ public IpPrefixAggregator(
 
     @Override
     protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
-        return new IpPrefixLeafCollector(sub, config.getValuesSource().bytesValues(aggCtx.getLeafReaderContext()), ipPrefix);
+        final SortedBinaryDocValues values = config.getValuesSource().bytesValues(aggCtx.getLeafReaderContext());
+        final BinaryDocValues singleton = FieldData.unwrapSingleton(values);
+        return singleton != null ? getLeafCollector(singleton, sub) : getLeafCollector(values, sub);
     }
 
-    private class IpPrefixLeafCollector extends LeafBucketCollectorBase {
-        private final IpPrefix ipPrefix;
-        private final LeafBucketCollector sub;
-        private final SortedBinaryDocValues values;
-
-        IpPrefixLeafCollector(final LeafBucketCollector sub, final SortedBinaryDocValues values, final IpPrefix ipPrefix) {
-            super(sub, values);
-            this.sub = sub;
-            this.values = values;
-            this.ipPrefix = ipPrefix;
-        }
-
-        @Override
-        public void collect(int doc, long owningBucketOrd) throws IOException {
-            BytesRef previousSubnet = null;
-            BytesRef subnet = new BytesRef(new byte[ipPrefix.netmask.length]);
-            BytesRef ipAddress;
-            if (values.advanceExact(doc)) {
-                int valuesCount = values.docValueCount();
-
-                for (int i = 0; i < valuesCount; ++i) {
-                    ipAddress = values.nextValue();
-                    maskIpAddress(ipAddress, ipPrefix.netmask, subnet);
-                    if (previousSubnet != null && subnet.bytesEquals(previousSubnet)) {
-                        continue;
-                    }
-                    long bucketOrd = bucketOrds.add(owningBucketOrd, subnet);
-                    if (bucketOrd < 0) {
-                        bucketOrd = -1 - bucketOrd;
-                        collectExistingBucket(sub, doc, bucketOrd);
-                    } else {
-                        collectBucket(sub, doc, bucketOrd);
-                    }
-                    previousSubnet = subnet;
-                }
-            }
-        }
-    }
+    private LeafBucketCollector getLeafCollector(SortedBinaryDocValues values, LeafBucketCollector sub) {
+
+        return new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int doc, long owningBucketOrd) throws IOException {
+                if (values.advanceExact(doc)) {
+                    BytesRef previousSubnet = null;
+                    for (int i = 0; i < values.docValueCount(); ++i) {
+                        final BytesRef subnet = new BytesRef(new byte[ipPrefix.netmask.length]);
+                        maskIpAddress(values.nextValue(), ipPrefix.netmask, subnet);
+                        if (previousSubnet != null && subnet.bytesEquals(previousSubnet)) {
+                            continue;
+                        }
+                        addBucketOrd(bucketOrds.add(owningBucketOrd, subnet), doc, sub);
+                        previousSubnet = subnet;
+                    }
+                }
+            }
+        };
+    }
+
+    private LeafBucketCollector getLeafCollector(BinaryDocValues values, LeafBucketCollector sub) {
+        final BytesRef subnet = new BytesRef(new byte[ipPrefix.netmask.length]);
+        return new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int doc, long owningBucketOrd) throws IOException {
+                if (values.advanceExact(doc)) {
+                    maskIpAddress(values.binaryValue(), ipPrefix.netmask, subnet);
+                    addBucketOrd(bucketOrds.add(owningBucketOrd, subnet), doc, sub);
+                }
+            }
+        };
+    }
+
+    private void addBucketOrd(long bucketOrd, int doc, LeafBucketCollector sub) throws IOException {
+        if (bucketOrd < 0) {
+            bucketOrd = -1 - bucketOrd;
+            collectExistingBucket(sub, doc, bucketOrd);
+        } else {
+            collectBucket(sub, doc, bucketOrd);
+        }
+    }
 
     private static void maskIpAddress(final BytesRef ipAddress, final BytesRef subnetMask, final BytesRef subnet) {
         assert ipAddress.length == 16 : "Invalid length for ip address [" + ipAddress.length + "] expected 16 bytes";
         // NOTE: IPv4 addresses are encoded as 16-bytes. As a result, we use an
         // offset (12) to apply the subnet to the last 4 bytes (bytes 12, 13, 14, 15)
         // if the subnet mask is just a 4-bytes subnet mask.
         int offset = subnetMask.length == 4 ? 12 : 0;
         for (int i = 0; i < subnetMask.length; ++i) {
             subnet.bytes[i] = (byte) (ipAddress.bytes[i + offset] & subnetMask.bytes[i]);
         }
     }
 
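The unchanged maskIpAddress helper above depends on Lucene's 16-byte IP encoding, where IPv4 addresses arrive as IPv6-mapped values and a 4-byte mask is applied at offset 12. A small self-contained example of that arithmetic (class and variable names are mine, not the PR's):

import java.net.InetAddress;
import java.util.Arrays;

import org.apache.lucene.document.InetAddressPoint;

class Ipv4MaskExample {
    public static void main(String[] args) throws Exception {
        // Lucene encodes 192.168.1.7 as the 16-byte ::ffff:192.168.1.7,
        // so the IPv4 part occupies bytes 12..15.
        byte[] encoded = InetAddressPoint.encode(InetAddress.getByName("192.168.1.7"));

        // A /24 prefix expressed as a 4-byte mask: 255.255.255.0.
        byte[] netmask = { (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0 };

        // Offset 12 lines the 4-byte mask up with the embedded IPv4 bytes.
        byte[] subnet = new byte[4];
        for (int i = 0; i < netmask.length; i++) {
            subnet[i] = (byte) (encoded[i + 12] & netmask[i]);
        }

        // Prints [-64, -88, 1, 0], the signed bytes of 192.168.1.0.
        System.out.println(Arrays.toString(subnet));
    }
}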
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java
@@ -7,7 +7,9 @@
  */
 package org.elasticsearch.search.aggregations.bucket.terms;
 
+import org.apache.lucene.index.DocValues;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
@@ -66,36 +68,54 @@ protected static SortedNumericDocValues getValues(ValuesSource.Numeric valuesSou
 
     @Override
     public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
-        SortedNumericDocValues values = getValues(valuesSource, aggCtx.getLeafReaderContext());
+        final SortedNumericDocValues values = getValues(valuesSource, aggCtx.getLeafReaderContext());
+        final NumericDocValues singleton = DocValues.unwrapSingleton(values);
+        return singleton != null ? getLeafCollector(singleton, sub) : getLeafCollector(values, sub);
+    }
+
+    private LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) {
         return new LeafBucketCollectorBase(sub, values) {
             @Override
             public void collect(int docId, long owningBucketOrd) throws IOException {
-                if (false == values.advanceExact(docId)) {
-                    return;
-                }
-                int valuesCount = values.docValueCount();
-                long previous = Long.MAX_VALUE;
-                for (int i = 0; i < valuesCount; ++i) {
-                    long val = values.nextValue();
-                    if (i == 0 && previous == val) {
-                        continue;
-                    }
-                    previous = val;
-                    if (filter != null && false == filter.accept(val)) {
-                        continue;
-                    }
-                    long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
-                    if (bucketOrdinal < 0) { // already seen
-                        bucketOrdinal = -1 - bucketOrdinal;
-                        collectExistingBucket(sub, docId, bucketOrdinal);
-                    } else {
-                        collectBucket(sub, docId, bucketOrdinal);
+                if (values.advanceExact(docId)) {
+                    long previous = Long.MAX_VALUE;
+                    for (int i = 0; i < values.docValueCount(); ++i) {
+                        long val = values.nextValue();
+                        if (i == 0 && previous == val) {
+                            continue;
+                        }
+                        collectValue(val, docId, owningBucketOrd, sub);
+                        previous = val;
                     }
                 }
            }
        };
    }
 
+    private LeafBucketCollector getLeafCollector(NumericDocValues values, LeafBucketCollector sub) {
+        return new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int docId, long owningBucketOrd) throws IOException {
+                if (values.advanceExact(docId)) {
+                    collectValue(values.longValue(), docId, owningBucketOrd, sub);
+                }
+            }
+        };
+    }
+
+    private void collectValue(long val, int docId, long owningBucketOrd, LeafBucketCollector sub) throws IOException {
+        if (filter == null || filter.accept(val)) {
+            long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
+            if (bucketOrdinal < 0) { // already seen
+                bucketOrdinal = -1 - bucketOrdinal;
+                collectExistingBucket(sub, docId, bucketOrdinal);
+            } else {
+                collectBucket(sub, docId, bucketOrdinal);
+            }
+        }
+
+    }
+
     @Override
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         /*
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
@@ -7,6 +7,7 @@
  */
 package org.elasticsearch.search.aggregations.bucket.terms;
 
+import org.apache.lucene.index.BinaryDocValues;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.BytesRef;
@@ -16,6 +17,7 @@
 import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.fielddata.FieldData;
 import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@@ -204,7 +206,19 @@ public LeafBucketCollector getLeafCollector(
             LongConsumer addRequestCircuitBreakerBytes,
             CollectConsumer consumer
         ) throws IOException {
-            SortedBinaryDocValues values = valuesSourceConfig.getValuesSource().bytesValues(ctx);
+            final SortedBinaryDocValues values = valuesSourceConfig.getValuesSource().bytesValues(ctx);
+            final BinaryDocValues singleton = FieldData.unwrapSingleton(values);
+            return singleton != null
+                ? getLeafCollector(includeExclude, singleton, sub, consumer)
+                : getLeafCollector(includeExclude, values, sub, consumer);
+        }
+
+        private LeafBucketCollector getLeafCollector(
+            IncludeExclude.StringFilter includeExclude,
+            SortedBinaryDocValues values,
+            LeafBucketCollector sub,
+            CollectConsumer consumer
+        ) {
             return new LeafBucketCollectorBase(sub, values) {
                 final BytesRefBuilder previous = new BytesRefBuilder();
 
@@ -233,6 +247,26 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
             };
         }
 
+        private LeafBucketCollector getLeafCollector(
+            IncludeExclude.StringFilter includeExclude,
+            BinaryDocValues values,
+            LeafBucketCollector sub,
+            CollectConsumer consumer
+        ) {
+            return new LeafBucketCollectorBase(sub, values) {
+
+                @Override
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    if (values.advanceExact(doc)) {
+                        BytesRef bytes = values.binaryValue();
+                        if (includeExclude == null || includeExclude.accept(bytes)) {
+                            consumer.accept(sub, doc, owningBucketOrd, bytes);
+                        }
+                    }
+                }
+            };
+        }
+
         @Override
         public void close() {}
     }
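The string path above is the same dispatch over Elasticsearch's SortedBinaryDocValues: FieldData.unwrapSingleton yields a plain BinaryDocValues whose binaryValue() is read once per document. A rough sketch under the same caveats as the earlier ones (the sink stands in for CollectConsumer):

import java.io.IOException;
import java.util.function.Consumer;

import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;

class BinaryReadSketch {

    /** Pass every BytesRef value of `doc` to the sink, preferring the singleton view. */
    static void readValues(SortedBinaryDocValues values, int doc, Consumer<BytesRef> sink) throws IOException {
        final BinaryDocValues singleton = FieldData.unwrapSingleton(values);
        if (singleton != null) {
            if (singleton.advanceExact(doc)) {
                sink.accept(singleton.binaryValue()); // exactly one value for this doc
            }
        } else if (values.advanceExact(doc)) {
            for (int i = 0; i < values.docValueCount(); i++) {
                sink.accept(values.nextValue()); // values arrive sorted, possibly with duplicates
            }
        }
    }
}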