From 15735e06b3efec899723d213761c48134956d1aa Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 26 Sep 2023 15:42:28 -0400 Subject: [PATCH] ESQL: Handle allocation errors inside topn This properly handles allocation errors inside of topn by making `Block.Builder` and `Vector.Builder` `Releasable`. The "new way" to deal with block factories is like this: ``` try (var b = IntBlock.builder(3, blockFactory) { b.append(1); b.append(2); b.append(3); return b.build(); } ``` If anything goes wrong the block factory's `close` method will be called by the `try` block and all of the circuit breaking that it reserves will be released. For this all to work well `Block.Builder`s have to be one-shot. In other words, you can only call `.build` on them one time. That shifts the accounting from the builder into the block. It is an error to call `build` twice. --- .../org/elasticsearch/core/Releasables.java | 2 +- .../indices/CrankyCircuitBreakerService.java | 15 ++- .../compute/data/BooleanBlockBuilder.java | 16 ++- .../compute/data/BooleanVectorBuilder.java | 13 +- .../compute/data/BytesRefBlockBuilder.java | 17 ++- .../compute/data/BytesRefVectorBuilder.java | 18 ++- .../compute/data/DoubleBlockBuilder.java | 16 ++- .../compute/data/DoubleVectorBuilder.java | 13 +- .../compute/data/IntBlockBuilder.java | 16 ++- .../compute/data/IntVectorBuilder.java | 13 +- .../compute/data/LongBlockBuilder.java | 16 ++- .../compute/data/LongVectorBuilder.java | 13 +- .../topn/ResultBuilderForBoolean.java | 5 + .../topn/ResultBuilderForBytesRef.java | 5 + .../operator/topn/ResultBuilderForDouble.java | 5 + .../operator/topn/ResultBuilderForInt.java | 5 + .../operator/topn/ResultBuilderForLong.java | 5 + .../compute/data/AbstractBlockBuilder.java | 34 ++++++ .../compute/data/AbstractVectorBuilder.java | 41 ++++++- .../org/elasticsearch/compute/data/Block.java | 6 +- .../compute/data/ConstantNullBlock.java | 14 +++ .../elasticsearch/compute/data/DocBlock.java | 19 +-- .../compute/data/ElementType.java | 27 +++-- .../elasticsearch/compute/data/Vector.java | 6 +- .../compute/data/X-BlockBuilder.java.st | 23 +++- .../compute/data/X-VectorBuilder.java.st | 20 ++- .../compute/operator/Driver.java | 8 +- .../compute/operator/topn/ResultBuilder.java | 3 +- .../operator/topn/ResultBuilderForDoc.java | 6 + .../operator/topn/ResultBuilderForNull.java | 5 + .../compute/operator/topn/TopNOperator.java | 52 +++++--- .../operator/topn/X-ResultBuilder.java.st | 5 + .../compute/data/BlockBuilderTests.java | 81 ++++++++----- .../compute/data/BlockFactoryTests.java | 4 + .../data/BytesRefBlockEqualityTests.java | 37 ++++-- .../compute/data/DocVectorTests.java | 57 +++++---- .../data/DoubleBlockEqualityTests.java | 37 ++++-- .../compute/data/IntBlockEqualityTests.java | 37 ++++-- .../compute/data/LongBlockEqualityTests.java | 39 ++++-- .../compute/data/TestBlockBuilder.java | 25 ++++ .../compute/data/VectorBuilderTests.java | 114 ++++++++++++++++++ .../operator/CannedSourceOperator.java | 6 +- .../compute/operator/OperatorTestCase.java | 47 +++++--- .../operator/topn/TopNOperatorTests.java | 98 ++++++++++++--- 44 files changed, 835 insertions(+), 209 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java diff --git a/libs/core/src/main/java/org/elasticsearch/core/Releasables.java b/libs/core/src/main/java/org/elasticsearch/core/Releasables.java index b8d1a9a542779..c2b48c4706573 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/Releasables.java +++ b/libs/core/src/main/java/org/elasticsearch/core/Releasables.java @@ -89,7 +89,7 @@ private static void close(boolean success, Releasable... releasables) { * // the resources will be released when reaching here * */ - public static Releasable wrap(final Iterable releasables) { + public static Releasable wrap(final Iterable releasables) { return new Releasable() { @Override public void close() { diff --git a/test/framework/src/main/java/org/elasticsearch/indices/CrankyCircuitBreakerService.java b/test/framework/src/main/java/org/elasticsearch/indices/CrankyCircuitBreakerService.java index 15ffa52569d00..bd5f974a5f800 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/CrankyCircuitBreakerService.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/CrankyCircuitBreakerService.java @@ -15,6 +15,8 @@ import org.elasticsearch.indices.breaker.CircuitBreakerStats; import org.elasticsearch.test.ESTestCase; +import java.util.concurrent.atomic.AtomicLong; + /** * {@link CircuitBreakerService} that fails one twentieth of the time when you * add bytes. This is useful to make sure code responds sensibly to circuit @@ -27,31 +29,32 @@ public class CrankyCircuitBreakerService extends CircuitBreakerService { public static final String ERROR_MESSAGE = "cranky breaker"; private final CircuitBreaker breaker = new CircuitBreaker() { - @Override - public void circuitBreak(String fieldName, long bytesNeeded) { + private final AtomicLong used = new AtomicLong(); - } + @Override + public void circuitBreak(String fieldName, long bytesNeeded) {} @Override public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { if (ESTestCase.random().nextInt(20) == 0) { throw new CircuitBreakingException(ERROR_MESSAGE, Durability.PERMANENT); } + used.addAndGet(bytes); } @Override public void addWithoutBreaking(long bytes) { - + used.addAndGet(bytes); } @Override public long getUsed() { - return 0; + return used.get(); } @Override public long getLimit() { - return 0; + return Long.MAX_VALUE; } @Override diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlockBuilder.java index a7d397fcfb98e..98b9fdb948bc0 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlockBuilder.java @@ -7,6 +7,8 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.RamUsageEstimator; + import java.util.Arrays; /** @@ -20,7 +22,7 @@ final class BooleanBlockBuilder extends AbstractBlockBuilder implements BooleanB BooleanBlockBuilder(int estimatedSize, BlockFactory blockFactory) { super(blockFactory); int initialSize = Math.max(estimatedSize, 2); - adjustBreaker(initialSize); + adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize()); values = new boolean[initialSize]; } @@ -192,8 +194,16 @@ public BooleanBlock build() { block = new BooleanArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory); } } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false); + built(); return block; } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVectorBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVectorBuilder.java index d9926227e1c60..a9a6a7b38ba9d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVectorBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVectorBuilder.java @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) { @Override public BooleanVector build() { + finish(); BooleanVector vector; if (valueCount == 1) { vector = new ConstantBooleanVector(values[0], 1, blockFactory); @@ -58,8 +59,16 @@ public BooleanVector build() { } vector = new BooleanArrayVector(values, valueCount, blockFactory); } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false); + built(); return vector; } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlockBuilder.java index 23c18d2a9ca6e..d957f2ca5781f 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlockBuilder.java @@ -204,8 +204,21 @@ public BytesRefBlock build() { block = new BytesRefArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory); } } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false); + built(); return block; } + + @Override + public void extraClose() { + Releasables.closeExpectNoException(values); + } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVectorBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVectorBuilder.java index be753771ac961..b572d98989939 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVectorBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVectorBuilder.java @@ -54,6 +54,7 @@ protected void growValuesArray(int newSize) { @Override public BytesRefVector build() { + finish(); BytesRefVector vector; if (valueCount == 1) { vector = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory); @@ -62,8 +63,21 @@ public BytesRefVector build() { estimatedBytes = values.ramBytesUsed(); vector = new BytesRefArrayVector(values, valueCount, blockFactory); } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false); + built(); return vector; } + + @Override + public void extraClose() { + Releasables.closeExpectNoException(values); + } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlockBuilder.java index a97f58f3924b1..dca8fe2d0d2e6 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlockBuilder.java @@ -7,6 +7,8 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.RamUsageEstimator; + import java.util.Arrays; /** @@ -20,7 +22,7 @@ final class DoubleBlockBuilder extends AbstractBlockBuilder implements DoubleBlo DoubleBlockBuilder(int estimatedSize, BlockFactory blockFactory) { super(blockFactory); int initialSize = Math.max(estimatedSize, 2); - adjustBreaker(initialSize); + adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize()); values = new double[initialSize]; } @@ -192,8 +194,16 @@ public DoubleBlock build() { block = new DoubleArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory); } } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false); + built(); return block; } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVectorBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVectorBuilder.java index 8112c5458280f..347bbf8552e76 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVectorBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVectorBuilder.java @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) { @Override public DoubleVector build() { + finish(); DoubleVector vector; if (valueCount == 1) { vector = new ConstantDoubleVector(values[0], 1, blockFactory); @@ -58,8 +59,16 @@ public DoubleVector build() { } vector = new DoubleArrayVector(values, valueCount, blockFactory); } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false); + built(); return vector; } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlockBuilder.java index 53d379d715c9b..ba96f85e73197 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlockBuilder.java @@ -7,6 +7,8 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.RamUsageEstimator; + import java.util.Arrays; /** @@ -20,7 +22,7 @@ final class IntBlockBuilder extends AbstractBlockBuilder implements IntBlock.Bui IntBlockBuilder(int estimatedSize, BlockFactory blockFactory) { super(blockFactory); int initialSize = Math.max(estimatedSize, 2); - adjustBreaker(initialSize); + adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize()); values = new int[initialSize]; } @@ -192,8 +194,16 @@ public IntBlock build() { block = new IntArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory); } } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false); + built(); return block; } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVectorBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVectorBuilder.java index 8bf4a4a96c5cb..acfb9c2cdd621 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVectorBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVectorBuilder.java @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) { @Override public IntVector build() { + finish(); IntVector vector; if (valueCount == 1) { vector = new ConstantIntVector(values[0], 1, blockFactory); @@ -58,8 +59,16 @@ public IntVector build() { } vector = new IntArrayVector(values, valueCount, blockFactory); } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false); + built(); return vector; } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlockBuilder.java index a378b382ce31e..09d858e7c9b03 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlockBuilder.java @@ -7,6 +7,8 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.RamUsageEstimator; + import java.util.Arrays; /** @@ -20,7 +22,7 @@ final class LongBlockBuilder extends AbstractBlockBuilder implements LongBlock.B LongBlockBuilder(int estimatedSize, BlockFactory blockFactory) { super(blockFactory); int initialSize = Math.max(estimatedSize, 2); - adjustBreaker(initialSize); + adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize()); values = new long[initialSize]; } @@ -192,8 +194,16 @@ public LongBlock build() { block = new LongArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory); } } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false); + built(); return block; } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVectorBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVectorBuilder.java index 10daed94a966e..0277fdec62060 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVectorBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVectorBuilder.java @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) { @Override public LongVector build() { + finish(); LongVector vector; if (valueCount == 1) { vector = new ConstantLongVector(values[0], 1, blockFactory); @@ -58,8 +59,16 @@ public LongVector build() { } vector = new LongArrayVector(values, valueCount, blockFactory); } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false); + built(); return vector; } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForBoolean.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForBoolean.java index d10c43b517f3b..3d568adc2b5ea 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForBoolean.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForBoolean.java @@ -64,4 +64,9 @@ public BooleanBlock build() { public String toString() { return "ResultBuilderForBoolean[inKey=" + inKey + "]"; } + + @Override + public void close() { + builder.close(); + } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForBytesRef.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForBytesRef.java index f19c2396845b7..e37f82f3363a9 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForBytesRef.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForBytesRef.java @@ -68,4 +68,9 @@ public BytesRefBlock build() { public String toString() { return "ResultBuilderForBytesRef[inKey=" + inKey + "]"; } + + @Override + public void close() { + builder.close(); + } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForDouble.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForDouble.java index 12b32bc7c8a05..77c976c6e0085 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForDouble.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForDouble.java @@ -64,4 +64,9 @@ public DoubleBlock build() { public String toString() { return "ResultBuilderForDouble[inKey=" + inKey + "]"; } + + @Override + public void close() { + builder.close(); + } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForInt.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForInt.java index 22cac7a13314a..389ed3bc2e3c3 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForInt.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForInt.java @@ -64,4 +64,9 @@ public IntBlock build() { public String toString() { return "ResultBuilderForInt[inKey=" + inKey + "]"; } + + @Override + public void close() { + builder.close(); + } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForLong.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForLong.java index 9acca56e1d8cd..63ee9d35c59e5 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForLong.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/topn/ResultBuilderForLong.java @@ -64,4 +64,9 @@ public LongBlock build() { public String toString() { return "ResultBuilderForLong[inKey=" + inKey + "]"; } + + @Override + public void close() { + builder.close(); + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java index a6ad5d1299543..3d06eba398513 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java @@ -33,6 +33,8 @@ abstract class AbstractBlockBuilder implements Block.Builder { /** The number of bytes currently estimated with the breaker. */ protected long estimatedBytes; + private boolean closed = false; + protected AbstractBlockBuilder(BlockFactory blockFactory) { this.blockFactory = blockFactory; } @@ -101,7 +103,14 @@ protected final void updatePosition() { } } + /** + * Called during implementations of {@link Block.Builder#build} as a first step + * to check if the block is still open and to finish the last position. + */ protected final void finish() { + if (closed) { + throw new IllegalStateException("already closed"); + } if (positionEntryIsOpen) { endPositionEntry(); } @@ -110,6 +119,16 @@ protected final void finish() { } } + /** + * Called during implementations of {@link Block.Builder#build} as a last step + * to mark the Builder as closed and make sure that further closes don't double + * free memory. + */ + protected final void built() { + closed = true; + estimatedBytes = 0; + } + protected abstract void growValuesArray(int newSize); /** The number of bytes used to represent each value element. */ @@ -125,6 +144,20 @@ protected final void ensureCapacity() { growValuesArray(newSize); } + @Override + public final void close() { + if (closed == false) { + closed = true; + adjustBreaker(-estimatedBytes); + extraClose(); + } + } + + /** + * Called when first {@link #close() closed}. + */ + protected void extraClose() {} + static int calculateNewArraySize(int currentSize) { // trivially, grows array by 50% return currentSize + (currentSize >> 1); @@ -133,6 +166,7 @@ static int calculateNewArraySize(int currentSize) { protected void adjustBreaker(long deltaBytes) { blockFactory.adjustBreaker(deltaBytes, false); estimatedBytes += deltaBytes; + assert estimatedBytes >= 0; } private void setFirstValue(int position, int value) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractVectorBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractVectorBuilder.java index 49ce276074735..274e88cd8d8b6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractVectorBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractVectorBuilder.java @@ -7,9 +7,14 @@ package org.elasticsearch.compute.data; -abstract class AbstractVectorBuilder { +abstract class AbstractVectorBuilder implements Vector.Builder { protected int valueCount; + /** + * Has this builder been closed already? + */ + private boolean closed = false; + protected final BlockFactory blockFactory; /** The number of bytes currently estimated with the breaker. */ @@ -46,4 +51,38 @@ protected void adjustBreaker(long deltaBytes) { blockFactory.adjustBreaker(deltaBytes, false); estimatedBytes += deltaBytes; } + + /** + * Called during implementations of {@link Block.Builder#build} as a first step + * to check if the block is still open and to finish the last position. + */ + protected final void finish() { + if (closed) { + throw new IllegalStateException("already closed"); + } + } + + /** + * Called during implementations of {@link Block.Builder#build} as a last step + * to mark the Builder as closed and make sure that further closes don't double + * free memory. + */ + protected final void built() { + closed = true; + estimatedBytes = 0; + } + + @Override + public final void close() { + if (closed == false) { + closed = true; + adjustBreaker(-estimatedBytes); + extraClose(); + } + } + + /** + * Called when first {@link #close() closed}. + */ + protected void extraClose() {} } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java index 1982c937f2a17..bf1631997d935 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java @@ -130,7 +130,11 @@ static Block constantNullBlock(int positions, BlockFactory blockFactory) { return blockFactory.newConstantNullBlock(positions); } - interface Builder { + /** + * Builds {@link Block}s. Typically, you use one of it's direct supinterfaces like {@link IntBlock.Builder}. + * This is {@link Releasable} and should be released after building the block or if building the block fails. + */ + interface Builder extends Releasable { /** * Appends a null value to the block. diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java index 7ad60d89ed72d..1fd3e13ea2d6c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java @@ -132,6 +132,11 @@ public void close() { static class Builder implements Block.Builder { private int positionCount; + /** + * Has this builder been closed already? + */ + private boolean closed = false; + @Override public Builder appendNull() { positionCount++; @@ -170,7 +175,16 @@ public Block.Builder mvOrdering(MvOrdering mvOrdering) { @Override public Block build() { + if (closed) { + throw new IllegalStateException("already closed"); + } + close(); return new ConstantNullBlock(positionCount); } + + @Override + public void close() { + closed = true; + } } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java index 433591be5b2bb..88eba0c38a4d8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java @@ -78,8 +78,8 @@ public void close() { /** * A builder the for {@link DocBlock}. */ - public static Builder newBlockBuilder(int estimatedSize) { - return new Builder(estimatedSize); + public static Builder newBlockBuilder(int estimatedSize, BlockFactory blockFactory) { + return new Builder(estimatedSize, blockFactory); } public static class Builder implements Block.Builder { @@ -87,10 +87,10 @@ public static class Builder implements Block.Builder { private final IntVector.Builder segments; private final IntVector.Builder docs; - private Builder(int estimatedSize) { - shards = IntVector.newVectorBuilder(estimatedSize); - segments = IntVector.newVectorBuilder(estimatedSize); - docs = IntVector.newVectorBuilder(estimatedSize); + private Builder(int estimatedSize, BlockFactory blockFactory) { + shards = IntVector.newVectorBuilder(estimatedSize, blockFactory); + segments = IntVector.newVectorBuilder(estimatedSize, blockFactory); + docs = IntVector.newVectorBuilder(estimatedSize, blockFactory); } public Builder appendShard(int shard) { @@ -146,8 +146,13 @@ public Block.Builder mvOrdering(MvOrdering mvOrdering) { @Override public DocBlock build() { - // Pass null for singleSegmentNonDecreasing so we calculate it when we first need it. + // Pass null for singleSegmentNonDecreasing so we 1calculate it when we first need it. return new DocVector(shards.build(), segments.build(), docs.build(), null).asBlock(); } + + @Override + public void close() { + Releasables.closeExpectNoException(shards, segments, docs); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java index dc55489b5bcd5..4467766a9e0ef 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java @@ -9,8 +9,6 @@ import org.apache.lucene.util.BytesRef; -import java.util.function.IntFunction; - /** * The type of elements in {@link Block} and {@link Vector} */ @@ -22,7 +20,7 @@ public enum ElementType { /** * Blocks containing only null values. */ - NULL(estimatedSize -> new ConstantNullBlock.Builder()), + NULL((estimatedSize, blockFactory) -> new ConstantNullBlock.Builder()), BYTES_REF(BytesRefBlock::newBlockBuilder), @@ -34,19 +32,32 @@ public enum ElementType { /** * Intermediate blocks which don't support retrieving elements. */ - UNKNOWN(estimatedSize -> { throw new UnsupportedOperationException("can't build null blocks"); }); + UNKNOWN((estimatedSize, blockFactory) -> { throw new UnsupportedOperationException("can't build null blocks"); }); + + interface BuilderSupplier { + Block.Builder newBlockBuilder(int estimatedSize, BlockFactory blockFactory); + } - private final IntFunction builder; + private final BuilderSupplier builder; - ElementType(IntFunction builder) { + ElementType(BuilderSupplier builder) { this.builder = builder; } + /** + * Create a new {@link Block.Builder} for blocks of this type. + * @deprecated use {@link #newBlockBuilder(int, BlockFactory)} + */ + @Deprecated + public Block.Builder newBlockBuilder(int estimatedSize) { + return builder.newBlockBuilder(estimatedSize, BlockFactory.getNonBreakingInstance()); + } + /** * Create a new {@link Block.Builder} for blocks of this type. */ - public Block.Builder newBlockBuilder(int estimatedSize) { // TODO add BlockFactory - return builder.apply(estimatedSize); + public Block.Builder newBlockBuilder(int estimatedSize, BlockFactory blockFactory) { + return builder.newBlockBuilder(estimatedSize, blockFactory); } public static ElementType fromJava(Class type) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Vector.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Vector.java index 171bdbd62f4d0..c9ecf1aa9e399 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Vector.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Vector.java @@ -50,7 +50,11 @@ public interface Vector extends Accountable, Releasable { /** The block factory associated with this vector. */ BlockFactory blockFactory(); - interface Builder { + /** + * Builds {@link Vector}s. Typically, you use one of it's direct supinterfaces like {@link IntVector.Builder}. + * This is {@link Releasable} and should be released after building the vector or if building the vector fails. + */ + interface Builder extends Releasable { /** * Builds the block. This method can be called multiple times. */ diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BlockBuilder.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BlockBuilder.java.st index 4d43f25577cc5..635f90bec4801 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BlockBuilder.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BlockBuilder.java.st @@ -14,6 +14,8 @@ import org.elasticsearch.common.util.BytesRefArray; import org.elasticsearch.core.Releasables; $else$ +import org.apache.lucene.util.RamUsageEstimator; + import java.util.Arrays; $endif$ @@ -41,7 +43,7 @@ $else$ $Type$BlockBuilder(int estimatedSize, BlockFactory blockFactory) { super(blockFactory); int initialSize = Math.max(estimatedSize, 2); - adjustBreaker(initialSize); + adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize()); values = new $type$[initialSize]; } $endif$ @@ -265,8 +267,23 @@ $endif$ block = new $Type$ArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory); } } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false); + built(); return block; } +$if(BytesRef)$ + + @Override + public void extraClose() { + Releasables.closeExpectNoException(values); + } +$endif$ } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-VectorBuilder.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-VectorBuilder.java.st index 09e95e16c303d..ab191c3720a11 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-VectorBuilder.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-VectorBuilder.java.st @@ -83,6 +83,7 @@ $endif$ @Override public $Type$Vector build() { + finish(); $Type$Vector vector; if (valueCount == 1) { $if(BytesRef)$ @@ -99,8 +100,23 @@ $else$ $endif$ vector = new $Type$ArrayVector(values, valueCount, blockFactory); } - // update the breaker with the actual bytes used. - blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true); + /* + * Update the breaker with the actual bytes used. + * We pass false below even though we've used the bytes. That's weird, + * but if we break here we will throw away the used memory, letting + * it be deallocated. The exception will bubble up and the builder will + * still technically be open, meaning the calling code should close it + * which will return all used memory to the breaker. + */ + blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false); + built(); return vector; } +$if(BytesRef)$ + + @Override + public void extraClose() { + Releasables.closeExpectNoException(values); + } +$endif$ } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index 281693a487255..1a1604406892c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -181,7 +181,13 @@ private SubscribableListener runSingleLoopIteration() { if (op.isFinished() == false && nextOp.needsInput()) { Page page = op.getOutput(); - if (page != null && page.getPositionCount() != 0) { + if (page == null) { + // No result, just move to the next iteration + } else if (page.getPositionCount() == 0) { + // Empty result, release any memory it holds immediately and move to the next iteration + page.releaseBlocks(); + } else { + // Non-empty result from the previous operation, move it to the next operation nextOp.addInput(page); movedPage = true; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java index 467d76d14923a..bd2027cade78f 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java @@ -11,11 +11,12 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.core.Releasable; /** * Builds {@link Block}s from keys and values encoded into {@link BytesRef}s. */ -interface ResultBuilder { +interface ResultBuilder extends Releasable { /** * Called for each sort key before {@link #decodeValue} to consume the sort key and * store the value of the key for {@link #decodeValue} can use it to reconstruct diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForDoc.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForDoc.java index f7ebdf86f0fba..7fb507ffdbead 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForDoc.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForDoc.java @@ -20,6 +20,7 @@ class ResultBuilderForDoc implements ResultBuilder { private int position; ResultBuilderForDoc(BlockFactory blockFactory, int positions) { + // TODO use fixed length builders this.blockFactory = blockFactory; this.shards = new int[positions]; this.segments = new int[positions]; @@ -53,4 +54,9 @@ public Block build() { public String toString() { return "ValueExtractorForDoc"; } + + @Override + public void close() { + // TODO memory accounting + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForNull.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForNull.java index 0bc04259b58e4..a45f16fc30910 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForNull.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForNull.java @@ -42,4 +42,9 @@ public Block build() { public String toString() { return "ValueExtractorForNull"; } + + @Override + public void close() { + // Nothing to close + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index 0f161c4f2b337..9657d60376763 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -12,6 +12,7 @@ import org.apache.lucene.util.PriorityQueue; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.ElementType; @@ -241,6 +242,7 @@ public String describe() { private final List encoders; private final List sortOrders; + private Row spare; private Iterator output; public TopNOperator( @@ -313,21 +315,20 @@ public void addInput(Page page) { * and must be closed. That happens either because it's overflow from the * inputQueue or because we hit an allocation failure while building it. */ - Row row = null; try { for (int i = 0; i < page.getPositionCount(); i++) { - if (row == null) { - row = new Row(breaker); + if (spare == null) { + spare = new Row(breaker); } else { - row.keys.clear(); - row.orderByCompositeKeyAscending.clear(); - row.values.clear(); + spare.keys.clear(); + spare.orderByCompositeKeyAscending.clear(); + spare.values.clear(); } - rowFiller.row(i, row); - row = inputQueue.insertWithOverflow(row); + rowFiller.row(i, spare); + spare = inputQueue.insertWithOverflow(spare); } } finally { - Releasables.close(row, () -> page.releaseBlocks()); + Releasables.close(() -> page.releaseBlocks()); } } @@ -339,18 +340,24 @@ public void finish() { } private Iterator toPages() { + if (spare != null) { + // Remove the spare, we're never going to use it again. + spare.close(); + spare = null; + } if (inputQueue.size() == 0) { return Collections.emptyIterator(); } List list = new ArrayList<>(inputQueue.size()); + List result = new ArrayList<>(); + ResultBuilder[] builders = null; + boolean success = false; try { while (inputQueue.size() > 0) { list.add(inputQueue.pop()); } Collections.reverse(list); - List result = new ArrayList<>(); - ResultBuilder[] builders = null; int p = 0; int size = 0; for (int i = 0; i < list.size(); i++) { @@ -399,14 +406,22 @@ private Iterator toPages() { p++; if (p == size) { result.add(new Page(Arrays.stream(builders).map(ResultBuilder::build).toArray(Block[]::new))); + Releasables.closeExpectNoException(builders); builders = null; } - } assert builders == null; + success = true; return result.iterator(); } finally { - Releasables.closeExpectNoException(() -> Releasables.close(list)); + if (success == false) { + List close = new ArrayList<>(list); + for (Page p : result) { + close.add(p::releaseBlocks); + } + Collections.addAll(close, builders); + Releasables.closeExpectNoException(Releasables.wrap(close)); + } } } @@ -435,10 +450,15 @@ public Page getOutput() { @Override public void close() { /* - * If everything went well we'll have drained inputQueue to this'll - * be a noop. But if inputQueue + * If we close before calling finish then spare and inputQueue will be live rows + * that need closing. If we close after calling finish then the output iterator + * will contain pages of results that have yet to be returned. */ - Releasables.closeExpectNoException(() -> Releasables.close(inputQueue)); + Releasables.closeExpectNoException( + spare, + inputQueue == null ? null : Releasables.wrap(inputQueue), + output == null ? null : Releasables.wrap(() -> Iterators.map(output, p -> p::releaseBlocks)) + ); } private static long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TopNOperator.class) + RamUsageEstimator diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/X-ResultBuilder.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/X-ResultBuilder.java.st index e0c8bbdffa404..ebe62398c8504 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/X-ResultBuilder.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/X-ResultBuilder.java.st @@ -82,4 +82,9 @@ $endif$ public String toString() { return "ResultBuilderFor$Type$[inKey=" + inKey + "]"; } + + @Override + public void close() { + builder.close(); + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java index de552d242afa2..2fd277d9672fe 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java @@ -7,47 +7,42 @@ package org.elasticsearch.compute.data; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.test.ESTestCase; +import java.util.ArrayList; import java.util.List; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; public class BlockBuilderTests extends ESTestCase { - - public void testAllNullsInt() { - for (int numEntries : List.of(1, randomIntBetween(1, 100))) { - testAllNullsImpl(IntBlock.newBlockBuilder(0), numEntries); - testAllNullsImpl(IntBlock.newBlockBuilder(100), numEntries); - testAllNullsImpl(IntBlock.newBlockBuilder(1000), numEntries); - testAllNullsImpl(IntBlock.newBlockBuilder(randomIntBetween(0, 100)), numEntries); + @ParametersFactory + public static List params() { + List params = new ArrayList<>(); + for (ElementType elementType : ElementType.values()) { + if (elementType == ElementType.UNKNOWN || elementType == ElementType.NULL || elementType == ElementType.DOC) { + continue; + } + params.add(new Object[] { elementType }); } + return params; } - public void testAllNullsLong() { - for (int numEntries : List.of(1, randomIntBetween(1, 100))) { - testAllNullsImpl(LongBlock.newBlockBuilder(0), numEntries); - testAllNullsImpl(LongBlock.newBlockBuilder(100), numEntries); - testAllNullsImpl(LongBlock.newBlockBuilder(1000), numEntries); - testAllNullsImpl(LongBlock.newBlockBuilder(randomIntBetween(0, 100)), numEntries); - } - } + private final ElementType elementType; - public void testAllNullsDouble() { - for (int numEntries : List.of(1, randomIntBetween(1, 100))) { - testAllNullsImpl(DoubleBlock.newBlockBuilder(0), numEntries); - testAllNullsImpl(DoubleBlock.newBlockBuilder(100), numEntries); - testAllNullsImpl(DoubleBlock.newBlockBuilder(1000), numEntries); - testAllNullsImpl(DoubleBlock.newBlockBuilder(randomIntBetween(0, 100)), numEntries); - } + public BlockBuilderTests(ElementType elementType) { + this.elementType = elementType; } - public void testAllNullsBytesRef() { + public void testAllNulls() { for (int numEntries : List.of(1, randomIntBetween(1, 100))) { - testAllNullsImpl(BytesRefBlock.newBlockBuilder(0), numEntries); - testAllNullsImpl(BytesRefBlock.newBlockBuilder(100), numEntries); - testAllNullsImpl(BytesRefBlock.newBlockBuilder(1000), numEntries); - testAllNullsImpl(BytesRefBlock.newBlockBuilder(randomIntBetween(0, 100)), numEntries); + testAllNullsImpl(elementType.newBlockBuilder(0), numEntries); + testAllNullsImpl(elementType.newBlockBuilder(100), numEntries); + testAllNullsImpl(elementType.newBlockBuilder(1000), numEntries); + testAllNullsImpl(elementType.newBlockBuilder(randomIntBetween(0, 100)), numEntries); } } @@ -65,4 +60,36 @@ private void testAllNullsImpl(Block.Builder builder, int numEntries) { static int randomPosition(int positionCount) { return positionCount == 1 ? 0 : randomIntBetween(0, positionCount - 1); } + + public void testCloseWithoutBuilding() { + BlockFactory blockFactory = BlockFactoryTests.blockFactory(ByteSizeValue.ofGb(1)); + elementType.newBlockBuilder(10, blockFactory).close(); + assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); + } + + public void testSingleBuild() { + BlockFactory blockFactory = BlockFactoryTests.blockFactory(ByteSizeValue.ofGb(1)); + try (Block.Builder builder = elementType.newBlockBuilder(10, blockFactory)) { + BasicBlockTests.RandomBlock random = BasicBlockTests.randomBlock(elementType, 10, false, 1, 1, 0, 0); + builder.copyFrom(random.block(), 0, random.block().getPositionCount()); + try (Block built = builder.build()) { + assertThat(built, equalTo(random.block())); + } + } + assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); + } + + public void testDoubleBuild() { + BlockFactory blockFactory = BlockFactoryTests.blockFactory(ByteSizeValue.ofGb(1)); + try (Block.Builder builder = elementType.newBlockBuilder(10, blockFactory)) { + BasicBlockTests.RandomBlock random = BasicBlockTests.randomBlock(elementType, 10, false, 1, 1, 0, 0); + builder.copyFrom(random.block(), 0, random.block().getPositionCount()); + try (Block built = builder.build()) { + assertThat(built, equalTo(random.block())); + } + Exception e = expectThrows(IllegalStateException.class, builder::build); + assertThat(e.getMessage(), equalTo("already closed")); + } + assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockFactoryTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockFactoryTests.java index 831be03cb0c81..ea0108ccd6cd7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockFactoryTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockFactoryTests.java @@ -37,6 +37,10 @@ // BlockFactory is used and effectively tested in many other places, but this class contains tests // more specific to the factory implementation itself (and not necessarily tested elsewhere). public class BlockFactoryTests extends ESTestCase { + public static BlockFactory blockFactory(ByteSizeValue size) { + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, size); + return new BlockFactory(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), bigArrays); + } final CircuitBreaker breaker; final BigArrays bigArrays; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BytesRefBlockEqualityTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BytesRefBlockEqualityTests.java index 0eb9beec2e7f9..ee654497c1ec3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BytesRefBlockEqualityTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BytesRefBlockEqualityTests.java @@ -18,7 +18,6 @@ import java.util.Arrays; import java.util.BitSet; import java.util.List; -import java.util.stream.IntStream; public class BytesRefBlockEqualityTests extends ESTestCase { @@ -332,10 +331,14 @@ public void testSimpleBlockWithSingleNull() { public void testSimpleBlockWithManyNulls() { int positions = randomIntBetween(1, 256); boolean grow = randomBoolean(); - var builder = BytesRefBlock.newBlockBuilder(grow ? 0 : positions); - IntStream.range(0, positions).forEach(i -> builder.appendNull()); - BytesRefBlock block1 = builder.build(); - BytesRefBlock block2 = builder.build(); + BytesRefBlock.Builder builder1 = BytesRefBlock.newBlockBuilder(grow ? 0 : positions); + BytesRefBlock.Builder builder2 = BytesRefBlock.newBlockBuilder(grow ? 0 : positions); + for (int p = 0; p < positions; p++) { + builder1.appendNull(); + builder2.appendNull(); + } + BytesRefBlock block1 = builder1.build(); + BytesRefBlock block2 = builder2.build(); assertEquals(positions, block1.getPositionCount()); assertTrue(block1.mayHaveNulls()); assertTrue(block1.isNull(0)); @@ -365,15 +368,27 @@ public void testSimpleBlockWithSingleMultiValue() { public void testSimpleBlockWithManyMultiValues() { int positions = randomIntBetween(1, 256); boolean grow = randomBoolean(); - var builder = BytesRefBlock.newBlockBuilder(grow ? 0 : positions); + BytesRefBlock.Builder builder1 = BytesRefBlock.newBlockBuilder(grow ? 0 : positions); + BytesRefBlock.Builder builder2 = BytesRefBlock.newBlockBuilder(grow ? 0 : positions); + BytesRefBlock.Builder builder3 = BytesRefBlock.newBlockBuilder(grow ? 0 : positions); for (int pos = 0; pos < positions; pos++) { - builder.beginPositionEntry(); + builder1.beginPositionEntry(); + builder2.beginPositionEntry(); + builder3.beginPositionEntry(); int values = randomIntBetween(1, 16); - IntStream.range(0, values).forEach(i -> builder.appendBytesRef(new BytesRef(Integer.toHexString(randomInt())))); + for (int i = 0; i < values; i++) { + BytesRef value = new BytesRef(Integer.toHexString(randomInt())); + builder1.appendBytesRef(value); + builder2.appendBytesRef(value); + builder3.appendBytesRef(value); + } + builder1.endPositionEntry(); + builder2.endPositionEntry(); + builder3.endPositionEntry(); } - BytesRefBlock block1 = builder.build(); - BytesRefBlock block2 = builder.build(); - BytesRefBlock block3 = builder.build(); + BytesRefBlock block1 = builder1.build(); + BytesRefBlock block2 = builder2.build(); + BytesRefBlock block3 = builder3.build(); assertEquals(positions, block1.getPositionCount()); assertAllEquals(List.of(block1, block2, block3)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DocVectorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DocVectorTests.java index 85e578fcbd38f..154362389fce9 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DocVectorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DocVectorTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.compute.data; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; @@ -95,32 +96,36 @@ public void testRandomShardSegmentDocMap() { } private void assertShardSegmentDocMap(int[][] data, int[][] expected) { - DocBlock.Builder builder = DocBlock.newBlockBuilder(data.length); - for (int r = 0; r < data.length; r++) { - builder.appendShard(data[r][0]); - builder.appendSegment(data[r][1]); - builder.appendDoc(data[r][2]); + BlockFactory blockFactory = BlockFactoryTests.blockFactory(ByteSizeValue.ofGb(1)); + try (DocBlock.Builder builder = DocBlock.newBlockBuilder(data.length, blockFactory)) { + for (int r = 0; r < data.length; r++) { + builder.appendShard(data[r][0]); + builder.appendSegment(data[r][1]); + builder.appendDoc(data[r][2]); + } + try (DocVector docVector = builder.build().asVector()) { + int[] forwards = docVector.shardSegmentDocMapForwards(); + + int[][] result = new int[docVector.getPositionCount()][]; + for (int p = 0; p < result.length; p++) { + result[p] = new int[] { + docVector.shards().getInt(forwards[p]), + docVector.segments().getInt(forwards[p]), + docVector.docs().getInt(forwards[p]) }; + } + assertThat(result, equalTo(expected)); + + int[] backwards = docVector.shardSegmentDocMapBackwards(); + for (int p = 0; p < result.length; p++) { + result[p] = new int[] { + docVector.shards().getInt(backwards[forwards[p]]), + docVector.segments().getInt(backwards[forwards[p]]), + docVector.docs().getInt(backwards[forwards[p]]) }; + } + + assertThat(result, equalTo(data)); + } } - DocVector docVector = builder.build().asVector(); - int[] forwards = docVector.shardSegmentDocMapForwards(); - - int[][] result = new int[docVector.getPositionCount()][]; - for (int p = 0; p < result.length; p++) { - result[p] = new int[] { - docVector.shards().getInt(forwards[p]), - docVector.segments().getInt(forwards[p]), - docVector.docs().getInt(forwards[p]) }; - } - assertThat(result, equalTo(expected)); - - int[] backwards = docVector.shardSegmentDocMapBackwards(); - for (int p = 0; p < result.length; p++) { - result[p] = new int[] { - docVector.shards().getInt(backwards[forwards[p]]), - docVector.segments().getInt(backwards[forwards[p]]), - docVector.docs().getInt(backwards[forwards[p]]) }; - } - - assertThat(result, equalTo(data)); + assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DoubleBlockEqualityTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DoubleBlockEqualityTests.java index 2abbcc0b989f1..7dda97f52834e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DoubleBlockEqualityTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DoubleBlockEqualityTests.java @@ -11,7 +11,6 @@ import java.util.BitSet; import java.util.List; -import java.util.stream.IntStream; public class DoubleBlockEqualityTests extends ESTestCase { @@ -224,10 +223,14 @@ public void testSimpleBlockWithSingleNull() { public void testSimpleBlockWithManyNulls() { int positions = randomIntBetween(1, 256); boolean grow = randomBoolean(); - var builder = DoubleBlock.newBlockBuilder(grow ? 0 : positions); - IntStream.range(0, positions).forEach(i -> builder.appendNull()); - DoubleBlock block1 = builder.build(); - DoubleBlock block2 = builder.build(); + DoubleBlock.Builder builder1 = DoubleBlock.newBlockBuilder(grow ? 0 : positions); + DoubleBlock.Builder builder2 = DoubleBlock.newBlockBuilder(grow ? 0 : positions); + for (int p = 0; p < positions; p++) { + builder1.appendNull(); + builder2.appendNull(); + } + DoubleBlock block1 = builder1.build(); + DoubleBlock block2 = builder2.build(); assertEquals(positions, block1.getPositionCount()); assertTrue(block1.mayHaveNulls()); assertTrue(block1.isNull(0)); @@ -248,15 +251,27 @@ public void testSimpleBlockWithSingleMultiValue() { public void testSimpleBlockWithManyMultiValues() { int positions = randomIntBetween(1, 256); boolean grow = randomBoolean(); - var builder = DoubleBlock.newBlockBuilder(grow ? 0 : positions); + DoubleBlock.Builder builder1 = DoubleBlock.newBlockBuilder(grow ? 0 : positions); + DoubleBlock.Builder builder2 = DoubleBlock.newBlockBuilder(grow ? 0 : positions); + DoubleBlock.Builder builder3 = DoubleBlock.newBlockBuilder(grow ? 0 : positions); for (int pos = 0; pos < positions; pos++) { - builder.beginPositionEntry(); + builder1.beginPositionEntry(); + builder2.beginPositionEntry(); + builder3.beginPositionEntry(); int values = randomIntBetween(1, 16); - IntStream.range(0, values).forEach(i -> builder.appendDouble(randomDouble())); + for (int i = 0; i < values; i++) { + double value = randomDouble(); + builder1.appendDouble(value); + builder2.appendDouble(value); + builder3.appendDouble(value); + } + builder1.endPositionEntry(); + builder2.endPositionEntry(); + builder3.endPositionEntry(); } - DoubleBlock block1 = builder.build(); - DoubleBlock block2 = builder.build(); - DoubleBlock block3 = builder.build(); + DoubleBlock block1 = builder1.build(); + DoubleBlock block2 = builder2.build(); + DoubleBlock block3 = builder3.build(); assertEquals(positions, block1.getPositionCount()); assertAllEquals(List.of(block1, block2, block3)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/IntBlockEqualityTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/IntBlockEqualityTests.java index c4e19106d4368..40c84324f13d2 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/IntBlockEqualityTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/IntBlockEqualityTests.java @@ -11,7 +11,6 @@ import java.util.BitSet; import java.util.List; -import java.util.stream.IntStream; public class IntBlockEqualityTests extends ESTestCase { @@ -185,10 +184,14 @@ public void testSimpleBlockWithSingleNull() { public void testSimpleBlockWithManyNulls() { int positions = randomIntBetween(1, 256); boolean grow = randomBoolean(); - var builder = IntBlock.newBlockBuilder(grow ? 0 : positions); - IntStream.range(0, positions).forEach(i -> builder.appendNull()); - IntBlock block1 = builder.build(); - IntBlock block2 = builder.build(); + IntBlock.Builder builder1 = IntBlock.newBlockBuilder(grow ? 0 : positions); + IntBlock.Builder builder2 = IntBlock.newBlockBuilder(grow ? 0 : positions); + for (int p = 0; p < positions; p++) { + builder1.appendNull(); + builder2.appendNull(); + } + IntBlock block1 = builder1.build(); + IntBlock block2 = builder2.build(); assertEquals(positions, block1.getPositionCount()); assertTrue(block1.mayHaveNulls()); assertTrue(block1.isNull(0)); @@ -210,15 +213,27 @@ public void testSimpleBlockWithSingleMultiValue() { public void testSimpleBlockWithManyMultiValues() { int positions = randomIntBetween(1, 256); boolean grow = randomBoolean(); - var builder = IntBlock.newBlockBuilder(grow ? 0 : positions); + IntBlock.Builder builder1 = IntBlock.newBlockBuilder(grow ? 0 : positions); + IntBlock.Builder builder2 = IntBlock.newBlockBuilder(grow ? 0 : positions); + IntBlock.Builder builder3 = IntBlock.newBlockBuilder(grow ? 0 : positions); for (int pos = 0; pos < positions; pos++) { - builder.beginPositionEntry(); + builder1.beginPositionEntry(); + builder2.beginPositionEntry(); + builder3.beginPositionEntry(); int values = randomIntBetween(1, 16); - IntStream.range(0, values).forEach(i -> builder.appendInt(randomInt())); + for (int i = 0; i < values; i++) { + int value = randomInt(); + builder1.appendInt(value); + builder2.appendInt(value); + builder3.appendInt(value); + } + builder1.endPositionEntry(); + builder2.endPositionEntry(); + builder3.endPositionEntry(); } - IntBlock block1 = builder.build(); - IntBlock block2 = builder.build(); - IntBlock block3 = builder.build(); + IntBlock block1 = builder1.build(); + IntBlock block2 = builder2.build(); + IntBlock block3 = builder3.build(); assertEquals(positions, block1.getPositionCount()); assertAllEquals(List.of(block1, block2, block3)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/LongBlockEqualityTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/LongBlockEqualityTests.java index 3d08b2a96d635..a24b4a4dd6fa6 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/LongBlockEqualityTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/LongBlockEqualityTests.java @@ -11,7 +11,6 @@ import java.util.BitSet; import java.util.List; -import java.util.stream.IntStream; public class LongBlockEqualityTests extends ESTestCase { @@ -191,10 +190,14 @@ public void testSimpleBlockWithSingleNull() { public void testSimpleBlockWithManyNulls() { int positions = randomIntBetween(1, 256); boolean grow = randomBoolean(); - var builder = LongBlock.newBlockBuilder(grow ? 0 : positions); - IntStream.range(0, positions).forEach(i -> builder.appendNull()); - LongBlock block1 = builder.build(); - LongBlock block2 = builder.build(); + LongBlock.Builder builder1 = LongBlock.newBlockBuilder(grow ? 0 : positions); + LongBlock.Builder builder2 = LongBlock.newBlockBuilder(grow ? 0 : positions); + for (int p = 0; p < positions; p++) { + builder1.appendNull(); + builder2.appendNull(); + } + LongBlock block1 = builder1.build(); + LongBlock block2 = builder2.build(); assertEquals(positions, block1.getPositionCount()); assertTrue(block1.mayHaveNulls()); assertTrue(block1.isNull(0)); @@ -216,15 +219,27 @@ public void testSimpleBlockWithSingleMultiValue() { public void testSimpleBlockWithManyMultiValues() { int positions = randomIntBetween(1, 256); boolean grow = randomBoolean(); - var builder = LongBlock.newBlockBuilder(grow ? 0 : positions); + LongBlock.Builder builder1 = LongBlock.newBlockBuilder(grow ? 0 : positions); + LongBlock.Builder builder2 = LongBlock.newBlockBuilder(grow ? 0 : positions); + LongBlock.Builder builder3 = LongBlock.newBlockBuilder(grow ? 0 : positions); for (int pos = 0; pos < positions; pos++) { - builder.beginPositionEntry(); - int values = randomIntBetween(1, 16); - IntStream.range(0, values).forEach(i -> builder.appendLong(randomLong())); + builder1.beginPositionEntry(); + builder2.beginPositionEntry(); + builder3.beginPositionEntry(); + int valueCount = randomIntBetween(1, 16); + for (int i = 0; i < valueCount; i++) { + long value = randomLong(); + builder1.appendLong(value); + builder2.appendLong(value); + builder3.appendLong(value); + } + builder1.endPositionEntry(); + builder2.endPositionEntry(); + builder3.endPositionEntry(); } - LongBlock block1 = builder.build(); - LongBlock block2 = builder.build(); - LongBlock block3 = builder.build(); + LongBlock block1 = builder1.build(); + LongBlock block2 = builder2.build(); + LongBlock block3 = builder3.build(); assertEquals(positions, block1.getPositionCount()); assertAllEquals(List.of(block1, block2, block3)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/TestBlockBuilder.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/TestBlockBuilder.java index 4684da93a661a..d9377a490368d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/TestBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/TestBlockBuilder.java @@ -139,6 +139,11 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) { public IntBlock build() { return builder.build(); } + + @Override + public void close() { + builder.close(); + } } private static class TestLongBlockBuilder extends TestBlockBuilder { @@ -195,6 +200,11 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) { public LongBlock build() { return builder.build(); } + + @Override + public void close() { + builder.close(); + } } private static class TestDoubleBlockBuilder extends TestBlockBuilder { @@ -251,6 +261,11 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) { public DoubleBlock build() { return builder.build(); } + + @Override + public void close() { + builder.close(); + } } private static class TestBytesRefBlockBuilder extends TestBlockBuilder { @@ -307,6 +322,11 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) { public BytesRefBlock build() { return builder.build(); } + + @Override + public void close() { + builder.close(); + } } private static class TestBooleanBlockBuilder extends TestBlockBuilder { @@ -366,5 +386,10 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) { public BooleanBlock build() { return builder.build(); } + + @Override + public void close() { + builder.close(); + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java new file mode 100644 index 0000000000000..0422040d7922f --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java @@ -0,0 +1,114 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + +public class VectorBuilderTests extends ESTestCase { + @ParametersFactory + public static List params() { + List params = new ArrayList<>(); + for (ElementType elementType : ElementType.values()) { + if (elementType == ElementType.UNKNOWN || elementType == ElementType.NULL || elementType == ElementType.DOC) { + continue; + } + params.add(new Object[] { elementType }); + } + return params; + } + + private final ElementType elementType; + + public VectorBuilderTests(ElementType elementType) { + this.elementType = elementType; + } + + public void testCloseWithoutBuilding() { + BlockFactory blockFactory = BlockFactoryTests.blockFactory(ByteSizeValue.ofGb(1)); + vectorBuilder(10, blockFactory).close(); + assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); + } + + public void testSingleBuild() { + BlockFactory blockFactory = BlockFactoryTests.blockFactory(ByteSizeValue.ofGb(1)); + try (Vector.Builder builder = vectorBuilder(10, blockFactory)) { + BasicBlockTests.RandomBlock random = BasicBlockTests.randomBlock(elementType, 10, false, 1, 1, 0, 0); + fill(builder, random.block().asVector()); + try (Vector built = builder.build()) { + assertThat(built, equalTo(random.block().asVector())); + } + } + assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); + } + + public void testDoubleBuild() { + BlockFactory blockFactory = BlockFactoryTests.blockFactory(ByteSizeValue.ofGb(1)); + try (Vector.Builder builder = vectorBuilder(10, blockFactory)) { + BasicBlockTests.RandomBlock random = BasicBlockTests.randomBlock(elementType, 10, false, 1, 1, 0, 0); + fill(builder, random.block().asVector()); + try (Vector built = builder.build()) { + assertThat(built, equalTo(random.block().asVector())); + } + Exception e = expectThrows(IllegalStateException.class, builder::build); + assertThat(e.getMessage(), equalTo("already closed")); + } + assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); + } + + private Vector.Builder vectorBuilder(int estimatedSize, BlockFactory blockFactory) { + return switch (elementType) { + case NULL, DOC, UNKNOWN -> throw new UnsupportedOperationException(); + case BOOLEAN -> BooleanVector.newVectorBuilder(estimatedSize, blockFactory); + case BYTES_REF -> BytesRefVector.newVectorBuilder(estimatedSize, blockFactory); + case DOUBLE -> DoubleVector.newVectorBuilder(estimatedSize, blockFactory); + case INT -> IntVector.newVectorBuilder(estimatedSize, blockFactory); + case LONG -> LongVector.newVectorBuilder(estimatedSize, blockFactory); + }; + } + + private void fill(Vector.Builder builder, Vector from) { + switch (elementType) { + case NULL, DOC, UNKNOWN -> throw new UnsupportedOperationException(); + case BOOLEAN -> { + for (int p = 0; p < from.getPositionCount(); p++) { + ((BooleanVector.Builder) builder).appendBoolean(((BooleanVector) from).getBoolean(p)); + } + } + case BYTES_REF -> { + for (int p = 0; p < from.getPositionCount(); p++) { + ((BytesRefVector.Builder) builder).appendBytesRef(((BytesRefVector) from).getBytesRef(p, new BytesRef())); + } + } + case DOUBLE -> { + for (int p = 0; p < from.getPositionCount(); p++) { + ((DoubleVector.Builder) builder).appendDouble(((DoubleVector) from).getDouble(p)); + } + } + case INT -> { + for (int p = 0; p < from.getPositionCount(); p++) { + ((IntVector.Builder) builder).appendInt(((IntVector) from).getInt(p)); + } + } + case LONG -> { + for (int p = 0; p < from.getPositionCount(); p++) { + ((LongVector.Builder) builder).appendLong(((LongVector) from).getLong(p)); + } + } + } + ; + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/CannedSourceOperator.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/CannedSourceOperator.java index 6d009a2536c34..57ea313b88dab 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/CannedSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/CannedSourceOperator.java @@ -96,5 +96,9 @@ public Page getOutput() { } @Override - public void close() {} + public void close() { + while (page.hasNext()) { + page.next().releaseBlocks(); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index df07eed001634..808835c0ce1ec 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -88,21 +88,21 @@ public final void testSimpleCircuitBreaking() { * The input blocks don't count against the memory usage for the limited operator that we * build. */ - List input = CannedSourceOperator.collectPages(simpleInput(driverContext().blockFactory(), between(1_000, 10_000))); - try { - BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, smallEnoughToCircuitBreak()) - .withCircuitBreaking(); - CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); - BlockFactory blockFactory = BlockFactory.getInstance(breaker, bigArrays); - Exception e = expectThrows( - CircuitBreakingException.class, - () -> drive(simple(bigArrays).get(new DriverContext(bigArrays, blockFactory)), input.iterator()) - ); - assertThat(e.getMessage(), equalTo(MockBigArrays.ERROR_MESSAGE)); - assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); - } finally { - Releasables.close(() -> Iterators.map(input.iterator(), p -> p::releaseBlocks)); - } + DriverContext inputFactoryContext = driverContext(); + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, smallEnoughToCircuitBreak()) + .withCircuitBreaking(); + List input = CannedSourceOperator.collectPages(simpleInput(inputFactoryContext.blockFactory(), between(1_000, 10_000))); + CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + BlockFactory blockFactory = BlockFactory.getInstance(breaker, bigArrays); + Exception e = expectThrows( + CircuitBreakingException.class, + () -> drive(simple(bigArrays).get(new DriverContext(bigArrays, blockFactory)), input.iterator()) + ); + assertThat(e.getMessage(), equalTo(MockBigArrays.ERROR_MESSAGE)); + + // Note the lack of try/finally here - we're asserting that when the driver throws an exception we clear the breakers. + assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + assertThat(inputFactoryContext.breaker().getUsed(), equalTo(0L)); } /** @@ -112,15 +112,24 @@ public final void testSimpleCircuitBreaking() { * in ctors. */ public final void testSimpleWithCranky() { - CrankyCircuitBreakerService breaker = new CrankyCircuitBreakerService(); - BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, breaker).withCircuitBreaking(); - BlockFactory blockFactory = BlockFactory.getInstance(breaker.getBreaker("request"), bigArrays); + DriverContext inputFactoryContext = driverContext(); + List input = CannedSourceOperator.collectPages(simpleInput(inputFactoryContext.blockFactory(), between(1_000, 10_000))); + + CrankyCircuitBreakerService cranky = new CrankyCircuitBreakerService(); + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, cranky).withCircuitBreaking(); + BlockFactory blockFactory = BlockFactory.getInstance(cranky.getBreaker(CircuitBreaker.REQUEST), bigArrays); try { - assertSimple(new DriverContext(bigArrays, blockFactory), between(1_000, 10_000)); + List result = drive(simple(bigArrays).get(new DriverContext(bigArrays, blockFactory)), input.iterator()); + Releasables.close(() -> Iterators.map(result.iterator(), p -> p::releaseBlocks)); // Either we get lucky and cranky doesn't throw and the test completes or we don't and it throws } catch (CircuitBreakingException e) { + logger.info("broken", e); assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE)); } + + // Note the lack of try/finally here - we're asserting that when the driver throws an exception we clear the breakers. + assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + assertThat(inputFactoryContext.breaker().getUsed(), equalTo(0L)); } /** diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 373478731904a..2b79781789c0e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -38,7 +38,9 @@ import org.elasticsearch.compute.operator.TupleBlockSourceOperator; import org.elasticsearch.core.Tuple; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.ListMatcher; import org.elasticsearch.xpack.versionfield.Version; +import org.junit.After; import java.lang.reflect.Field; import java.net.InetAddress; @@ -184,7 +186,7 @@ protected ByteSizeValue smallEnoughToCircuitBreak() { * 775 causes us to blow up while collecting values and 780 doesn't * trip the breaker. So 775 is the max on this range. */ - return ByteSizeValue.ofBytes(between(1, 775)); + return ByteSizeValue.ofBytes(775); } public void testRamBytesUsed() { @@ -210,23 +212,26 @@ public long accumulateObject(Object o, long shallowSize, Map fiel // We under-count by a few bytes because of the lists. In that end that's fine, but we need to account for it here. long underCount = 200; DriverContext context = driverContext(); - TopNOperator op = new TopNOperator.TopNOperatorFactory( - topCount, - List.of(LONG), - List.of(DEFAULT_UNSORTABLE), - List.of(new TopNOperator.SortOrder(0, true, false)), - pageSize - ).get(context); - long actualEmpty = RamUsageTester.ramUsed(op, acc); - assertThat(op.ramBytesUsed(), both(greaterThan(actualEmpty - underCount)).and(lessThan(actualEmpty))); - // But when we fill it then we're quite close - for (Page p : CannedSourceOperator.collectPages(simpleInput(context.blockFactory(), topCount))) { - op.addInput(p); - } - long actualFull = RamUsageTester.ramUsed(op, acc); - assertThat(op.ramBytesUsed(), both(greaterThan(actualFull - underCount)).and(lessThan(actualFull))); + try ( + TopNOperator op = new TopNOperator.TopNOperatorFactory( + topCount, + List.of(LONG), + List.of(DEFAULT_UNSORTABLE), + List.of(new TopNOperator.SortOrder(0, true, false)), + pageSize + ).get(context) + ) { + long actualEmpty = RamUsageTester.ramUsed(op, acc); + assertThat(op.ramBytesUsed(), both(greaterThan(actualEmpty - underCount)).and(lessThan(actualEmpty))); + // But when we fill it then we're quite close + for (Page p : CannedSourceOperator.collectPages(simpleInput(context.blockFactory(), topCount))) { + op.addInput(p); + } + long actualFull = RamUsageTester.ramUsed(op, acc); + assertThat(op.ramBytesUsed(), both(greaterThan(actualFull - underCount)).and(lessThan(actualFull))); - // TODO empty it again and check. + // TODO empty it again and check. + } } public void testRandomTopN() { @@ -637,6 +642,7 @@ private List> topNTwoColumns( for (int i = 0; i < block1.getPositionCount(); i++) { outputValues.add(tuple(block1.isNull(i) ? null : block1.getLong(i), block2.isNull(i) ? null : block2.getLong(i))); } + page.releaseBlocks(); }), () -> {} ) @@ -1284,6 +1290,50 @@ public void testZeroByte() { assertThat((Integer) actual.get(1).get(1), equalTo(100)); } + public void testErrorBeforeFullyDraining() { + int maxPageSize = between(1, 100); + int topCount = maxPageSize * 4; + int docCount = topCount * 10; + List> actual = new ArrayList<>(); + DriverContext driverContext = driverContext(); + try ( + Driver driver = new Driver( + driverContext, + new SequenceLongBlockSourceOperator(driverContext.blockFactory(), LongStream.range(0, docCount)), + List.of( + new TopNOperator( + driverContext.blockFactory(), + nonBreakingBigArrays().breakerService().getBreaker("request"), + topCount, + List.of(LONG), + List.of(DEFAULT_UNSORTABLE), + List.of(new TopNOperator.SortOrder(0, true, randomBoolean())), + maxPageSize + ) + ), + new PageConsumerOperator(p -> { + assertThat(p.getPositionCount(), equalTo(maxPageSize)); + if (actual.isEmpty()) { + readInto(actual, p); + } else { + p.releaseBlocks(); + throw new RuntimeException("boo"); + } + }), + () -> {} + ) + ) { + Exception e = expectThrows(RuntimeException.class, () -> runDriver(driver)); + assertThat(e.getMessage(), equalTo("boo")); + } + + ListMatcher values = matchesList(); + for (int i = 0; i < maxPageSize; i++) { + values = values.item((long) i); + } + assertMap(actual, matchesList().item(values)); + } + public void testCloseWithoutCompleting() { CircuitBreaker breaker = new MockBigArrays.LimitedBreaker(CircuitBreaker.REQUEST, ByteSizeValue.ofGb(1)); try ( @@ -1299,13 +1349,23 @@ public void testCloseWithoutCompleting() { ) { op.addInput(new Page(new IntArrayVector(new int[] { 1 }, 1).asBlock())); } - assertThat(breaker.getUsed(), equalTo(0L)); } + private final List breakers = new ArrayList<>(); + @Override protected DriverContext driverContext() { // TODO remove this when the parent uses a breaking block factory BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); - return new DriverContext(bigArrays, new BlockFactory(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), bigArrays)); + CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + breakers.add(breaker); + return new DriverContext(bigArrays, new BlockFactory(breaker, bigArrays)); + } + + @After + public void allBreakersEmpty() { + for (CircuitBreaker breaker : breakers) { + assertThat(breaker.getUsed(), equalTo(0L)); + } } @SuppressWarnings({ "unchecked", "rawtypes" })