Skip to content

Commit

Permalink
Fix several potential circuit breaker leaks in Aggregators (elastic#7…
Browse files Browse the repository at this point in the history
…9676) (elastic#80929)

This commit adds a new CircuitBreaker implementation in the test that throws CircuitBreaker
Exceptions randomly. This new circuit breaker helps uncover several places where we might
leak if the circuit breaker throws such exception.
  • Loading branch information
iverase authored Nov 23, 2021
1 parent 4fda6e9 commit 71c9d9b
Show file tree
Hide file tree
Showing 18 changed files with 391 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
* If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
*/
public final void rewriteBuckets(long newNumBuckets, LongUnaryOperator mergeMap) {
try (LongArray oldDocCounts = docCounts) {
LongArray oldDocCounts = docCounts;
boolean success = false;
try {
docCounts = bigArrays().newLongArray(newNumBuckets, true);
success = true;
docCounts.fill(0, newNumBuckets, 0);
for (long i = 0; i < oldDocCounts.size(); i++) {
long docCount = oldDocCounts.get(i);
Expand All @@ -113,6 +116,10 @@ public final void rewriteBuckets(long newNumBuckets, LongUnaryOperator mergeMap)
docCounts.increment(destinationOrdinal, docCount);
}
}
} finally {
if (success) {
oldDocCounts.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@ class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
this.breakerConsumer = breakerConsumer;
this.docValuesFunc = docValuesFunc;
this.values = bigArrays.newObjectArray(Math.min(size, 100));
this.valueBuilders = bigArrays.newObjectArray(Math.min(size, 100));
boolean success = false;
try {
this.valueBuilders = bigArrays.newObjectArray(Math.min(size, 100));
success = true;
} finally {
if (success == false) {
close();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ protected void doClose() {
try {
Releasables.close(queue);
} finally {
Releasables.close(sources);
if (sources != null) {
Releasables.close(sources);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,15 @@ class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
super(bigArrays, format, fieldType, missingBucket, missingOrder, size, reverseMul);
this.docValuesFunc = docValuesFunc;
this.bits = this.missingBucket ? new BitArray(100, bigArrays) : null;
this.values = bigArrays.newDoubleArray(Math.min(size, 100), false);
boolean success = false;
try {
this.values = bigArrays.newDoubleArray(Math.min(size, 100), false);
success = true;
} finally {
if (success == false) {
close();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,15 @@ class LongValuesSource extends SingleDimensionValuesSource<Long> {
this.docValuesFunc = docValuesFunc;
this.rounding = rounding;
this.bits = missingBucket ? new BitArray(Math.min(size, 100), bigArrays) : null;
this.values = bigArrays.newLongArray(Math.min(size, 100), false);
boolean success = false;
try {
this.values = bigArrays.newLongArray(Math.min(size, 100), false);
success = true;
} finally {
if (success == false) {
close();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,13 @@ private void increaseRoundingIfNeeded(long rounded) {
return;
}
do {
try (LongKeyedBucketOrds oldOrds = bucketOrds) {
LongKeyedBucketOrds oldOrds = bucketOrds;
boolean success = false;
try {
preparedRounding = prepareRounding(++roundingIdx);
long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
bucketOrds = new LongKeyedBucketOrds.FromSingle(bigArrays());
success = true; // now it is safe to close oldOrds after we finish
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(0);
while (ordsEnum.next()) {
long oldKey = ordsEnum.value();
Expand All @@ -288,6 +291,10 @@ private void increaseRoundingIfNeeded(long rounded) {
mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? newBucketOrd : -1 - newBucketOrd;
}
merge(mergeMap, bucketOrds.size());
} finally {
if (success) {
oldOrds.close();
}
}
} while (roundingIdx < roundingInfos.length - 1
&& (bucketOrds.size() > targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()
Expand Down Expand Up @@ -527,9 +534,12 @@ private int increaseRoundingIfNeeded(long owningBucketOrd, int oldEstimatedBucke

private void rebucket() {
rebucketCount++;
try (LongKeyedBucketOrds oldOrds = bucketOrds) {
LongKeyedBucketOrds oldOrds = bucketOrds;
boolean success = false;
try {
long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
bucketOrds = new LongKeyedBucketOrds.FromMany(bigArrays());
success = true;
for (long owningBucketOrd = 0; owningBucketOrd <= oldOrds.maxOwningBucketOrd(); owningBucketOrd++) {
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(owningBucketOrd);
Rounding.Prepared preparedRounding = preparedRoundings[roundingIndexFor(owningBucketOrd)];
Expand All @@ -543,6 +553,10 @@ private void rebucket() {
liveBucketCountUnderestimate.set(owningBucketOrd, Math.toIntExact(bucketOrds.bucketsInOrd(owningBucketOrd)));
}
merge(mergeMap, bucketOrds.size());
} finally {
if (success) {
oldOrds.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,16 @@ private class MergeBucketsPhase extends CollectionPhase {

MergeBucketsPhase(DoubleArray buffer, int bufferSize) {
// Cluster the documents to reduce the number of buckets
bucketBufferedDocs(buffer, bufferSize, mergePhaseInitialBucketCount(shardSize));
boolean success = false;
try {
bucketBufferedDocs(buffer, bufferSize, mergePhaseInitialBucketCount(shardSize));
success = true;
} finally {
if (success == false) {
close();
clusterMaxes = clusterMins = clusterCentroids = clusterSizes = null;
}
}

if (bufferSize > 1) {
updateAvgBucketDistance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,15 @@ private static class FromMany extends BytesKeyedBucketOrds {

private FromMany(BigArrays bigArrays) {
bytesToLong = new BytesRefHash(1, bigArrays);
longToBucketOrds = LongKeyedBucketOrds.build(bigArrays, CardinalityUpperBound.MANY);
boolean success = false;
try {
longToBucketOrds = LongKeyedBucketOrds.build(bigArrays, CardinalityUpperBound.MANY);
success = true;
} finally {
if (success == false) {
close();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ class SignificantTermsResults extends ResultStrategy<
private final long supersetSize;
private final SignificanceHeuristic significanceHeuristic;

private LongArray subsetSizes = bigArrays().newLongArray(1, true);
private LongArray subsetSizes;

SignificantTermsResults(
SignificanceLookup significanceLookup,
Expand All @@ -822,6 +822,15 @@ class SignificantTermsResults extends ResultStrategy<
backgroundFrequencies = significanceLookup.bytesLookup(bigArrays(), cardinality);
supersetSize = significanceLookup.supersetSize();
this.significanceHeuristic = significanceHeuristic;
boolean success = false;
try {
subsetSizes = bigArrays().newLongArray(1, true);
success = true;
} finally {
if (success == false) {
close();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ public MapStringTermsAggregator(
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
this.collectorSource = collectorSource;
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
this.includeExclude = includeExclude;
bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality);
// set last because if there is an error during construction the collector gets release outside the constructor.
this.collectorSource = collectorSource;
}

@Override
Expand Down Expand Up @@ -478,7 +479,7 @@ class SignificantTermsResults extends ResultStrategy<SignificantStringTerms, Sig
private final long supersetSize;
private final SignificanceHeuristic significanceHeuristic;

private LongArray subsetSizes = bigArrays().newLongArray(1, true);
private LongArray subsetSizes;

SignificantTermsResults(
SignificanceLookup significanceLookup,
Expand All @@ -488,6 +489,15 @@ class SignificantTermsResults extends ResultStrategy<SignificantStringTerms, Sig
backgroundFrequencies = significanceLookup.bytesLookup(bigArrays(), cardinality);
supersetSize = significanceLookup.supersetSize();
this.significanceHeuristic = significanceHeuristic;
boolean success = false;
try {
subsetSizes = bigArrays().newLongArray(1, true);
success = true;
} finally {
if (success == false) {
close();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,15 @@ class SignificantLongTermsResults extends ResultStrategy<SignificantLongTerms, S
backgroundFrequencies = significanceLookup.longLookup(bigArrays(), cardinality);
supersetSize = significanceLookup.supersetSize();
this.significanceHeuristic = significanceHeuristic;
subsetSizes = bigArrays().newLongArray(1, true);
boolean success = false;
try {
subsetSizes = bigArrays().newLongArray(1, true);
success = true;
} finally {
if (success == false) {
close();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,27 +97,37 @@ public long freq(BytesRef term) throws IOException {
public void close() {}
};
}
return new BackgroundFrequencyForBytes() {
private final BytesRefHash termToPosition = new BytesRefHash(1, bigArrays);
private LongArray positionToFreq = bigArrays.newLongArray(1, false);

@Override
public long freq(BytesRef term) throws IOException {
long position = termToPosition.add(term);
if (position < 0) {
return positionToFreq.get(-1 - position);
final BytesRefHash termToPosition = new BytesRefHash(1, bigArrays);
boolean success = false;
try {
BackgroundFrequencyForBytes b = new BackgroundFrequencyForBytes() {
private LongArray positionToFreq = bigArrays.newLongArray(1, false);

@Override
public long freq(BytesRef term) throws IOException {
long position = termToPosition.add(term);
if (position < 0) {
return positionToFreq.get(-1 - position);
}
long freq = getBackgroundFrequency(term);
positionToFreq = bigArrays.grow(positionToFreq, position + 1);
positionToFreq.set(position, freq);
return freq;
}
long freq = getBackgroundFrequency(term);
positionToFreq = bigArrays.grow(positionToFreq, position + 1);
positionToFreq.set(position, freq);
return freq;
}

@Override
public void close() {
Releasables.close(termToPosition, positionToFreq);
@Override
public void close() {
Releasables.close(termToPosition, positionToFreq);
}
};
success = true;
return b;
} finally {
if (success == false) {
termToPosition.close();
}
};
}

}

/**
Expand All @@ -142,27 +152,37 @@ public long freq(long term) throws IOException {
public void close() {}
};
}
return new BackgroundFrequencyForLong() {
private final LongHash termToPosition = new LongHash(1, bigArrays);
private LongArray positionToFreq = bigArrays.newLongArray(1, false);

@Override
public long freq(long term) throws IOException {
long position = termToPosition.add(term);
if (position < 0) {
return positionToFreq.get(-1 - position);
final LongHash termToPosition = new LongHash(1, bigArrays);
boolean success = false;
try {
BackgroundFrequencyForLong b = new BackgroundFrequencyForLong() {

private LongArray positionToFreq = bigArrays.newLongArray(1, false);

@Override
public long freq(long term) throws IOException {
long position = termToPosition.add(term);
if (position < 0) {
return positionToFreq.get(-1 - position);
}
long freq = getBackgroundFrequency(term);
positionToFreq = bigArrays.grow(positionToFreq, position + 1);
positionToFreq.set(position, freq);
return freq;
}
long freq = getBackgroundFrequency(term);
positionToFreq = bigArrays.grow(positionToFreq, position + 1);
positionToFreq.set(position, freq);
return freq;
}

@Override
public void close() {
Releasables.close(termToPosition, positionToFreq);
@Override
public void close() {
Releasables.close(termToPosition, positionToFreq);
}
};
success = true;
return b;
} finally {
if (success == false) {
termToPosition.close();
}
};
}
}

/**
Expand Down
Loading

0 comments on commit 71c9d9b

Please sign in to comment.