Skip to content

Commit

Permalink
Aggs: Add real memory CB call when building internal aggregators in b…
Browse files Browse the repository at this point in the history
…uckets (#116329) (#116393)

Related with #88128

This PR pretends to reduce the potential OOMs received when building internal aggregations.
  • Loading branch information
ivancea authored Nov 7, 2024
1 parent 5b0fdcc commit 22c0eab
Showing 1 changed file with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long
grow(bucketOrd + 1);
int docCount = docCountProvider.getDocCount(doc);
if (docCounts.increment(bucketOrd, docCount) == docCount) {
// We call the circuit breaker the time to time in order to give it a chance to check available
// memory in the parent breaker and break the execution if we are running out. To achieve that we
// are passing 0 as the estimated bytes every 1024 calls
if ((++callCount & 0x3FF) == 0) {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}
updateCircuitBreaker("allocated_buckets");
}
subCollector.collect(doc, bucketOrd);
}
Expand Down Expand Up @@ -179,6 +174,7 @@ protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(long[]
prepareSubAggs(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
updateCircuitBreaker("building_sub_aggregation");
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
}
return subAggsForBucketFunction(aggregations);
Expand Down Expand Up @@ -415,4 +411,15 @@ protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException
// Set LeafReaderContext to the doc_count provider
docCountProvider.setLeafReaderContext(ctx);
}

/**
* This method calls the circuit breaker from time to time in order to give it a chance to check available
* memory in the parent breaker (Which should be a real memory breaker) and break the execution if we are running out.
* To achieve that, we are passing 0 as the estimated bytes every 1024 calls
*/
private void updateCircuitBreaker(String label) {
if ((++callCount & 0x3FF) == 0) {
breaker.addEstimateBytesAndMaybeBreak(0, label);
}
}
}

0 comments on commit 22c0eab

Please sign in to comment.