Skip to content

Commit

Permalink
ESQL: Track blocks (elastic#100025)
Browse files Browse the repository at this point in the history
This tracks blocks from topn and a few other places. We're going to try
and track blocks all the places.
  • Loading branch information
nik9000 authored Sep 28, 2023
1 parent 246e832 commit 0e21930
Show file tree
Hide file tree
Showing 129 changed files with 1,592 additions and 486 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
Expand Down Expand Up @@ -107,6 +108,7 @@ private static Operator operator(String data, int topCount) {
ClusterSettings.createBuiltInClusterSettings()
);
return new TopNOperator(
BlockFactory.getNonBreakingInstance(),
breakerService.getBreaker(CircuitBreaker.REQUEST),
topCount,
elementTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private static void close(boolean success, Releasable... releasables) {
* // the resources will be released when reaching here
* </pre>
*/
public static Releasable wrap(final Iterable<Releasable> releasables) {
public static Releasable wrap(final Iterable<? extends Releasable> releasables) {
return new Releasable() {
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,14 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public long ramBytesUsed() {
return BASE_RAM_BYTES_USED + startOffsets.ramBytesUsed() + bytes.ramBytesUsed();
return BASE_RAM_BYTES_USED + bigArraysRamBytesUsed();
}

/**
* Memory used by the {@link BigArrays} portion of this {@link BytesRefArray}.
*/
public long bigArraysRamBytesUsed() {
return startOffsets.ramBytesUsed() + bytes.ramBytesUsed();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,9 @@ public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws Circu
while (true) {
long old = used.get();
long total = old + bytes;
if (total < 0) {
throw new AssertionError("total must be >= 0 but was [" + total + "]");
}
if (total > max.getBytes()) {
throw new CircuitBreakingException(ERROR_MESSAGE, bytes, max.getBytes(), Durability.TRANSIENT);
}
Expand All @@ -689,7 +692,10 @@ public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws Circu

@Override
public void addWithoutBreaking(long bytes) {
used.addAndGet(bytes);
long total = used.addAndGet(bytes);
if (total < 0) {
throw new AssertionError("total must be >= 0 but was [" + total + "]");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.atomic.AtomicLong;

/**
* {@link CircuitBreakerService} that fails one twentieth of the time when you
* add bytes. This is useful to make sure code responds sensibly to circuit
Expand All @@ -27,31 +29,32 @@ public class CrankyCircuitBreakerService extends CircuitBreakerService {
public static final String ERROR_MESSAGE = "cranky breaker";

private final CircuitBreaker breaker = new CircuitBreaker() {
@Override
public void circuitBreak(String fieldName, long bytesNeeded) {
private final AtomicLong used = new AtomicLong();

}
@Override
public void circuitBreak(String fieldName, long bytesNeeded) {}

@Override
public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
if (ESTestCase.random().nextInt(20) == 0) {
throw new CircuitBreakingException(ERROR_MESSAGE, Durability.PERMANENT);
}
used.addAndGet(bytes);
}

@Override
public void addWithoutBreaking(long bytes) {

used.addAndGet(bytes);
}

@Override
public long getUsed() {
return 0;
return used.get();
}

@Override
public long getLimit() {
return 0;
return Long.MAX_VALUE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public BooleanBlock expand() {
public static long ramBytesEstimated(boolean[] values, int[] firstValueIndexes, BitSet nullsMask) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;

import java.util.Arrays;

/**
Expand All @@ -20,7 +22,7 @@ final class BooleanBlockBuilder extends AbstractBlockBuilder implements BooleanB
BooleanBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
int initialSize = Math.max(estimatedSize, 2);
adjustBreaker(initialSize);
adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize());
values = new boolean[initialSize];
}

Expand Down Expand Up @@ -192,8 +194,16 @@ public BooleanBlock build() {
block = new BooleanArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
built();
return block;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) {

@Override
public BooleanVector build() {
finish();
BooleanVector vector;
if (valueCount == 1) {
vector = new ConstantBooleanVector(values[0], 1, blockFactory);
Expand All @@ -58,8 +59,16 @@ public BooleanVector build() {
}
vector = new BooleanArrayVector(values, valueCount, blockFactory);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
built();
return vector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,12 @@ public BooleanVector build() {
}
return new BooleanArrayVector(values, values.length, blockFactory);
}

@Override
public void close() {
if (nextIndex >= 0) {
// If nextIndex < 0 we've already built the vector
blockFactory.adjustBreaker(-ramBytesUsed(values.length), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public BytesRefBlock expand() {
public static long ramBytesEstimated(BytesRefArray values, int[] firstValueIndexes, BitSet nullsMask) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
}

@Override
Expand Down Expand Up @@ -115,7 +116,7 @@ public void close() {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
blockFactory.adjustBreaker(-(ramBytesUsed() - values.ramBytesUsed()), true);
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public String toString() {

@Override
public void close() {
blockFactory.adjustBreaker(-BASE_RAM_BYTES_USED, true);
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,19 +193,42 @@ public BytesRefBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
public BytesRefBlock build() {
finish();
BytesRefBlock block;
assert estimatedBytes == 0 || firstValueIndexes != null;
if (hasNonNullValue && positionCount == 1 && valueCount == 1) {
block = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory).asBlock();
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
Releasables.closeExpectNoException(values);
} else {
estimatedBytes += values.ramBytesUsed();
if (isDense() && singleValued()) {
block = new BytesRefArrayVector(values, positionCount, blockFactory).asBlock();
} else {
block = new BytesRefArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes - values.bigArraysRamBytesUsed(), false);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
values = null;
built();
return block;
}

@Override
public void extraClose() {
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,40 @@ protected void growValuesArray(int newSize) {

@Override
public BytesRefVector build() {
finish();
BytesRefVector vector;
assert estimatedBytes == 0;
if (valueCount == 1) {
vector = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed(), false);
Releasables.closeExpectNoException(values);
} else {
estimatedBytes = values.ramBytesUsed();
vector = new BytesRefArrayVector(values, valueCount, blockFactory);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - values.bigArraysRamBytesUsed(), false);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
values = null;
built();
return vector;
}

@Override
public void extraClose() {
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public DoubleBlock expand() {
public static long ramBytesEstimated(double[] values, int[] firstValueIndexes, BitSet nullsMask) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;

import java.util.Arrays;

/**
Expand All @@ -20,7 +22,7 @@ final class DoubleBlockBuilder extends AbstractBlockBuilder implements DoubleBlo
DoubleBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
int initialSize = Math.max(estimatedSize, 2);
adjustBreaker(initialSize);
adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize());
values = new double[initialSize];
}

Expand Down Expand Up @@ -192,8 +194,16 @@ public DoubleBlock build() {
block = new DoubleArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
built();
return block;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) {

@Override
public DoubleVector build() {
finish();
DoubleVector vector;
if (valueCount == 1) {
vector = new ConstantDoubleVector(values[0], 1, blockFactory);
Expand All @@ -58,8 +59,16 @@ public DoubleVector build() {
}
vector = new DoubleArrayVector(values, valueCount, blockFactory);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
built();
return vector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,12 @@ public DoubleVector build() {
}
return new DoubleArrayVector(values, values.length, blockFactory);
}

@Override
public void close() {
if (nextIndex >= 0) {
// If nextIndex < 0 we've already built the vector
blockFactory.adjustBreaker(-ramBytesUsed(values.length), false);
}
}
}
Loading

0 comments on commit 0e21930

Please sign in to comment.