Skip to content

Commit

Permalink
Fix memory tracking in TopN.Row (#102831)
Browse files Browse the repository at this point in the history
This commit addresses the issue of missing memory tracking for the 
BitSet in TopN.Row. Instead of introducing BreakingBitSet, we replace
the BitSet with a smaller array of offsets in this PR. Nik suggested to
remove that BitSet, but I haven't looked into that option yet.

Closes #100640
Closes #102683
Closes #102790
Closes #102784
  • Loading branch information
dnhatn authored Dec 1, 2023
1 parent 8af50e3 commit 60b7622
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 29 deletions.
9 changes: 9 additions & 0 deletions docs/changelog/102831.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
pr: 102831
summary: Fix memory tracking in TopN.Row
area: ES|QL
type: bug
issues:
- 100640
- 102784
- 102790
- 102683
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -51,8 +50,7 @@ public class TopNOperator implements Operator, Accountable {
* multivalues) to reference each position in each block of the Page.
*/
static final class Row implements Accountable, Releasable {
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Row.class) + RamUsageEstimator
.shallowSizeOfInstance(BitSet.class);
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Row.class);

/**
* The sort key.
Expand All @@ -64,7 +62,7 @@ static final class Row implements Accountable, Releasable {
* For ex, if a Long is represented as 8 bytes, each of these bytes will have the same value (set/unset) if the respective Long
* value is used for sorting ascending/descending.
*/
final BitSet orderByCompositeKeyAscending = new BitSet();
final BytesOrder bytesOrder;

/**
* Values to reconstruct the row. Sort of. When we reconstruct the row we read
Expand All @@ -73,11 +71,12 @@ static final class Row implements Accountable, Releasable {
*/
final BreakingBytesRefBuilder values;

Row(CircuitBreaker breaker) {
Row(CircuitBreaker breaker, List<SortOrder> sortOrders) {
boolean success = false;
try {
keys = new BreakingBytesRefBuilder(breaker, "topn");
values = new BreakingBytesRefBuilder(breaker, "topn");
bytesOrder = new BytesOrder(sortOrders, breaker, "topn");
success = true;
} finally {
if (success == false) {
Expand All @@ -88,12 +87,54 @@ static final class Row implements Accountable, Releasable {

@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + keys.ramBytesUsed() + orderByCompositeKeyAscending.size() / Byte.SIZE + values.ramBytesUsed();
return SHALLOW_SIZE + keys.ramBytesUsed() + bytesOrder.ramBytesUsed() + values.ramBytesUsed();
}

@Override
public void close() {
Releasables.closeExpectNoException(keys, values);
Releasables.closeExpectNoException(keys, values, bytesOrder);
}
}

static final class BytesOrder implements Releasable, Accountable {
private static final long BASE_RAM_USAGE = RamUsageEstimator.shallowSizeOfInstance(BytesOrder.class);
private final CircuitBreaker breaker;
final List<SortOrder> sortOrders;
final int[] endOffsets;

BytesOrder(List<SortOrder> sortOrders, CircuitBreaker breaker, String label) {
this.breaker = breaker;
this.sortOrders = sortOrders;
breaker.addEstimateBytesAndMaybeBreak(memoryUsed(sortOrders.size()), label);
this.endOffsets = new int[sortOrders.size()];
}

/**
* Returns true if the byte at the given position is ordered ascending; otherwise, return false
*/
boolean isByteOrderAscending(int bytePosition) {
int index = Arrays.binarySearch(endOffsets, bytePosition);
if (index < 0) {
index = -1 - index;
}
return sortOrders.get(index).asc();
}

private long memoryUsed(int numKeys) {
// sortOrders is global and its memory is accounted at the top level TopNOperator
return BASE_RAM_USAGE + RamUsageEstimator.alignObjectSize(
(long) RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) Integer.BYTES * numKeys
);
}

@Override
public long ramBytesUsed() {
return memoryUsed(sortOrders.size());
}

@Override
public void close() {
breaker.addWithoutBreaking(-ramBytesUsed());
}
}

Expand Down Expand Up @@ -138,14 +179,11 @@ void row(int position, Row destination) {

private void writeKey(int position, Row row) {
int orderByCompositeKeyCurrentPosition = 0;
for (KeyFactory factory : keyFactories) {
int valueAsBytesSize = factory.extractor.writeKey(row.keys, position);
row.orderByCompositeKeyAscending.set(
orderByCompositeKeyCurrentPosition,
valueAsBytesSize + orderByCompositeKeyCurrentPosition,
factory.ascending
);
for (int i = 0; i < keyFactories.length; i++) {
int valueAsBytesSize = keyFactories[i].extractor.writeKey(row.keys, position);
assert valueAsBytesSize > 0 : valueAsBytesSize;
orderByCompositeKeyCurrentPosition += valueAsBytesSize;
row.bytesOrder.endOffsets[i] = orderByCompositeKeyCurrentPosition - 1;
}
}

Expand Down Expand Up @@ -189,9 +227,7 @@ public record TopNOperatorFactory(
List<SortOrder> sortOrders,
int maxPageSize
) implements OperatorFactory {
public TopNOperatorFactory

{
public TopNOperatorFactory {
for (ElementType e : elementTypes) {
if (e == null) {
throw new IllegalArgumentException("ElementType not known");
Expand Down Expand Up @@ -274,19 +310,20 @@ static int compareRows(Row r1, Row r2) {
// the two rows are equal
return 0;
}

int length = Math.min(br1.length, br2.length);
// one value is the prefix of the other
if (mismatchedByteIndex == length) {
// the value with the greater length is considered greater than the other
if (length == br1.length) {// first row is less than the second row
return r2.orderByCompositeKeyAscending.get(length) ? 1 : -1;
return r2.bytesOrder.isByteOrderAscending(length) ? 1 : -1;
} else {// second row is less than the first row
return r1.orderByCompositeKeyAscending.get(length) ? -1 : 1;
return r1.bytesOrder.isByteOrderAscending(length) ? -1 : 1;
}
} else {
// compare the byte that mismatched accounting for that respective byte asc/desc ordering
int c = Byte.compareUnsigned(br1.bytes[br1.offset + mismatchedByteIndex], br2.bytes[br2.offset + mismatchedByteIndex]);
return r1.orderByCompositeKeyAscending.get(mismatchedByteIndex) ? -c : c;
return r1.bytesOrder.isByteOrderAscending(mismatchedByteIndex) ? -c : c;
}
}

Expand All @@ -312,10 +349,9 @@ public void addInput(Page page) {
try {
for (int i = 0; i < page.getPositionCount(); i++) {
if (spare == null) {
spare = new Row(breaker);
spare = new Row(breaker, sortOrders);
} else {
spare.keys.clear();
spare.orderByCompositeKeyAscending.clear();
spare.values.clear();
}
rowFiller.row(i, spare);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,13 +434,14 @@ private TopNOperator.Row row(
Page page,
int position
) {
final var sortOrders = List.of(new TopNOperator.SortOrder(channel, asc, nullsFirst));
TopNOperator.RowFiller rf = new TopNOperator.RowFiller(
IntStream.range(0, page.getBlockCount()).mapToObj(i -> elementType).toList(),
IntStream.range(0, page.getBlockCount()).mapToObj(i -> encoder).toList(),
List.of(new TopNOperator.SortOrder(channel, asc, nullsFirst)),
sortOrders,
page
);
TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"));
TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"), sortOrders);
rf.row(position, row);
return row;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,41 @@
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.test.ESTestCase;

import java.util.List;

import static org.hamcrest.Matchers.equalTo;

public class TopNRowTests extends ESTestCase {
private final CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST);

public void testRamBytesUsedEmpty() {
TopNOperator.Row row = new TopNOperator.Row(breaker);
TopNOperator.Row row = new TopNOperator.Row(breaker, sortOrders());
assertThat(row.ramBytesUsed(), equalTo(expectedRamBytesUsed(row)));
}

public void testRamBytesUsedSmall() {
TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST));
TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST), sortOrders());
row.keys.append(randomByte());
row.values.append(randomByte());
assertThat(row.ramBytesUsed(), equalTo(expectedRamBytesUsed(row)));
}

public void testRamBytesUsedBig() {
TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST));
TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST), sortOrders());
for (int i = 0; i < 10000; i++) {
row.keys.append(randomByte());
row.values.append(randomByte());
}
assertThat(row.ramBytesUsed(), equalTo(expectedRamBytesUsed(row)));
}

private static List<TopNOperator.SortOrder> sortOrders() {
return List.of(
new TopNOperator.SortOrder(randomNonNegativeInt(), randomBoolean(), randomBoolean()),
new TopNOperator.SortOrder(randomNonNegativeInt(), randomBoolean(), randomBoolean())
);
}

private long expectedRamBytesUsed(TopNOperator.Row row) {
long expected = RamUsageTester.ramUsed(row);
if (row.values.bytes().length == 0) {
Expand All @@ -47,6 +56,8 @@ private long expectedRamBytesUsed(TopNOperator.Row row) {
// The breaker is shared infrastructure so we don't count it but RamUsageTester does
expected -= RamUsageTester.ramUsed(breaker);
expected -= RamUsageTester.ramUsed("topn");
// the sort orders are shared
expected -= RamUsageTester.ramUsed(sortOrders());
return expected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.apache.http.client.config.RequestConfig;
import org.apache.http.util.EntityUtils;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
Expand Down Expand Up @@ -47,7 +46,6 @@
* Tests that run ESQL queries that have, in the past, used so much memory they
* crash Elasticsearch.
*/
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/102784")
public class HeapAttackIT extends ESRestTestCase {
/**
* This used to fail, but we've since compacted top n so it actually succeeds now.
Expand Down

0 comments on commit 60b7622

Please sign in to comment.