Remove bucketOrd field from InternalTerms and friends (elastic#118044)
The field bucketOrd is only used for building the aggregation but has no use after that.
iverase authored Dec 5, 2024
1 parent fdb1b2b commit 422eb1a
Showing 14 changed files with 355 additions and 282 deletions.
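
The change removes the mutable bucketOrd field from InternalTerms.Bucket and friends and instead pairs each partially built bucket with its ordinal in a small holder that only lives while the aggregation is being built. The holder class itself does not appear in the hunks shown below; a minimal sketch of what BucketAndOrd presumably looks like, inferred from how this diff uses it (a one-argument constructor, a final bucket field and a mutable ord field), is:

// Hypothetical sketch of org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd,
// inferred from usage in this diff; the real class may differ.
public final class BucketAndOrd<B> {
    public final B bucket; // the partially built bucket
    public long ord;       // its bucket ordinal, only needed while building sub-aggregations

    public BucketAndOrd(B bucket) {
        this.bucket = bucket;
    }
}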
@@ -12,6 +12,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.xcontent.ToXContentObject;

@@ -20,13 +21,12 @@
import java.util.Comparator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.ToLongFunction;

/**
* {@link Bucket} ordering strategy. Buckets can be order either as
* "complete" buckets using {@link #comparator()} or against a combination
* of the buckets internals with its ordinal with
* {@link #partiallyBuiltBucketComparator(ToLongFunction, Aggregator)}.
* {@link #partiallyBuiltBucketComparator(Aggregator)}.
*/
public abstract class BucketOrder implements ToXContentObject, Writeable {
/**
@@ -102,7 +102,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionException
* to validate this order because doing so checks all of the appropriate
* paths.
*/
partiallyBuiltBucketComparator(null, aggregator);
partiallyBuiltBucketComparator(aggregator);
}

/**
@@ -121,7 +121,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionException
* with it all the time.
* </p>
*/
public abstract <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator);
public abstract <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator);

/**
* Build a comparator for fully built buckets.
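For callers, the contract change is that the ordinal no longer has to be read out of the bucket: the comparator now receives BucketAndOrd entries that carry the ordinal alongside the bucket. A condensed before/after, paraphrased from the StringTerms aggregator changes later in this diff (the order and aggregator variables are illustrative):

// Before: the caller supplied a reader that extracted each bucket's ordinal.
Comparator<StringTerms.Bucket> oldComparator =
    order.partiallyBuiltBucketComparator(InternalTerms.Bucket::getBucketOrd, aggregator);

// After: the ordinal travels with the bucket, so only the aggregator is needed.
Comparator<BucketAndOrd<StringTerms.Bucket>> newComparator =
    order.partiallyBuiltBucketComparator(aggregator);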
@@ -15,6 +15,7 @@
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.search.aggregations.Aggregator.BucketComparator;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortValue;
@@ -30,7 +31,6 @@
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.ToLongFunction;

/**
* Implementations for {@link Bucket} ordering strategies.
@@ -63,10 +63,10 @@ public AggregationPath path() {
}

@Override
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
public <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator) {
try {
BucketComparator bucketComparator = path.bucketComparator(aggregator, order);
return (lhs, rhs) -> bucketComparator.compare(ordinalReader.applyAsLong(lhs), ordinalReader.applyAsLong(rhs));
return (lhs, rhs) -> bucketComparator.compare(lhs.ord, rhs.ord);
} catch (IllegalArgumentException e) {
throw new AggregationExecutionException.InvalidPath("Invalid aggregation order path [" + path + "]. " + e.getMessage(), e);
}
@@ -188,12 +188,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

@Override
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
List<Comparator<T>> comparators = orderElements.stream()
.map(oe -> oe.partiallyBuiltBucketComparator(ordinalReader, aggregator))
.toList();
public <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator) {
List<Comparator<BucketAndOrd<T>>> comparators = new ArrayList<>(orderElements.size());
for (BucketOrder order : orderElements) {
comparators.add(order.partiallyBuiltBucketComparator(aggregator));
}
return (lhs, rhs) -> {
for (Comparator<T> c : comparators) {
for (Comparator<BucketAndOrd<T>> c : comparators) {
int result = c.compare(lhs, rhs);
if (result != 0) {
return result;
@@ -299,9 +300,9 @@ byte id() {
}

@Override
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
public <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator) {
Comparator<Bucket> comparator = comparator();
return comparator::compare;
return (lhs, rhs) -> comparator.compare(lhs.bucket, rhs.bucket);
}

@Override
@@ -13,6 +13,7 @@
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
@@ -26,6 +27,7 @@
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
import org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue;
import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds;
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
@@ -38,7 +40,6 @@
import java.util.Arrays;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import static java.util.Collections.emptyList;
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
@@ -115,51 +116,57 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws
LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size());
ObjectArray<StringTerms.Bucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())
) {
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());

// as users can't control sort order, in practice we'll always sort by doc count descending
try (
BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(
size,
bigArrays(),
partiallyBuiltBucketComparator
)
) {
StringTerms.Bucket spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
Supplier<StringTerms.Bucket> emptyBucketBuilder = () -> new StringTerms.Bucket(
new BytesRef(),
0,
null,
false,
0,
format
);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (spare == null) {
checkRealMemoryCBForInternalBucket();
spare = emptyBucketBuilder.get();
try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) {
// find how many buckets we are going to collect
long ordsToCollect = 0;
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), bucketCountThresholds.getShardSize());
bucketsToCollect.set(ordIdx, size);
ordsToCollect += size;
}
try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) {
long ordsCollected = 0;
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
// as users can't control sort order, in practice we'll always sort by doc count descending
try (
BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(
bucketsToCollect.get(ordIdx),
bigArrays(),
order.partiallyBuiltBucketComparator(this)
)
) {
BucketAndOrd<StringTerms.Bucket> spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (spare == null) {
checkRealMemoryCBForInternalBucket();
spare = new BucketAndOrd<>(new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format));
}
ordsEnum.readValue(spare.bucket.getTermBytes());
spare.bucket.setDocCount(docCount);
spare.ord = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}
final int orderedSize = (int) ordered.size();
final StringTerms.Bucket[] buckets = new StringTerms.Bucket[orderedSize];
for (int i = orderedSize - 1; i >= 0; --i) {
BucketAndOrd<StringTerms.Bucket> bucketAndOrd = ordered.pop();
buckets[i] = bucketAndOrd.bucket;
ordsArray.set(ordsCollected + i, bucketAndOrd.ord);
otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount());
bucketAndOrd.bucket.setTermBytes(BytesRef.deepCopyOf(bucketAndOrd.bucket.getTermBytes()));
}
topBucketsPerOrd.set(ordIdx, buckets);
ordsCollected += orderedSize;
}
ordsEnum.readValue(spare.getTermBytes());
spare.setDocCount(docCount);
spare.setBucketOrd(ordsEnum.ord());
spare = ordered.insertWithOverflow(spare);
}

topBucketsPerOrd.set(ordIdx, new StringTerms.Bucket[(int) ordered.size()]);
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd.get(ordIdx)[i] = ordered.pop();
otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount());
topBucketsPerOrd.get(ordIdx)[i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd.get(ordIdx)[i].getTermBytes()));
}
assert ordsCollected == ordsArray.size();
buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, InternalTerms.Bucket::setAggregations);
}
}

buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations);

return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
final BucketOrder reduceOrder;
if (isKeyOrder(order) == false) {
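The aggregator hunk above restructures bucket building into two passes: first it sizes the per-owning-bucket queues, then it records each surviving bucket's ordinal in a single LongArray so that sub-aggregations are resolved with one call instead of reading a bucketOrd back off each bucket. A condensed sketch of that flow, using the names from the hunk and omitting memory accounting and the inner collection loop:

try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) {
    // Pass 1: decide how many buckets survive for each owning bucket ordinal.
    long ordsToCollect = 0;
    for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
        int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), bucketCountThresholds.getShardSize());
        bucketsToCollect.set(ordIdx, size);
        ordsToCollect += size;
    }
    try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) {
        long ordsCollected = 0;
        for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
            // Pass 2: run the priority queue as in the hunk above, then store each
            // surviving bucket in topBucketsPerOrd and its ordinal in ordsArray,
            // side by side at offset ordsCollected.
            // ... insertWithOverflow / pop loop elided ...
            ordsCollected += bucketsToCollect.get(ordIdx); // the queue ends up holding exactly this many buckets
        }
        // One call builds sub-aggregations for every collected ordinal.
        buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, InternalTerms.Bucket::setAggregations);
    }
}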
@@ -13,17 +13,17 @@

import java.util.Comparator;

public class BucketPriorityQueue<B> extends ObjectArrayPriorityQueue<B> {
public class BucketPriorityQueue<B> extends ObjectArrayPriorityQueue<BucketAndOrd<B>> {

private final Comparator<? super B> comparator;
private final Comparator<BucketAndOrd<B>> comparator;

public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator<? super B> comparator) {
public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator<BucketAndOrd<B>> comparator) {
super(size, bigArrays);
this.comparator = comparator;
}

@Override
protected boolean lessThan(B a, B b) {
protected boolean lessThan(BucketAndOrd<B> a, BucketAndOrd<B> b) {
return comparator.compare(a, b) > 0; // reverse, since we reverse again when adding to a list
}
}
@@ -12,14 +12,14 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;

public class BucketSignificancePriorityQueue<B extends SignificantTerms.Bucket> extends ObjectArrayPriorityQueue<B> {
public class BucketSignificancePriorityQueue<B extends SignificantTerms.Bucket> extends ObjectArrayPriorityQueue<BucketAndOrd<B>> {

public BucketSignificancePriorityQueue(int size, BigArrays bigArrays) {
super(size, bigArrays);
}

@Override
protected boolean lessThan(SignificantTerms.Bucket o1, SignificantTerms.Bucket o2) {
return o1.getSignificanceScore() < o2.getSignificanceScore();
protected boolean lessThan(BucketAndOrd<B> o1, BucketAndOrd<B> o2) {
return o1.bucket.getSignificanceScore() < o2.bucket.getSignificanceScore();
}
}
