Reduce merge map memory overhead in the Variable Width Histogram Aggregation (#59366)

When a document which is distant from existing buckets gets collected, the
`variable_width_histogram` will create a new bucket and then insert it
into the ordered list of buckets.

Currently, a new merge map array is created to move this bucket. This is
very expensive as there might be thousands of buckets.

This PR adds `mergeBuckets` overloads that take a `LongUnaryOperator` merge map
to `BucketsAggregator` and `MergingBucketsDeferringCollector`, and updates the
`variable_width_histogram` to use them. This eliminates the need to
create an entire merge map array for each new bucket and reduces the
memory overhead of the algorithm.
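For context, a minimal sketch of the difference (class and variable names here are illustrative, not from the PR): a merge map sends each old bucket ordinal to its new ordinal. The old approach materialized that mapping as an array on every bucket insertion, while the new overloads accept it as a function.

import java.util.function.LongUnaryOperator;

class MergeMapContrast {
    public static void main(String[] args) {
        int numBuckets = 100_000;

        // Before: each bucket insertion allocated a merge map array of this size.
        long[] asArray = new long[numBuckets];
        for (int i = 0; i < numBuckets; i++) {
            asArray[i] = i / 2; // e.g. fold adjacent bucket pairs together
        }

        // After: the same mapping expressed as a function, evaluated on demand,
        // with no per-insertion allocation.
        LongUnaryOperator asOperator = i -> i / 2;

        assert asArray[42] == asOperator.applyAsLong(42);
    }
}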
jamesdorfman authored Jul 24, 2020
1 parent db0adfd commit 8b7c556
Showing 5 changed files with 361 additions and 20 deletions.
server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
@@ -45,6 +45,7 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
+import java.util.function.LongUnaryOperator;
import java.util.function.ToLongFunction;

public abstract class BucketsAggregator extends AggregatorBase {
@@ -105,17 +106,35 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
     * ordinals and doc ID deltas.
     *
     * Refer to that method for documentation about the merge map.
+     *
+     * @deprecated use {@link #mergeBuckets(long, LongUnaryOperator)}
     */
+    @Deprecated
    public final void mergeBuckets(long[] mergeMap, long newNumBuckets) {
+        mergeBuckets(newNumBuckets, bucket -> mergeMap[Math.toIntExact(bucket)]);
+    }
+
+    /**
+     * This only tidies up doc counts. Call
+     * {@link MergingBucketsDeferringCollector#mergeBuckets(LongUnaryOperator)} to merge
+     * the actual ordinals and doc ID deltas.
+     *
+     * @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with.
+     *                 If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
+     */
+    public final void mergeBuckets(long newNumBuckets, LongUnaryOperator mergeMap) {
        try (IntArray oldDocCounts = docCounts) {
            docCounts = bigArrays.newIntArray(newNumBuckets, true);
            docCounts.fill(0, newNumBuckets, 0);
-            for (int i = 0; i < oldDocCounts.size(); i++) {
+            for (long i = 0; i < oldDocCounts.size(); i++) {
                int docCount = oldDocCounts.get(i);
+
+                if (docCount == 0) continue;
+
                // Skip any in the map which have been "removed", signified with -1
-                if (docCount != 0 && mergeMap[i] != -1) {
-                    docCounts.increment(mergeMap[i], docCount);
+                long destinationOrdinal = mergeMap.applyAsLong(i);
+                if (destinationOrdinal != -1) {
+                    docCounts.increment(destinationOrdinal, docCount);
                }
            }
        }
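To see what the new doc-count loop computes, here is a self-contained simulation (plain arrays stand in for `BigArrays`; `DocCountMergeSketch` and its `merge` helper are hypothetical names, not part of the change):

import java.util.Arrays;
import java.util.function.LongUnaryOperator;

class DocCountMergeSketch {
    // Mirrors the patched loop: each old bucket's doc count moves to
    // mergeMap(ordinal); ordinals mapped to -1 are dropped entirely.
    static long[] merge(long[] oldCounts, int newNumBuckets, LongUnaryOperator mergeMap) {
        long[] newCounts = new long[newNumBuckets];
        for (long i = 0; i < oldCounts.length; i++) {
            long docCount = oldCounts[(int) i];
            if (docCount == 0) continue; // empty buckets need no work

            long destinationOrdinal = mergeMap.applyAsLong(i);
            if (destinationOrdinal != -1) {
                newCounts[(int) destinationOrdinal] += docCount;
            }
        }
        return newCounts;
    }

    public static void main(String[] args) {
        long[] counts = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        // Fold bucket b into b % 5, as the new tests below do.
        System.out.println(Arrays.toString(merge(counts, 5, b -> b % 5)));
        // Prints [7, 9, 11, 13, 15]
    }
}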
server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java
@@ -25,6 +25,7 @@

import java.util.ArrayList;
import java.util.List;
+import java.util.function.LongUnaryOperator;

/**
* A specialization of {@link BestBucketsDeferringCollector} that collects all
@@ -51,8 +52,24 @@ public MergingBucketsDeferringCollector(SearchContext context, boolean isGlobal)
     *
     * This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should
     * not be called unless there are actually changes to be made, to avoid unnecessary work.
+     *
+     * @deprecated use {@link #mergeBuckets(LongUnaryOperator)}
     */
+    @Deprecated
    public void mergeBuckets(long[] mergeMap) {
+        mergeBuckets(bucket -> mergeMap[Math.toIntExact(bucket)]);
+    }
+
+    /**
+     * Merges/prunes the existing bucket ordinals and docDeltas according to the provided mergeMap.
+     *
+     * This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should
+     * not be called unless there are actually changes to be made, to avoid unnecessary work.
+     *
+     * @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with.
+     *                 If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
+     */
+    public void mergeBuckets(LongUnaryOperator mergeMap) {
        List<Entry> newEntries = new ArrayList<>(entries.size());
        for (Entry sourceEntry : entries) {
            PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
@@ -66,7 +83,7 @@ public void mergeBuckets(long[] mergeMap) {
                long delta = docDeltasItr.next();

                // Only merge in the ordinal if it hasn't been "removed", signified with -1
-                long ordinal = mergeMap[Math.toIntExact(bucket)];
+                long ordinal = mergeMap.applyAsLong(bucket);

                if (ordinal != -1) {
                    newBuckets.add(ordinal);
@@ -102,7 +119,7 @@ public void mergeBuckets(long[] mergeMap) {
            long bucket = itr.next();
            assert docDeltasItr.hasNext();
            long delta = docDeltasItr.next();
-            long ordinal = mergeMap[Math.toIntExact(bucket)];
+            long ordinal = mergeMap.applyAsLong(bucket);

            // Only merge in the ordinal if it hasn't been "removed", signified with -1
            if (ordinal != -1) {
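The rebuild the collector performs can be pictured with a standalone sketch over Lucene's `PackedLongValues` (only the bucket-ordinal stream is shown; the real method also rewrites the doc-delta stream in lockstep, and `OrdinalRemapSketch` is a hypothetical name):

import java.util.function.LongUnaryOperator;

import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;

class OrdinalRemapSketch {
    // Copy a packed list of bucket ordinals, rewriting each ordinal through the
    // merge map and dropping any ordinal the map sends to -1.
    static PackedLongValues remap(PackedLongValues buckets, LongUnaryOperator mergeMap) {
        PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
        PackedLongValues.Iterator itr = buckets.iterator();
        while (itr.hasNext()) {
            long ordinal = mergeMap.applyAsLong(itr.next());
            if (ordinal != -1) { // -1 signifies a removed bucket
                newBuckets.add(ordinal);
            }
        }
        return newBuckets.build();
    }
}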
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java
@@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import java.util.function.LongUnaryOperator;

public class VariableWidthHistogramAggregator extends DeferableBucketAggregator {

@@ -353,22 +354,23 @@ private void moveLastCluster(int index){
        clusterSizes.set(index, holdSize);

        // Move the underlying buckets
-        long[] mergeMap = new long[numClusters];
-        for (int i = 0; i < index; i++) {
-            // The clusters in range {0 ... idx - 1} don't move
-            mergeMap[i] = i;
-        }
-        for (int i = index; i < numClusters - 1; i++) {
-            // The clusters in range {index ... numClusters - 1} shift up
-            mergeMap[i] = i + 1;
-        }
-        // Finally, the new cluster moves to index
-        mergeMap[numClusters - 1] = index;
+        LongUnaryOperator mergeMap = new LongUnaryOperator() {
+            @Override
+            public long applyAsLong(long i) {
+                if (i < index) {
+                    // The clusters in range {0 ... index - 1} don't move
+                    return i;
+                }
+                if (i == numClusters - 1) {
+                    // The new cluster moves to index
+                    return (long) index;
+                }
+                // The clusters in range {index ... numClusters - 2} shift forward by one
+                return i + 1;
+            }
+        };

        // TODO: Create a moveLastCluster() method in BucketsAggregator which is like BucketsAggregator::mergeBuckets,
        // except it doesn't require a merge map. This would be more efficient as there would be no need to create a
        // merge map on every call.
-        mergeBuckets(mergeMap, numClusters);
+        mergeBuckets(numClusters, mergeMap);
        if (deferringCollector != null) {
            deferringCollector.mergeBuckets(mergeMap);
        }
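To make the mapping concrete, a small worked example (the `main` harness is hypothetical): with `numClusters = 5` and the new cluster belonging at `index = 2`, ordinals map as 0→0, 1→1, 2→3, 3→4, 4→2; that is, the new last cluster slots into position 2 and everything at or after position 2 shifts forward by one.

import java.util.function.LongUnaryOperator;

class MoveLastClusterExample {
    public static void main(String[] args) {
        final int numClusters = 5; // including the newly created last cluster
        final int index = 2;       // sorted position where the new cluster belongs

        LongUnaryOperator mergeMap = i ->
            i < index ? i : (i == numClusters - 1 ? index : i + 1);

        for (long i = 0; i < numClusters; i++) {
            System.out.println(i + " -> " + mergeMap.applyAsLong(i));
        }
        // Prints: 0 -> 0, 1 -> 1, 2 -> 3, 3 -> 4, 4 -> 2
    }
}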
server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java
@@ -0,0 +1,131 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;

import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;

public class BucketsAggregatorTests extends AggregatorTestCase {

    public BucketsAggregator buildMergeAggregator() throws IOException {
        try (Directory directory = newDirectory()) {
            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
                Document document = new Document();
                document.add(new SortedNumericDocValuesField("numeric", 0));
                indexWriter.addDocument(document);
            }

            try (IndexReader indexReader = DirectoryReader.open(directory)) {
                IndexSearcher indexSearcher = new IndexSearcher(indexReader);

                SearchContext searchContext = createSearchContext(
                    indexSearcher,
                    createIndexSettings(),
                    null,
                    new MultiBucketConsumerService.MultiBucketConsumer(
                        DEFAULT_MAX_BUCKETS,
                        new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
                    ),
                    new NumberFieldMapper.NumberFieldType("test", NumberFieldMapper.NumberType.INTEGER)
                );

                return new BucketsAggregator("test", AggregatorFactories.EMPTY, searchContext, null, null, null) {
                    @Override
                    protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
                        return null;
                    }

                    @Override
                    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
                        return new InternalAggregation[0];
                    }

                    @Override
                    public InternalAggregation buildEmptyAggregation() {
                        return null;
                    }
                };
            }
        }
    }

    public void testBucketMergeNoDelete() throws IOException {
        BucketsAggregator mergeAggregator = buildMergeAggregator();

        mergeAggregator.grow(10);
        for (int i = 0; i < 10; i++) {
            mergeAggregator.incrementBucketDocCount(i, i);
        }

        mergeAggregator.mergeBuckets(10, bucket -> bucket % 5);

        for (int i = 0; i < 5; i++) {
            // The i'th bucket should now have all docs whose index % 5 = i
            // This is buckets i and i + 5
            // i + (i + 5) = 2*i + 5
            assertEquals(mergeAggregator.getDocCounts().get(i), (2 * i) + 5);
        }
        for (int i = 5; i < 10; i++) {
            assertEquals(mergeAggregator.getDocCounts().get(i), 0);
        }
    }

    public void testBucketMergeAndDelete() throws IOException {
        BucketsAggregator mergeAggregator = buildMergeAggregator();

        mergeAggregator.grow(10);
        int sum = 0;
        for (int i = 0; i < 20; i++) {
            mergeAggregator.incrementBucketDocCount(i, i);
            if (5 <= i && i < 15) {
                sum += i;
            }
        }

        // Put the buckets in indices 5 ... 14 into bucket 5, and delete the rest of the buckets
        mergeAggregator.mergeBuckets(10, bucket -> (5 <= bucket && bucket < 15) ? 5 : -1);

        assertEquals(mergeAggregator.getDocCounts().size(), 10); // Confirm that the 10 other buckets were deleted
        for (int i = 0; i < 10; i++) {
            assertEquals(mergeAggregator.getDocCounts().get(i), i == 5 ? sum : 0);
        }
    }
}