diff --git a/docs/changelog/102831.yaml b/docs/changelog/102831.yaml new file mode 100644 index 0000000000000..fb99b0c7f732b --- /dev/null +++ b/docs/changelog/102831.yaml @@ -0,0 +1,9 @@ +pr: 102831 +summary: Fix memory tracking in TopN.Row +area: ES|QL +type: bug +issues: + - 100640 + - 102784 + - 102790 + - 102683 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index 2ebc9c82c6d98..c3fc9fc68b60c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.BitSet; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -51,8 +50,7 @@ public class TopNOperator implements Operator, Accountable { * multivalues) to reference each position in each block of the Page. */ static final class Row implements Accountable, Releasable { - private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Row.class) + RamUsageEstimator - .shallowSizeOfInstance(BitSet.class); + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Row.class); /** * The sort key. @@ -64,7 +62,7 @@ static final class Row implements Accountable, Releasable { * For ex, if a Long is represented as 8 bytes, each of these bytes will have the same value (set/unset) if the respective Long * value is used for sorting ascending/descending. */ - final BitSet orderByCompositeKeyAscending = new BitSet(); + final BytesOrder bytesOrder; /** * Values to reconstruct the row. Sort of. When we reconstruct the row we read @@ -73,11 +71,12 @@ static final class Row implements Accountable, Releasable { */ final BreakingBytesRefBuilder values; - Row(CircuitBreaker breaker) { + Row(CircuitBreaker breaker, List sortOrders) { boolean success = false; try { keys = new BreakingBytesRefBuilder(breaker, "topn"); values = new BreakingBytesRefBuilder(breaker, "topn"); + bytesOrder = new BytesOrder(sortOrders, breaker, "topn"); success = true; } finally { if (success == false) { @@ -88,12 +87,54 @@ static final class Row implements Accountable, Releasable { @Override public long ramBytesUsed() { - return SHALLOW_SIZE + keys.ramBytesUsed() + orderByCompositeKeyAscending.size() / Byte.SIZE + values.ramBytesUsed(); + return SHALLOW_SIZE + keys.ramBytesUsed() + bytesOrder.ramBytesUsed() + values.ramBytesUsed(); } @Override public void close() { - Releasables.closeExpectNoException(keys, values); + Releasables.closeExpectNoException(keys, values, bytesOrder); + } + } + + static final class BytesOrder implements Releasable, Accountable { + private static final long BASE_RAM_USAGE = RamUsageEstimator.shallowSizeOfInstance(BytesOrder.class); + private final CircuitBreaker breaker; + final List sortOrders; + final int[] endOffsets; + + BytesOrder(List sortOrders, CircuitBreaker breaker, String label) { + this.breaker = breaker; + this.sortOrders = sortOrders; + breaker.addEstimateBytesAndMaybeBreak(memoryUsed(sortOrders.size()), label); + this.endOffsets = new int[sortOrders.size()]; + } + + /** + * Returns true if the byte at the given position is ordered ascending; otherwise, return false + */ + boolean isByteOrderAscending(int bytePosition) { + int index = Arrays.binarySearch(endOffsets, bytePosition); + if (index < 0) { + index = -1 - index; + } + return sortOrders.get(index).asc(); + } + + private long memoryUsed(int numKeys) { + // sortOrders is global and its memory is accounted at the top level TopNOperator + return BASE_RAM_USAGE + RamUsageEstimator.alignObjectSize( + (long) RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) Integer.BYTES * numKeys + ); + } + + @Override + public long ramBytesUsed() { + return memoryUsed(sortOrders.size()); + } + + @Override + public void close() { + breaker.addWithoutBreaking(-ramBytesUsed()); } } @@ -138,14 +179,11 @@ void row(int position, Row destination) { private void writeKey(int position, Row row) { int orderByCompositeKeyCurrentPosition = 0; - for (KeyFactory factory : keyFactories) { - int valueAsBytesSize = factory.extractor.writeKey(row.keys, position); - row.orderByCompositeKeyAscending.set( - orderByCompositeKeyCurrentPosition, - valueAsBytesSize + orderByCompositeKeyCurrentPosition, - factory.ascending - ); + for (int i = 0; i < keyFactories.length; i++) { + int valueAsBytesSize = keyFactories[i].extractor.writeKey(row.keys, position); + assert valueAsBytesSize > 0 : valueAsBytesSize; orderByCompositeKeyCurrentPosition += valueAsBytesSize; + row.bytesOrder.endOffsets[i] = orderByCompositeKeyCurrentPosition - 1; } } @@ -189,9 +227,7 @@ public record TopNOperatorFactory( List sortOrders, int maxPageSize ) implements OperatorFactory { - public TopNOperatorFactory - - { + public TopNOperatorFactory { for (ElementType e : elementTypes) { if (e == null) { throw new IllegalArgumentException("ElementType not known"); @@ -274,19 +310,20 @@ static int compareRows(Row r1, Row r2) { // the two rows are equal return 0; } + int length = Math.min(br1.length, br2.length); // one value is the prefix of the other if (mismatchedByteIndex == length) { // the value with the greater length is considered greater than the other if (length == br1.length) {// first row is less than the second row - return r2.orderByCompositeKeyAscending.get(length) ? 1 : -1; + return r2.bytesOrder.isByteOrderAscending(length) ? 1 : -1; } else {// second row is less than the first row - return r1.orderByCompositeKeyAscending.get(length) ? -1 : 1; + return r1.bytesOrder.isByteOrderAscending(length) ? -1 : 1; } } else { // compare the byte that mismatched accounting for that respective byte asc/desc ordering int c = Byte.compareUnsigned(br1.bytes[br1.offset + mismatchedByteIndex], br2.bytes[br2.offset + mismatchedByteIndex]); - return r1.orderByCompositeKeyAscending.get(mismatchedByteIndex) ? -c : c; + return r1.bytesOrder.isByteOrderAscending(mismatchedByteIndex) ? -c : c; } } @@ -312,10 +349,9 @@ public void addInput(Page page) { try { for (int i = 0; i < page.getPositionCount(); i++) { if (spare == null) { - spare = new Row(breaker); + spare = new Row(breaker, sortOrders); } else { spare.keys.clear(); - spare.orderByCompositeKeyAscending.clear(); spare.values.clear(); } rowFiller.row(i, spare); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index f43873b4fdfd9..be3e75fcce2a2 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -434,13 +434,14 @@ private TopNOperator.Row row( Page page, int position ) { + final var sortOrders = List.of(new TopNOperator.SortOrder(channel, asc, nullsFirst)); TopNOperator.RowFiller rf = new TopNOperator.RowFiller( IntStream.range(0, page.getBlockCount()).mapToObj(i -> elementType).toList(), IntStream.range(0, page.getBlockCount()).mapToObj(i -> encoder).toList(), - List.of(new TopNOperator.SortOrder(channel, asc, nullsFirst)), + sortOrders, page ); - TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request")); + TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"), sortOrders); rf.row(position, row); return row; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNRowTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNRowTests.java index 472b9e50767b1..9fb3a7644ca20 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNRowTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNRowTests.java @@ -12,25 +12,27 @@ import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.test.ESTestCase; +import java.util.List; + import static org.hamcrest.Matchers.equalTo; public class TopNRowTests extends ESTestCase { private final CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST); public void testRamBytesUsedEmpty() { - TopNOperator.Row row = new TopNOperator.Row(breaker); + TopNOperator.Row row = new TopNOperator.Row(breaker, sortOrders()); assertThat(row.ramBytesUsed(), equalTo(expectedRamBytesUsed(row))); } public void testRamBytesUsedSmall() { - TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST)); + TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST), sortOrders()); row.keys.append(randomByte()); row.values.append(randomByte()); assertThat(row.ramBytesUsed(), equalTo(expectedRamBytesUsed(row))); } public void testRamBytesUsedBig() { - TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST)); + TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST), sortOrders()); for (int i = 0; i < 10000; i++) { row.keys.append(randomByte()); row.values.append(randomByte()); @@ -38,6 +40,13 @@ public void testRamBytesUsedBig() { assertThat(row.ramBytesUsed(), equalTo(expectedRamBytesUsed(row))); } + private static List sortOrders() { + return List.of( + new TopNOperator.SortOrder(randomNonNegativeInt(), randomBoolean(), randomBoolean()), + new TopNOperator.SortOrder(randomNonNegativeInt(), randomBoolean(), randomBoolean()) + ); + } + private long expectedRamBytesUsed(TopNOperator.Row row) { long expected = RamUsageTester.ramUsed(row); if (row.values.bytes().length == 0) { @@ -47,6 +56,8 @@ private long expectedRamBytesUsed(TopNOperator.Row row) { // The breaker is shared infrastructure so we don't count it but RamUsageTester does expected -= RamUsageTester.ramUsed(breaker); expected -= RamUsageTester.ramUsed("topn"); + // the sort orders are shared + expected -= RamUsageTester.ramUsed(sortOrders()); return expected; } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java index ffe122b8de222..31d0a7646e1b7 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java @@ -9,7 +9,6 @@ import org.apache.http.client.config.RequestConfig; import org.apache.http.util.EntityUtils; -import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; @@ -47,7 +46,6 @@ * Tests that run ESQL queries that have, in the past, used so much memory they * crash Elasticsearch. */ -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/102784") public class HeapAttackIT extends ESRestTestCase { /** * This used to fail, but we've since compacted top n so it actually succeeds now.