Skip to content

Commit

Permalink
ESQL: Compact topn
Browse files Browse the repository at this point in the history
This lowers topn's memory usage somewhat and makes it easier to track the
memory usage. That looks like:

```
"status" : {
  "occupied_rows" : 10000,
  "ram_bytes_used" : 255392224,
  "ram_used" : "243.5mb"
}
```

In some cases the memory savings are significant. In an example with
many, many keys the memory usage of each row drops from `58kb` to `25kb`.
This is a little degenerate though and I expect the savings to normally be
on the order of 10%.

The real advantage is memory tracking. It's *easy* to track used memory.
And, in a followup, it should be fairly easy to circuit break on the
used memory.

Mostly this is done by adding new abstractions and moving existing
abstractions to top level classes with tests and stuff.

* `TopNEncoder` is now a top level class. It has grown the ability to
  *decode* values as well as encode them. And it has grown "unsortable"
  versions which don't write their values such that sorting the bytes
  sorts the values. We use the "unsortable" versions when writing
  values.
* `KeyExtractor` extracts keys from the blocks and writes them to the
  row's `BytesRefBuilder`. This is basically objects replacing one of the
  switch statements in `RowFactory`. They are more scattered but easier
  to test, and hopefully `TopNOperator` is more readable with this
  behavior factored out. Also! Most implementations are automatically
  generated.
* `ValueExtractor` extracts values from the blocks and writes them to
  the row's `BytesRefBuilder`. This replaces the other switch statement
  in `RowFactory` for the same reasons, except instead of writing to
  many arrays it writes to a `BytesRefBuilder` just like the key as
  compactly as it can manage.

The memory savings come from three changes:
1. Lower overhead for storing values by encoding them rather than using
   many primitive arrays.
2. Encode the value count as a vint rather than a whole int. Usually
   there are very few rows and vint encodes that quite nicely.
3. Don't write values that are in the key for single-valued fields.
   Instead we read them from the key. That's going to be very very
   common.

This is unlikely to be faster than the old code. I haven't really tried
for speed. Just memory usage and accountability. Once we get good
accounting we can try and make this faster. I expect we'll have to
figure out the megamorphic invocations I've added. But, for now, they
help more than they hurt.
  • Loading branch information
nik9000 committed Sep 7, 2023
1 parent 43d747a commit affb498
Show file tree
Hide file tree
Showing 53 changed files with 4,307 additions and 1,180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.TopNOperator;
import org.elasticsearch.compute.operator.topn.TopNEncoder;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -77,8 +79,27 @@ private static Operator operator(String data, int topCount) {
case TWO_LONGS, LONGS_AND_BYTES_REFS -> 2;
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
};
List<ElementType> elementTypes = switch (data) {
case LONGS -> List.of(ElementType.LONG);
case INTS -> List.of(ElementType.INT);
case DOUBLES -> List.of(ElementType.DOUBLE);
case BOOLEANS -> List.of(ElementType.BOOLEAN);
case BYTES_REFS -> List.of(ElementType.BYTES_REF);
case TWO_LONGS -> List.of(ElementType.INT, ElementType.INT);
case LONGS_AND_BYTES_REFS -> List.of(ElementType.INT, ElementType.BYTES_REF);
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
};
List<TopNEncoder> encoders = switch (data) {
case LONGS, INTS, DOUBLES, BOOLEANS -> List.of(TopNEncoder.DEFAULT_SORTABLE);
case BYTES_REFS -> List.of(TopNEncoder.UTF8);
case TWO_LONGS -> List.of(TopNEncoder.DEFAULT_SORTABLE, TopNEncoder.DEFAULT_SORTABLE);
case LONGS_AND_BYTES_REFS -> List.of(TopNEncoder.DEFAULT_SORTABLE, TopNEncoder.UTF8);
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
};
return new TopNOperator(
topCount,
elementTypes,
encoders,
IntStream.range(0, count).mapToObj(c -> new TopNOperator.SortOrder(c, false, false)).toList(),
16 * 1024
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.ValueSourceInfo;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.TopNOperator;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
Expand Down
78 changes: 78 additions & 0 deletions x-pack/plugin/esql/compute/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,82 @@ tasks.named('stringTemplates').configure {
it.inputFile = multivalueDedupeInputFile
it.outputFile = "org/elasticsearch/compute/operator/MultivalueDedupeBytesRef.java"
}
File keyExtractorInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/operator/topn/X-KeyExtractor.java.st")
template {
it.properties = bytesRefProperties
it.inputFile = keyExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/KeyExtractorForBytesRef.java"
}
template {
it.properties = booleanProperties
it.inputFile = keyExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/KeyExtractorForBoolean.java"
}
template {
it.properties = intProperties
it.inputFile = keyExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/KeyExtractorForInt.java"
}
template {
it.properties = longProperties
it.inputFile = keyExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/KeyExtractorForLong.java"
}
template {
it.properties = doubleProperties
it.inputFile = keyExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/KeyExtractorForDouble.java"
}
File valueExtractorInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/operator/topn/X-ValueExtractor.java.st")
template {
it.properties = bytesRefProperties
it.inputFile = valueExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ValueExtractorForBytesRef.java"
}
template {
it.properties = booleanProperties
it.inputFile = valueExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ValueExtractorForBoolean.java"
}
template {
it.properties = intProperties
it.inputFile = valueExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ValueExtractorForInt.java"
}
template {
it.properties = longProperties
it.inputFile = valueExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ValueExtractorForLong.java"
}
template {
it.properties = doubleProperties
it.inputFile = valueExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ValueExtractorForDouble.java"
}
File resultBuilderInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/operator/topn/X-ResultBuilder.java.st")
template {
it.properties = bytesRefProperties
it.inputFile = resultBuilderInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ResultBuilderForBytesRef.java"
}
template {
it.properties = booleanProperties
it.inputFile = resultBuilderInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ResultBuilderForBoolean.java"
}
template {
it.properties = intProperties
it.inputFile = resultBuilderInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ResultBuilderForInt.java"
}
template {
it.properties = longProperties
it.inputFile = resultBuilderInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ResultBuilderForLong.java"
}
template {
it.properties = doubleProperties
it.inputFile = resultBuilderInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ResultBuilderForDouble.java"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;

/**
 * Writes the sort key for a single position of boolean data into a {@link BytesRefBuilder}.
 * A key is either the lone {@code nul} marker byte, or the {@code nonNul} marker byte followed
 * by the boolean as written by {@link TopNEncoder#DEFAULT_SORTABLE}.
 */
abstract class KeyExtractorForBoolean implements KeyExtractor {
    /**
     * Picks the implementation that fits the shape of {@code block}.
     * <ul>
     *   <li>If the block can be viewed as a vector, {@link ForVector} is used.</li>
     *   <li>Otherwise an ascending sort gets a {@code Min*} extractor and any other sort a
     *       {@code Max*} extractor.</li>
     *   <li>The {@code *ForAscending} flavor is chosen when the block reports
     *       {@link Block.MvOrdering#ASCENDING}; every other ordering gets {@code *ForUnordered}.</li>
     * </ul>
     *
     * @param encoder   must be {@link TopNEncoder#DEFAULT_SORTABLE} (checked by an assertion)
     * @param ascending whether the sort on this key is ascending
     * @param nul       marker byte written for positions without a value
     * @param nonNul    marker byte written before an encoded value
     * @param block     the block to read values from
     */
    static KeyExtractorForBoolean extractorFor(TopNEncoder encoder, boolean ascending, byte nul, byte nonNul, BooleanBlock block) {
        BooleanVector vector = block.asVector();
        if (vector != null) {
            return new KeyExtractorForBoolean.ForVector(encoder, nul, nonNul, vector);
        }
        boolean valuesAscending = block.mvOrdering() == Block.MvOrdering.ASCENDING;
        if (valuesAscending) {
            return ascending
                ? new KeyExtractorForBoolean.MinForAscending(encoder, nul, nonNul, block)
                : new KeyExtractorForBoolean.MaxForAscending(encoder, nul, nonNul, block);
        }
        return ascending
            ? new KeyExtractorForBoolean.MinForUnordered(encoder, nul, nonNul, block)
            : new KeyExtractorForBoolean.MaxForUnordered(encoder, nul, nonNul, block);
    }

    /** Marker byte appended for a position that has no value. */
    private final byte nul;
    /** Marker byte appended ahead of an encoded value. */
    private final byte nonNul;

    KeyExtractorForBoolean(TopNEncoder encoder, byte nul, byte nonNul) {
        // Values are always written with DEFAULT_SORTABLE below, so no other encoder is accepted.
        assert encoder == TopNEncoder.DEFAULT_SORTABLE;
        this.nul = nul;
        this.nonNul = nonNul;
    }

    /**
     * Appends the {@code nonNul} marker and then the encoded {@code value}.
     *
     * @return {@code Byte.BYTES + 1}; presumably the number of bytes appended (marker plus value)
     */
    protected final int nonNul(BytesRefBuilder key, boolean value) {
        key.append(this.nonNul);
        TopNEncoder.DEFAULT_SORTABLE.encodeBoolean(value, key);
        return Byte.BYTES + 1;
    }

    /**
     * Appends only the {@code nul} marker.
     *
     * @return {@code 1}, for the single marker byte appended
     */
    protected final int nul(BytesRefBuilder key) {
        key.append(this.nul);
        return 1;
    }

    /** Reads straight from a {@link BooleanVector}; never writes the {@code nul} marker. */
    static class ForVector extends KeyExtractorForBoolean {
        private final BooleanVector vector;

        ForVector(TopNEncoder encoder, byte nul, byte nonNul, BooleanVector vector) {
            super(encoder, nul, nonNul);
            this.vector = vector;
        }

        @Override
        public int writeKey(BytesRefBuilder key, int position) {
            boolean value = vector.getBoolean(position);
            return nonNul(key, value);
        }
    }

    /** Uses the first value of the position as the key. */
    static class MinForAscending extends KeyExtractorForBoolean {
        private final BooleanBlock block;

        MinForAscending(TopNEncoder encoder, byte nul, byte nonNul, BooleanBlock block) {
            super(encoder, nul, nonNul);
            this.block = block;
        }

        @Override
        public int writeKey(BytesRefBuilder key, int position) {
            if (block.isNull(position) == false) {
                int firstIndex = block.getFirstValueIndex(position);
                return nonNul(key, block.getBoolean(firstIndex));
            }
            return nul(key);
        }
    }

    /** Uses the last value of the position as the key. */
    static class MaxForAscending extends KeyExtractorForBoolean {
        private final BooleanBlock block;

        MaxForAscending(TopNEncoder encoder, byte nul, byte nonNul, BooleanBlock block) {
            super(encoder, nul, nonNul);
            this.block = block;
        }

        @Override
        public int writeKey(BytesRefBuilder key, int position) {
            if (block.isNull(position) == false) {
                int lastIndex = block.getFirstValueIndex(position) + block.getValueCount(position) - 1;
                return nonNul(key, block.getBoolean(lastIndex));
            }
            return nul(key);
        }
    }

    /** Scans the values of the position and keys on {@code false} if any value is {@code false}. */
    static class MinForUnordered extends KeyExtractorForBoolean {
        private final BooleanBlock block;

        MinForUnordered(TopNEncoder encoder, byte nul, byte nonNul, BooleanBlock block) {
            super(encoder, nul, nonNul);
            this.block = block;
        }

        @Override
        public int writeKey(BytesRefBuilder key, int position) {
            int count = block.getValueCount(position);
            if (count == 0) {
                return nul(key);
            }
            int first = block.getFirstValueIndex(position);
            // The minimum stays true until a false value is seen; stop scanning at that point.
            boolean min = true;
            for (int offset = 0; min && offset < count; offset++) {
                min = block.getBoolean(first + offset);
            }
            return nonNul(key, min);
        }
    }

    /** Scans the values of the position and keys on {@code true} if any value is {@code true}. */
    static class MaxForUnordered extends KeyExtractorForBoolean {
        private final BooleanBlock block;

        MaxForUnordered(TopNEncoder encoder, byte nul, byte nonNul, BooleanBlock block) {
            super(encoder, nul, nonNul);
            this.block = block;
        }

        @Override
        public int writeKey(BytesRefBuilder key, int position) {
            int count = block.getValueCount(position);
            if (count == 0) {
                return nul(key);
            }
            int first = block.getFirstValueIndex(position);
            // The maximum stays false until a true value is seen; stop scanning at that point.
            boolean max = false;
            for (int offset = 0; max == false && offset < count; offset++) {
                max = block.getBoolean(first + offset);
            }
            return nonNul(key, max);
        }
    }
}
Loading

0 comments on commit affb498

Please sign in to comment.