Skip to content

Commit

Permalink
Fix BytesRef
Browse files Browse the repository at this point in the history
  • Loading branch information
nik9000 committed Sep 27, 2023
1 parent eec33df commit 0b99496
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,14 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public long ramBytesUsed() {
return BASE_RAM_BYTES_USED + startOffsets.ramBytesUsed() + bytes.ramBytesUsed();
return BASE_RAM_BYTES_USED + bigArraysRamBytesUsed();
}

/**
* Memory used by the {@link BigArrays} portion of this {@link BytesRefArray}.
* <p>
* This is what {@code startOffsets} and {@code bytes} report, added together.
* It does not include {@code BASE_RAM_BYTES_USED}; {@link #ramBytesUsed()}
* adds that constant on top of this value.
*
* @return the sum of {@code startOffsets.ramBytesUsed()} and {@code bytes.ramBytesUsed()}
*/
public long bigArraysRamBytesUsed() {
return startOffsets.ramBytesUsed() + bytes.ramBytesUsed();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public BooleanBlock expand() {
public static long ramBytesEstimated(boolean[] values, int[] firstValueIndexes, BitSet nullsMask) {
// Adds BASE_RAM_BYTES_USED, the sizes reported for values, firstValueIndexes and nullsMask,
// and the shallow size of one MvOrdering instance.
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
// NOTE(review): presumably this means the MvOrdering term above counts an instance that is
// shared rather than owned by the block, so the estimate may be slightly high — confirm.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public BytesRefBlock expand() {
public static long ramBytesEstimated(BytesRefArray values, int[] firstValueIndexes, BitSet nullsMask) {
// Adds BASE_RAM_BYTES_USED, the sizes reported for values, firstValueIndexes and nullsMask,
// and the shallow size of one MvOrdering instance.
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
// NOTE(review): presumably this means the MvOrdering term above counts an instance that is
// shared rather than owned by the block, so the estimate may be slightly high — confirm.
}

@Override
Expand Down Expand Up @@ -111,7 +112,7 @@ public String toString() {

@Override
public void close() {
blockFactory.adjustBreaker(-(ramBytesUsed() - values.ramBytesUsed()), true);
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public String toString() {

@Override
public void close() {
blockFactory.adjustBreaker(-BASE_RAM_BYTES_USED, true);
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ public BytesRefBlock build() {
block = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory).asBlock();
Releasables.closeExpectNoException(values);
} else {
estimatedBytes += values.ramBytesUsed();
if (isDense() && singleValued()) {
block = new BytesRefArrayVector(values, positionCount, blockFactory).asBlock();
} else {
Expand All @@ -212,7 +211,8 @@ public BytesRefBlock build() {
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
blockFactory.adjustBreaker(block.ramBytesUsed() - values.bigArraysRamBytesUsed(), false);
assert estimatedBytes == 0;
built();
return block;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public BytesRefVector build() {
vector = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory);
Releasables.closeExpectNoException(values);
} else {
estimatedBytes = values.ramBytesUsed();
vector = new BytesRefArrayVector(values, valueCount, blockFactory);
}
assert estimatedBytes == 0;
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
Expand All @@ -71,7 +71,8 @@ public BytesRefVector build() {
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
blockFactory.adjustBreaker(vector.ramBytesUsed() - values.bigArraysRamBytesUsed(), false);
values = null;
built();
return vector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public DoubleBlock expand() {
public static long ramBytesEstimated(double[] values, int[] firstValueIndexes, BitSet nullsMask) {
// Adds BASE_RAM_BYTES_USED, the sizes reported for values, firstValueIndexes and nullsMask,
// and the shallow size of one MvOrdering instance.
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
// NOTE(review): presumably this means the MvOrdering term above counts an instance that is
// shared rather than owned by the block, so the estimate may be slightly high — confirm.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public IntBlock expand() {
public static long ramBytesEstimated(int[] values, int[] firstValueIndexes, BitSet nullsMask) {
// Adds BASE_RAM_BYTES_USED, the sizes reported for values, firstValueIndexes and nullsMask,
// and the shallow size of one MvOrdering instance.
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
// NOTE(review): presumably this means the MvOrdering term above counts an instance that is
// shared rather than owned by the block, so the estimate may be slightly high — confirm.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public LongBlock expand() {
public static long ramBytesEstimated(long[] values, int[] firstValueIndexes, BitSet nullsMask) {
// Adds BASE_RAM_BYTES_USED, the sizes reported for values, firstValueIndexes and nullsMask,
// and the shallow size of one MvOrdering instance.
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
// NOTE(review): presumably this means the MvOrdering term above counts an instance that is
// shared rather than owned by the block, so the estimate may be slightly high — confirm.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ $endif$
public static long ramBytesEstimated($if(BytesRef)$BytesRefArray$else$$type$[]$endif$ values, int[] firstValueIndexes, BitSet nullsMask) {
// Adds BASE_RAM_BYTES_USED, the sizes reported for values, firstValueIndexes and nullsMask,
// and the shallow size of one MvOrdering instance.
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
// NOTE(review): presumably this means the MvOrdering term above counts an instance that is
// shared rather than owned by the block, so the estimate may be slightly high — confirm.
}

@Override
Expand Down Expand Up @@ -133,7 +134,7 @@ $endif$
@Override
public void close() {
$if(BytesRef)$
blockFactory.adjustBreaker(-(ramBytesUsed() - values.ramBytesUsed()), true);
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
$else$
blockFactory.adjustBreaker(-ramBytesUsed(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ $endif$
$if(BytesRef)$
@Override
public void close() {
blockFactory.adjustBreaker(-BASE_RAM_BYTES_USED, true);
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
$endif$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ $if(BytesRef)$
block = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory).asBlock();
Releasables.closeExpectNoException(values);
} else {
estimatedBytes += values.ramBytesUsed();
$else$
block = new Constant$Type$Vector(values[0], 1, blockFactory).asBlock();
} else {
Expand All @@ -275,7 +274,12 @@ $endif$
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
$if(BytesRef)$
blockFactory.adjustBreaker(block.ramBytesUsed() - values.bigArraysRamBytesUsed(), false);
assert estimatedBytes == 0;
$else$
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
$endif$
built();
return block;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,31 @@ $endif$
public $Type$Vector build() {
finish();
$Type$Vector vector;
if (valueCount == 1) {
$if(BytesRef)$
if (valueCount == 1) {
vector = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory);
Releasables.closeExpectNoException(values);
} else {
estimatedBytes = values.ramBytesUsed();
vector = new $Type$ArrayVector(values, valueCount, blockFactory);
}
assert estimatedBytes == 0;
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - values.bigArraysRamBytesUsed(), false);
values = null;
$else$
if (valueCount == 1) {
vector = new Constant$Type$Vector(values[0], 1, blockFactory);
} else {
if (values.length - valueCount > 1024 || valueCount < (values.length / 2)) {
values = Arrays.copyOf(values, valueCount);
}
$endif$
vector = new $Type$ArrayVector(values, valueCount, blockFactory);
}
/*
Expand All @@ -109,6 +121,7 @@ $endif$
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
$endif$
built();
return vector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import com.carrotsearch.randomizedtesting.annotations.Repeat;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.CrankyCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
Expand Down Expand Up @@ -74,7 +82,9 @@ public void testSingleBuild() {
builder.copyFrom(random.block(), 0, random.block().getPositionCount());
try (Block built = builder.build()) {
assertThat(built, equalTo(random.block()));
assertThat(blockFactory.breaker().getUsed(), equalTo(built.ramBytesUsed()));
}
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
}
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
}
Expand All @@ -86,10 +96,31 @@ public void testDoubleBuild() {
builder.copyFrom(random.block(), 0, random.block().getPositionCount());
try (Block built = builder.build()) {
assertThat(built, equalTo(random.block()));
assertThat(blockFactory.breaker().getUsed(), equalTo(built.ramBytesUsed()));
}
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
Exception e = expectThrows(IllegalStateException.class, builder::build);
assertThat(e.getMessage(), equalTo("already closed"));
}
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
}

/**
 * Builds a block with a {@link BlockFactory} whose breaker comes from a
 * {@link CrankyCircuitBreakerService} and checks that the breaker reports zero
 * used bytes afterwards, both when the build completes and when a
 * {@link CircuitBreakingException} is thrown along the way.
 */
public void testCranky() {
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new CrankyCircuitBreakerService());
BlockFactory blockFactory = new BlockFactory(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), bigArrays);
// A CircuitBreakingException from any step inside this try is tolerated,
// provided it carries the cranky service's own error message.
try {
try (Block.Builder builder = elementType.newBlockBuilder(10, blockFactory)) {
BasicBlockTests.RandomBlock random = BasicBlockTests.randomBlock(elementType, 10, false, 1, 1, 0, 0);
builder.copyFrom(random.block(), 0, random.block().getPositionCount());
try (Block built = builder.build()) {
assertThat(built, equalTo(random.block()));
}
}
// If we made it this far cranky didn't fail us!
} catch (CircuitBreakingException e) {
logger.info("cranky", e);
assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
}
// Whether or not the breaker tripped, the builder and block have been closed by now,
// so nothing should remain accounted for.
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
// more specific to the factory implementation itself (and not necessarily tested elsewhere).
public class BlockFactoryTests extends ESTestCase {
public static BlockFactory blockFactory(ByteSizeValue size) {
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, size);
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, size).withCircuitBreaking();
return new BlockFactory(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), bigArrays);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,26 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import com.carrotsearch.randomizedtesting.annotations.Repeat;

import com.carrotsearch.randomizedtesting.annotations.Seed;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.CrankyCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;

@Seed("7E594D56CF8843C5:4B7F19DE1F496E74")
public class VectorBuilderTests extends ESTestCase {
@ParametersFactory
public static List<Object[]> params() {
Expand Down Expand Up @@ -50,7 +61,9 @@ public void testSingleBuild() {
fill(builder, random.block().asVector());
try (Vector built = builder.build()) {
assertThat(built, equalTo(random.block().asVector()));
assertThat(blockFactory.breaker().getUsed(), equalTo(built.ramBytesUsed()));
}
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
}
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
}
Expand All @@ -63,12 +76,32 @@ public void testDoubleBuild() {
try (Vector built = builder.build()) {
assertThat(built, equalTo(random.block().asVector()));
}
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
Exception e = expectThrows(IllegalStateException.class, builder::build);
assertThat(e.getMessage(), equalTo("already closed"));
}
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
}

/**
 * Builds a vector with a {@link BlockFactory} whose breaker comes from a
 * {@link CrankyCircuitBreakerService} and checks that the breaker reports zero
 * used bytes afterwards, both when the build completes and when a
 * {@link CircuitBreakingException} is thrown along the way.
 */
public void testCranky() {
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new CrankyCircuitBreakerService());
BlockFactory blockFactory = new BlockFactory(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), bigArrays);
// A CircuitBreakingException from any step inside this try is tolerated,
// provided it carries the cranky service's own error message.
try {
try (Vector.Builder builder = vectorBuilder(10, blockFactory)) {
BasicBlockTests.RandomBlock random = BasicBlockTests.randomBlock(elementType, 10, false, 1, 1, 0, 0);
fill(builder, random.block().asVector());
try (Vector built = builder.build()) {
assertThat(built, equalTo(random.block().asVector()));
}
}
// If we made it this far cranky didn't fail us!
} catch (CircuitBreakingException e) {
logger.info("cranky", e);
assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
}
// Whether or not the breaker tripped, the builder and vector have been closed by now,
// so nothing should remain accounted for.
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
}

private Vector.Builder vectorBuilder(int estimatedSize, BlockFactory blockFactory) {
return switch (elementType) {
case NULL, DOC, UNKNOWN -> throw new UnsupportedOperationException();
Expand Down Expand Up @@ -109,6 +142,5 @@ private void fill(Vector.Builder builder, Vector from) {
}
}
}
;
}
}

0 comments on commit 0b99496

Please sign in to comment.