From 13103ef2382aab73e599f94d3a9ac431f3467d42 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 9 Aug 2023 12:32:37 -0400 Subject: [PATCH 1/2] ES|QL deserves a new hash table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We've been using `LongHash` and `LongLongHash`, which are open-addressed, linear-probing hash tables that grow in place. They have served us well, but we need to add features to them to support all of ES|QL. It turns out that there've been a lot of advances in the hash space in the ten years since we wrote these hash tables! And they weren't the most "advanced" thing back then. This PR creates a new hash table implementation that borrows significantly from Google's Swiss Tables. It's 25% to 49% faster in microbenchmarks: ``` unique longHash ordinator64 5 7.470 ± 0.033 -> 4.158 ± 0.037 ns/op 45% faster 1000 9.657 ± 0.375 -> 4.907 ± 0.036 ns/op 49% faster 10000 15.505 ± 0.051 -> 11.609 ± 0.062 ns/op 25% faster 100000 20.948 ± 0.112 -> 13.413 ± 0.764 ns/op 35% faster 1000000 48.507 ± 0.586 -> 36.306 ± 0.296 ns/op 25% faster ``` This also integrates the new table into ES|QL's grouping functions, though imperfectly at the moment. --- .../compute/operator/AggregatorBenchmark.java | 60 +- .../compute/operator/HashBenchmark.java | 163 +++++ .../src/main/groovy/elasticsearch.ide.gradle | 4 +- .../internal/ElasticsearchJavaBasePlugin.java | 2 +- .../internal/ElasticsearchTestBasePlugin.java | 4 +- x-pack/plugin/esql/compute/build.gradle | 20 + .../blockhash/DoubleBlockHash.java | 49 +- .../aggregation/blockhash/IntBlockHash.java | 52 +- .../aggregation/blockhash/LongBlockHash.java | 47 +- .../operator/MultivalueDedupeBytesRef.java | 15 +- .../operator/MultivalueDedupeDouble.java | 24 +- .../compute/operator/MultivalueDedupeInt.java | 24 +- .../operator/MultivalueDedupeLong.java | 24 +- .../compute/src/main/java/module-info.java | 1 + .../aggregation/blockhash/BlockHash.java | 70 +- .../aggregation/blockhash/Ordinator.java | 293 +++++++++ .../aggregation/blockhash/Ordinator64.java | 622 ++++++++++++++++++ .../aggregation/blockhash/X-BlockHash.java.st | 156 +++++ .../operator/HashAggregationOperator.java | 5 +- .../operator/OrdinalsGroupingOperator.java | 21 +- .../operator/X-MultivalueDedupe.java.st | 38 +- .../elasticsearch/compute/OperatorTests.java | 18 +- .../GroupingAggregatorFunctionTestCase.java | 7 +- .../blockhash/BlockHashRandomizedTests.java | 17 +- .../aggregation/blockhash/BlockHashTests.java | 28 +- .../blockhash/Ordinator64Tests.java | 396 +++++++++++ .../HashAggregationOperatorTests.java | 10 +- .../operator/MultivalueDedupeTests.java | 111 ++-- .../AbstractPhysicalOperationProviders.java | 2 +- .../planner/EsPhysicalOperationProviders.java | 1 + .../esql/planner/LocalExecutionPlanner.java | 25 +- .../xpack/esql/plugin/ComputeService.java | 12 +- .../esql/plugin/TransportEsqlQueryAction.java | 10 +- .../elasticsearch/xpack/esql/CsvTests.java | 11 +- .../planner/LocalExecutionPlannerTests.java | 8 + .../TestPhysicalOperationProviders.java | 13 +- 36 files changed, 2094 insertions(+), 269 deletions(-) create mode 100644 benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/HashBenchmark.java rename x-pack/plugin/esql/compute/src/main/{java => generated-src}/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java (67%) rename x-pack/plugin/esql/compute/src/main/{java => generated-src}/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java (65%) rename
x-pack/plugin/esql/compute/src/main/{java => generated-src}/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java (70%) create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/X-BlockHash.java.st create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64Tests.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java index aa16523e38097..048b0bd6a0521 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java @@ -9,7 +9,11 @@ package org.elasticsearch.benchmark.compute.operator; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.compute.aggregation.CountAggregatorFunction; @@ -36,6 +40,8 @@ import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.HashAggregationOperator; import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -58,14 +64,12 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Thread) -@Fork(1) +@Fork(value = 1, jvmArgsAppend = { "--enable-preview", "--add-modules", "jdk.incubator.vector" }) public class AggregatorBenchmark { static final int BLOCK_LENGTH = 8 * 1024; private static final int OP_COUNT = 1024; private static final int GROUPS = 5; - private static final BigArrays BIG_ARRAYS = BigArrays.NON_RECYCLING_INSTANCE; // TODO real big arrays? 
- private static final String LONGS = "longs"; private static final String INTS = "ints"; private static final String DOUBLES = "doubles"; @@ -96,7 +100,7 @@ public class AggregatorBenchmark { for (String grouping : AggregatorBenchmark.class.getField("grouping").getAnnotationsByType(Param.class)[0].value()) { for (String op : AggregatorBenchmark.class.getField("op").getAnnotationsByType(Param.class)[0].value()) { for (String blockType : AggregatorBenchmark.class.getField("blockType").getAnnotationsByType(Param.class)[0].value()) { - run(grouping, op, blockType, 50); + new AggregatorBenchmark().run(grouping, op, blockType, 50); } } } @@ -105,6 +109,14 @@ public class AggregatorBenchmark { } } + private final PageCacheRecycler recycler = new PageCacheRecycler(Settings.EMPTY); + private final CircuitBreakerService breakerService = new HierarchyCircuitBreakerService( + Settings.EMPTY, + List.of(), + ClusterSettings.createBuiltInClusterSettings() + ); + private final BigArrays bigArrays = new BigArrays(recycler, breakerService, CircuitBreaker.REQUEST); + @Param({ NONE, LONGS, INTS, DOUBLES, BOOLEANS, BYTES_REFS, TWO_LONGS, LONGS_AND_BYTES_REFS, TWO_LONGS_AND_BYTES_REFS }) public String grouping; @@ -114,7 +126,7 @@ public class AggregatorBenchmark { @Param({ VECTOR_LONGS, HALF_NULL_LONGS, VECTOR_DOUBLES, HALF_NULL_DOUBLES }) public String blockType; - private static Operator operator(String grouping, String op, String dataType) { + private Operator operator(String grouping, String op, String dataType) { if (grouping.equals("none")) { return new AggregationOperator(List.of(supplier(op, dataType, 0).aggregatorFactory(AggregatorMode.SINGLE).get())); } @@ -139,34 +151,35 @@ private static Operator operator(String grouping, String op, String dataType) { ); default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]"); }; + BlockHash.Factory factory = new BlockHash.Factory(bigArrays, recycler, () -> breakerService.getBreaker(CircuitBreaker.REQUEST)); return new HashAggregationOperator( List.of(supplier(op, dataType, groups.size()).groupingAggregatorFactory(AggregatorMode.SINGLE)), - () -> BlockHash.build(groups, BIG_ARRAYS, 16 * 1024), + () -> factory.build(groups, 16 * 1024), new DriverContext() ); } - private static AggregatorFunctionSupplier supplier(String op, String dataType, int dataChannel) { + private AggregatorFunctionSupplier supplier(String op, String dataType, int dataChannel) { return switch (op) { - case COUNT -> CountAggregatorFunction.supplier(BIG_ARRAYS, List.of(dataChannel)); + case COUNT -> CountAggregatorFunction.supplier(bigArrays, List.of(dataChannel)); case COUNT_DISTINCT -> switch (dataType) { - case LONGS -> new CountDistinctLongAggregatorFunctionSupplier(BIG_ARRAYS, List.of(dataChannel), 3000); - case DOUBLES -> new CountDistinctDoubleAggregatorFunctionSupplier(BIG_ARRAYS, List.of(dataChannel), 3000); + case LONGS -> new CountDistinctLongAggregatorFunctionSupplier(bigArrays, List.of(dataChannel), 3000); + case DOUBLES -> new CountDistinctDoubleAggregatorFunctionSupplier(bigArrays, List.of(dataChannel), 3000); default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]"); }; case MAX -> switch (dataType) { - case LONGS -> new MaxLongAggregatorFunctionSupplier(BIG_ARRAYS, List.of(dataChannel)); - case DOUBLES -> new MaxDoubleAggregatorFunctionSupplier(BIG_ARRAYS, List.of(dataChannel)); + case LONGS -> new MaxLongAggregatorFunctionSupplier(bigArrays, List.of(dataChannel)); + case DOUBLES -> new 
MaxDoubleAggregatorFunctionSupplier(bigArrays, List.of(dataChannel)); default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]"); }; case MIN -> switch (dataType) { - case LONGS -> new MinLongAggregatorFunctionSupplier(BIG_ARRAYS, List.of(dataChannel)); - case DOUBLES -> new MinDoubleAggregatorFunctionSupplier(BIG_ARRAYS, List.of(dataChannel)); + case LONGS -> new MinLongAggregatorFunctionSupplier(bigArrays, List.of(dataChannel)); + case DOUBLES -> new MinDoubleAggregatorFunctionSupplier(bigArrays, List.of(dataChannel)); default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]"); }; case SUM -> switch (dataType) { - case LONGS -> new SumLongAggregatorFunctionSupplier(BIG_ARRAYS, List.of(dataChannel)); - case DOUBLES -> new SumDoubleAggregatorFunctionSupplier(BIG_ARRAYS, List.of(dataChannel)); + case LONGS -> new SumLongAggregatorFunctionSupplier(bigArrays, List.of(dataChannel)); + case DOUBLES -> new SumDoubleAggregatorFunctionSupplier(bigArrays, List.of(dataChannel)); default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]"); }; default -> throw new IllegalArgumentException("unsupported op [" + op + "]"); @@ -561,19 +574,20 @@ public void run() { run(grouping, op, blockType, OP_COUNT); } - private static void run(String grouping, String op, String blockType, int opCount) { + private void run(String grouping, String op, String blockType, int opCount) { String dataType = switch (blockType) { case VECTOR_LONGS, HALF_NULL_LONGS -> LONGS; case VECTOR_DOUBLES, HALF_NULL_DOUBLES -> DOUBLES; default -> throw new IllegalArgumentException(); }; - Operator operator = operator(grouping, op, dataType); - Page page = page(grouping, blockType); - for (int i = 0; i < opCount; i++) { - operator.addInput(page); + try (Operator operator = operator(grouping, op, dataType)) { + Page page = page(grouping, blockType); + for (int i = 0; i < opCount; i++) { + operator.addInput(page); + } + operator.finish(); + checkExpected(grouping, op, blockType, dataType, operator.getOutput(), opCount); } - operator.finish(); - checkExpected(grouping, op, blockType, dataType, operator.getOutput(), opCount); } } diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/HashBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/HashBenchmark.java new file mode 100644 index 0000000000000..8cfc895a0a503 --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/HashBenchmark.java @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.benchmark.compute.operator; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BytesRefHash; +import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.common.util.LongLongHash; +import org.elasticsearch.common.util.LongObjectPagedHashMap; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.aggregation.blockhash.Ordinator64; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +@Warmup(iterations = 5) +@Measurement(iterations = 7) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Thread) +@Fork(value = 1, jvmArgsAppend = { "--enable-preview", "--add-modules", "jdk.incubator.vector" }) +public class HashBenchmark { + static { + // Smoke test all the expected values and force loading subclasses more like prod + try { + for (String unique : HashBenchmark.class.getField("unique").getAnnotationsByType(Param.class)[0].value()) { + HashBenchmark bench = new HashBenchmark(); + bench.unique = Integer.parseInt(unique); + bench.initTestData(); + bench.longHash(); + bench.bytesRefHash(); + bench.longLongHash(); + bench.longObjectHash(); + bench.ordinator(); + bench.ordinatorArray(); + } + } catch (NoSuchFieldException e) { + throw new AssertionError(); + } + } + + private static final int ITERATIONS = 10_000_000; + + @Param({ "5", "1000", "10000", "100000", "1000000" }) + public int unique; + + private long[] testLongs; + private BytesRef[] testBytes; + private int[] targetInts; + private long[] targetLongs; + private Object[] targetObject; + + @Setup + public void initTestData() { + testLongs = LongStream.range(0, ITERATIONS).map(l -> l % unique).toArray(); + BytesRef[] uniqueBytes = IntStream.range(0, unique).mapToObj(i -> new BytesRef(Integer.toString(i))).toArray(BytesRef[]::new); + testBytes = IntStream.range(0, ITERATIONS).mapToObj(i -> uniqueBytes[i % unique]).toArray(BytesRef[]::new); + targetInts = new int[ITERATIONS]; + targetLongs = new long[ITERATIONS]; + targetObject = new Object[ITERATIONS]; + } + + @Benchmark + @OperationsPerInvocation(ITERATIONS) + public void longHash() { + LongHash hash = new LongHash(16, BigArrays.NON_RECYCLING_INSTANCE); + for (int i = 0; i < testLongs.length; i++) { + targetLongs[i] = hash.add(testLongs[i]); + } + if (hash.size() != unique) { + throw new AssertionError(); + } + } + + @Benchmark + @OperationsPerInvocation(ITERATIONS) + public void bytesRefHash() { + BytesRefHash hash = new BytesRefHash(16, BigArrays.NON_RECYCLING_INSTANCE); + for (int i = 0; i < testLongs.length; i++) { + targetLongs[i] = hash.add(testBytes[i]); + } + if (hash.size() != unique) { + throw new AssertionError(); + } + } + + @Benchmark + 
@OperationsPerInvocation(ITERATIONS) + public void longLongHash() { + LongLongHash hash = new LongLongHash(16, BigArrays.NON_RECYCLING_INSTANCE); + for (int i = 0; i < testLongs.length; i++) { + targetLongs[i] = hash.add(testLongs[i], testLongs[i]); + } + if (hash.size() != unique) { + throw new AssertionError(); + } + } + + @Benchmark + @OperationsPerInvocation(ITERATIONS) + public void longObjectHash() { + LongObjectPagedHashMap hash = new LongObjectPagedHashMap<>(16, BigArrays.NON_RECYCLING_INSTANCE); + Object o = new Object(); + for (int i = 0; i < testLongs.length; i++) { + targetObject[i] = hash.put(testLongs[i], o); + } + if (hash.size() != unique) { + throw new AssertionError(); + } + } + + @Benchmark + @OperationsPerInvocation(ITERATIONS) + public void ordinator() { + Ordinator64 hash = new Ordinator64( + new PageCacheRecycler(Settings.EMPTY), + new NoopCircuitBreaker("bench"), + new Ordinator64.IdSpace() + ); + for (int i = 0; i < testLongs.length; i++) { + targetInts[i] = hash.add(testLongs[i]); + } + if (hash.currentSize() != unique) { + throw new AssertionError("expected " + hash.currentSize() + " to be " + unique); + } + } + + @Benchmark + @OperationsPerInvocation(ITERATIONS) + public void ordinatorArray() { + Ordinator64 hash = new Ordinator64( + new PageCacheRecycler(Settings.EMPTY), + new NoopCircuitBreaker("bench"), + new Ordinator64.IdSpace() + ); + hash.add(testLongs, targetInts, testLongs.length); + if (hash.currentSize() != unique) { + throw new AssertionError(); + } + } +} diff --git a/build-tools-internal/src/main/groovy/elasticsearch.ide.gradle b/build-tools-internal/src/main/groovy/elasticsearch.ide.gradle index 683a2d5604055..d38ad278a6afd 100644 --- a/build-tools-internal/src/main/groovy/elasticsearch.ide.gradle +++ b/build-tools-internal/src/main/groovy/elasticsearch.ide.gradle @@ -161,7 +161,9 @@ if (providers.systemProperty('idea.active').getOrNull() == 'true') { '--add-opens=java.base/java.nio.file=ALL-UNNAMED', '--add-opens=java.base/java.time=ALL-UNNAMED', '--add-opens=java.base/java.lang=ALL-UNNAMED', - '--add-opens=java.management/java.lang.management=ALL-UNNAMED' + '--add-opens=java.management/java.lang.management=ALL-UNNAMED', + '--enable-preview', + '--add-modules=jdk.incubator.vector' ].join(' ') } } diff --git a/build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchJavaBasePlugin.java b/build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchJavaBasePlugin.java index 7a5bead71fb0e..478241c7addf6 100644 --- a/build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchJavaBasePlugin.java +++ b/build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchJavaBasePlugin.java @@ -123,7 +123,7 @@ public static void configureCompile(Project project) { // fail on all javac warnings. // TODO Discuss moving compileOptions.getCompilerArgs() to use provider api with Gradle team. 
List compilerArgs = compileOptions.getCompilerArgs(); - compilerArgs.add("-Werror"); + // compilerArgs.add("-Werror"); NOCOMMIT add me back once we figure out how to not fail compiling with preview features compilerArgs.add("-Xlint:all,-path,-serial,-options,-deprecation,-try,-removal"); compilerArgs.add("-Xdoclint:all"); compilerArgs.add("-Xdoclint:-missing"); diff --git a/build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchTestBasePlugin.java b/build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchTestBasePlugin.java index 50f4000612981..b3033067efd0d 100644 --- a/build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchTestBasePlugin.java +++ b/build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchTestBasePlugin.java @@ -108,7 +108,9 @@ public void execute(Task t) { "--add-opens=java.base/java.nio.file=ALL-UNNAMED", "--add-opens=java.base/java.time=ALL-UNNAMED", "--add-opens=java.management/java.lang.management=ALL-UNNAMED", - "-XX:+HeapDumpOnOutOfMemoryError" + "-XX:+HeapDumpOnOutOfMemoryError", + "--enable-preview", + "--add-modules=jdk.incubator.vector" ); test.getJvmArgumentProviders().add(new SimpleCommandLineArgumentProvider("-XX:HeapDumpPath=" + heapdumpDir)); diff --git a/x-pack/plugin/esql/compute/build.gradle b/x-pack/plugin/esql/compute/build.gradle index d6a27b4122edb..bf23cfeb76418 100644 --- a/x-pack/plugin/esql/compute/build.gradle +++ b/x-pack/plugin/esql/compute/build.gradle @@ -14,6 +14,10 @@ tasks.named("compileJava").configure { options.compilerArgs.addAll(["-s", "${projectDir}/src/main/generated"]) } +tasks.named('forbiddenApisMain').configure { + failOnMissingClasses = false // Ignore the vector apis +} + tasks.named('checkstyleMain').configure { source = "src/main/java" excludes = [ "**/*.java.st" ] @@ -396,4 +400,20 @@ tasks.named('stringTemplates').configure { it.inputFile = multivalueDedupeInputFile it.outputFile = "org/elasticsearch/compute/operator/MultivalueDedupeBytesRef.java" } + File blockHashInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/aggregation/blockhash/X-BlockHash.java.st") + template { + it.properties = intProperties + it.inputFile = blockHashInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java" + } + template { + it.properties = longProperties + it.inputFile = blockHashInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java" + } + template { + it.properties = doubleProperties + it.inputFile = blockHashInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java" + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java similarity index 67% rename from x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java rename to x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java index 79c748e7901a5..c2346ac607ab5 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java @@ -7,9 +7,10 @@ package 
org.elasticsearch.compute.aggregation.blockhash; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; -import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.aggregation.SeenGroupIds; import org.elasticsearch.compute.data.Block; @@ -28,11 +29,12 @@ import java.util.BitSet; /** - * Maps a {@link DoubleBlock} column to group ids. + * Maps {@link DoubleBlock} to group ids. + * This class is generated. Edit {@code X-BlockHash.java.st} instead. */ final class DoubleBlockHash extends BlockHash { private final int channel; - private final LongHash longHash; + private final Ordinator64 ordinator; /** * Have we seen any {@code null} values? @@ -43,9 +45,11 @@ final class DoubleBlockHash extends BlockHash { */ private boolean seenNull; - DoubleBlockHash(int channel, BigArrays bigArrays) { + DoubleBlockHash(PageCacheRecycler recycler, CircuitBreaker breaker, int channel) { this.channel = channel; - this.longHash = new LongHash(1, bigArrays); + Ordinator64.IdSpace idSpace = new Ordinator64.IdSpace(); + idSpace.next(); // Reserve 0 for nulls. + this.ordinator = new Ordinator64(recycler, breaker, idSpace); } @Override @@ -61,58 +65,61 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { private LongVector add(DoubleVector vector) { long[] groups = new long[vector.getPositionCount()]; + // TODO use the array flavored add for (int i = 0; i < vector.getPositionCount(); i++) { - groups[i] = hashOrdToGroupNullReserved(longHash.add(Double.doubleToLongBits(vector.getDouble(i)))); + groups[i] = ordinator.add(Double.doubleToLongBits(vector.getDouble(i))); } return new LongArrayVector(groups, groups.length); } private LongBlock add(DoubleBlock block) { - MultivalueDedupe.HashResult result = new MultivalueDedupeDouble(block).hash(longHash); + MultivalueDedupe.HashResult result = new MultivalueDedupeDouble(block).hash(ordinator); seenNull |= result.sawNull(); return result.ords(); } @Override public DoubleBlock[] getKeys() { + // TODO call something like takeKeyOwnership to claim the keys array directly + + // If we've seen null we'll store it in 0 if (seenNull) { - final int size = Math.toIntExact(longHash.size() + 1); - final double[] keys = new double[size]; - for (int i = 1; i < size; i++) { - keys[i] = Double.longBitsToDouble(longHash.get(i - 1)); + double[] keys = new double[ordinator.currentSize() + 1]; + for (Ordinator64.Itr itr = ordinator.iterator(); itr.next();) { + keys[itr.id()] = Double.longBitsToDouble(itr.key()); } BitSet nulls = new BitSet(1); nulls.set(0); return new DoubleBlock[] { new DoubleArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.ASCENDING) }; } - - final int size = Math.toIntExact(longHash.size()); - final double[] keys = new double[size]; - for (int i = 0; i < size; i++) { - keys[i] = Double.longBitsToDouble(longHash.get(i)); + double[] keys = new double[ordinator.currentSize() + (seenNull ? 1 : 0)]; + for (Ordinator64.Itr itr = ordinator.iterator(); itr.next();) { + // We reserved the id 0 for null but didn't see it. + keys[itr.id() - 1] = Double.longBitsToDouble(itr.key()); } - // TODO claim the array and wrap? return new DoubleBlock[] { new DoubleArrayVector(keys, keys.length).asBlock() }; } @Override public IntVector nonEmpty() { - return IntVector.range(seenNull ? 
0 : 1, Math.toIntExact(longHash.size() + 1)); + return IntVector.range(seenNull ? 0 : 1, Math.toIntExact(ordinator.currentSize() + 1)); } @Override public BitArray seenGroupIds(BigArrays bigArrays) { - return new SeenGroupIds.Range(seenNull ? 0 : 1, Math.toIntExact(longHash.size() + 1)).seenGroupIds(bigArrays); + return new SeenGroupIds.Range(seenNull ? 0 : 1, Math.toIntExact(ordinator.currentSize() + 1)).seenGroupIds(bigArrays); } @Override public void close() { - longHash.close(); + ordinator.close(); } @Override public String toString() { - return "DoubleBlockHash{channel=" + channel + ", entries=" + longHash.size() + ", seenNull=" + seenNull + '}'; + return "DoubleBlockHash{channel=" + channel + ", entries=" + ordinator.currentSize() + ", seenNull=" + seenNull + '}'; } + + // TODO plumb ordinator.status } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java similarity index 65% rename from x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java rename to x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java index b4e991cebbe47..9b0a74034ae59 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java @@ -7,9 +7,10 @@ package org.elasticsearch.compute.aggregation.blockhash; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; -import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.aggregation.SeenGroupIds; import org.elasticsearch.compute.data.Block; @@ -27,11 +28,13 @@ import java.util.BitSet; /** - * Maps a {@link IntBlock} column to group ids. + * Maps {@link IntBlock} to group ids. + * This class is generated. Edit {@code X-BlockHash.java.st} instead. */ final class IntBlockHash extends BlockHash { private final int channel; - private final LongHash longHash; + private final Ordinator64 ordinator; + /** * Have we seen any {@code null} values? *

@@ -41,9 +44,12 @@ final class IntBlockHash extends BlockHash { */ private boolean seenNull; - IntBlockHash(int channel, BigArrays bigArrays) { + IntBlockHash(PageCacheRecycler recycler, CircuitBreaker breaker, int channel) { this.channel = channel; - this.longHash = new LongHash(1, bigArrays); + // TODO build and use Ordinator32 + Ordinator64.IdSpace idSpace = new Ordinator64.IdSpace(); + idSpace.next(); // Reserve 0 for nulls. + this.ordinator = new Ordinator64(recycler, breaker, idSpace); } @Override @@ -59,55 +65,63 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { private LongVector add(IntVector vector) { long[] groups = new long[vector.getPositionCount()]; + // TODO use the array flavored add for (int i = 0; i < vector.getPositionCount(); i++) { - groups[i] = hashOrdToGroupNullReserved(longHash.add(vector.getInt(i))); + groups[i] = ordinator.add(vector.getInt(i)); } return new LongArrayVector(groups, groups.length); } private LongBlock add(IntBlock block) { - MultivalueDedupe.HashResult result = new MultivalueDedupeInt(block).hash(longHash); + MultivalueDedupe.HashResult result = new MultivalueDedupeInt(block).hash(ordinator); seenNull |= result.sawNull(); return result.ords(); } @Override public IntBlock[] getKeys() { + // TODO call something like takeKeyOwnership to claim the keys array directly + + // If we've seen null we'll store it in 0 if (seenNull) { - final int size = Math.toIntExact(longHash.size() + 1); - final int[] keys = new int[size]; - for (int i = 1; i < size; i++) { - keys[i] = (int) longHash.get(i - 1); + int[] keys = new int[ordinator.currentSize() + 1]; + for (Ordinator64.Itr itr = ordinator.iterator(); itr.next();) { + // TODO build and use Ordinator32 and drop the cast + keys[itr.id()] = Math.toIntExact(itr.key()); } BitSet nulls = new BitSet(1); nulls.set(0); return new IntBlock[] { new IntArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.ASCENDING) }; } - final int size = Math.toIntExact(longHash.size()); - final int[] keys = new int[size]; - for (int i = 0; i < size; i++) { - keys[i] = (int) longHash.get(i); + int[] keys = new int[ordinator.currentSize() + (seenNull ? 1 : 0)]; + for (Ordinator64.Itr itr = ordinator.iterator(); itr.next();) { + // We reserved the id 0 for null but didn't see it. + // TODO build and use Ordinator32 and drop the cast + keys[itr.id() - 1] = Math.toIntExact(itr.key()); } + return new IntBlock[] { new IntArrayVector(keys, keys.length).asBlock() }; } @Override public IntVector nonEmpty() { - return IntVector.range(seenNull ? 0 : 1, Math.toIntExact(longHash.size() + 1)); + return IntVector.range(seenNull ? 0 : 1, Math.toIntExact(ordinator.currentSize() + 1)); } @Override public BitArray seenGroupIds(BigArrays bigArrays) { - return new SeenGroupIds.Range(seenNull ? 0 : 1, Math.toIntExact(longHash.size() + 1)).seenGroupIds(bigArrays); + return new SeenGroupIds.Range(seenNull ? 
0 : 1, Math.toIntExact(ordinator.currentSize() + 1)).seenGroupIds(bigArrays); } @Override public void close() { - longHash.close(); + ordinator.close(); } @Override public String toString() { - return "IntBlockHash{channel=" + channel + ", entries=" + longHash.size() + ", seenNull=" + seenNull + '}'; + return "IntBlockHash{channel=" + channel + ", entries=" + ordinator.currentSize() + ", seenNull=" + seenNull + '}'; } + + // TODO plumb ordinator.status } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java similarity index 70% rename from x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java rename to x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java index d5e57171e9c71..1055ed33126d8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java @@ -7,9 +7,10 @@ package org.elasticsearch.compute.aggregation.blockhash; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; -import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.aggregation.SeenGroupIds; import org.elasticsearch.compute.data.Block; @@ -26,10 +27,11 @@ /** * Maps {@link LongBlock} to group ids. + * This class is generated. Edit {@code X-BlockHash.java.st} instead. */ final class LongBlockHash extends BlockHash { private final int channel; - private final LongHash longHash; + private final Ordinator64 ordinator; /** * Have we seen any {@code null} values? @@ -40,9 +42,11 @@ final class LongBlockHash extends BlockHash { */ private boolean seenNull; - LongBlockHash(int channel, BigArrays bigArrays) { + LongBlockHash(PageCacheRecycler recycler, CircuitBreaker breaker, int channel) { this.channel = channel; - this.longHash = new LongHash(1, bigArrays); + Ordinator64.IdSpace idSpace = new Ordinator64.IdSpace(); + idSpace.next(); // Reserve 0 for nulls. 
+ this.ordinator = new Ordinator64(recycler, breaker, idSpace); } @Override @@ -58,58 +62,61 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { private LongVector add(LongVector vector) { long[] groups = new long[vector.getPositionCount()]; + // TODO use the array flavored add for (int i = 0; i < vector.getPositionCount(); i++) { - groups[i] = hashOrdToGroupNullReserved(longHash.add(vector.getLong(i))); + groups[i] = ordinator.add(vector.getLong(i)); } return new LongArrayVector(groups, groups.length); } private LongBlock add(LongBlock block) { - MultivalueDedupe.HashResult result = new MultivalueDedupeLong(block).hash(longHash); + MultivalueDedupe.HashResult result = new MultivalueDedupeLong(block).hash(ordinator); seenNull |= result.sawNull(); return result.ords(); } @Override public LongBlock[] getKeys() { + // TODO call something like takeKeyOwnership to claim the keys array directly + + // If we've seen null we'll store it in 0 if (seenNull) { - final int size = Math.toIntExact(longHash.size() + 1); - final long[] keys = new long[size]; - for (int i = 1; i < size; i++) { - keys[i] = longHash.get(i - 1); + long[] keys = new long[ordinator.currentSize() + 1]; + for (Ordinator64.Itr itr = ordinator.iterator(); itr.next();) { + keys[itr.id()] = itr.key(); } BitSet nulls = new BitSet(1); nulls.set(0); return new LongBlock[] { new LongArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.ASCENDING) }; } - - final int size = Math.toIntExact(longHash.size()); - final long[] keys = new long[size]; - for (int i = 0; i < size; i++) { - keys[i] = longHash.get(i); + long[] keys = new long[ordinator.currentSize() + (seenNull ? 1 : 0)]; + for (Ordinator64.Itr itr = ordinator.iterator(); itr.next();) { + // We reserved the id 0 for null but didn't see it. + keys[itr.id() - 1] = itr.key(); } - // TODO call something like takeKeyOwnership to claim the keys array directly return new LongBlock[] { new LongArrayVector(keys, keys.length).asBlock() }; } @Override public IntVector nonEmpty() { - return IntVector.range(seenNull ? 0 : 1, Math.toIntExact(longHash.size() + 1)); + return IntVector.range(seenNull ? 0 : 1, Math.toIntExact(ordinator.currentSize() + 1)); } @Override public BitArray seenGroupIds(BigArrays bigArrays) { - return new SeenGroupIds.Range(seenNull ? 0 : 1, Math.toIntExact(longHash.size() + 1)).seenGroupIds(bigArrays); + return new SeenGroupIds.Range(seenNull ? 
0 : 1, Math.toIntExact(ordinator.currentSize() + 1)).seenGroupIds(bigArrays); } @Override public void close() { - longHash.close(); + ordinator.close(); } @Override public String toString() { - return "LongBlockHash{channel=" + channel + ", entries=" + longHash.size() + ", seenNull=" + seenNull + '}'; + return "LongBlockHash{channel=" + channel + ", entries=" + ordinator.currentSize() + ", seenNull=" + seenNull + '}'; } + + // TODO plumb ordinator.status } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeBytesRef.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeBytesRef.java index 13a0849504a4a..cb8c44c4690e1 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeBytesRef.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeBytesRef.java @@ -312,14 +312,15 @@ private void writeSortedWork(BytesRefBlock.Builder builder) { /** * Writes an already deduplicated {@link #work} to a hash. */ - private void hashUniquedWork(BytesRefHash hash, LongBlock.Builder builder) { + private void hashUniquedWork(BytesRefHash ordinator, LongBlock.Builder builder) { if (w == 1) { - hash(builder, hash, work[0]); + hash(builder, ordinator, work[0]); return; } builder.beginPositionEntry(); + // TODO use array flavored add for (int i = 0; i < w; i++) { - hash(builder, hash, work[i]); + hash(builder, ordinator, work[i]); } builder.endPositionEntry(); } @@ -327,18 +328,18 @@ private void hashUniquedWork(BytesRefHash hash, LongBlock.Builder builder) { /** * Writes a sorted {@link #work} to a hash, skipping duplicates. */ - private void hashSortedWork(BytesRefHash hash, LongBlock.Builder builder) { + private void hashSortedWork(BytesRefHash ordinator, LongBlock.Builder builder) { if (w == 1) { - hash(builder, hash, work[0]); + hash(builder, ordinator, work[0]); return; } builder.beginPositionEntry(); BytesRef prev = work[0]; - hash(builder, hash, prev); + hash(builder, ordinator, prev); for (int i = 1; i < w; i++) { if (false == prev.equals(work[i])) { prev = work[i]; - hash(builder, hash, prev); + hash(builder, ordinator, prev); } } builder.endPositionEntry(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeDouble.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeDouble.java index 1f451c2cdac11..0156614219008 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeDouble.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeDouble.java @@ -8,9 +8,8 @@ package org.elasticsearch.compute.operator; import org.apache.lucene.util.ArrayUtil; -import org.elasticsearch.common.util.LongHash; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; -import org.elasticsearch.compute.aggregation.blockhash.BlockHash; +import org.elasticsearch.compute.aggregation.blockhash.Ordinator64; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.LongBlock; @@ -140,7 +139,7 @@ public DoubleBlock dedupeToBlockUsingCopyMissing() { * Dedupe values and build a {@link LongBlock} suitable for passing * as the grouping block to a {@link GroupingAggregatorFunction}. 
*/ - public MultivalueDedupe.HashResult hash(LongHash hash) { + public MultivalueDedupe.HashResult hash(Ordinator64 hash) { LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount()); boolean sawNull = false; for (int p = 0; p < block.getPositionCount(); p++) { @@ -301,14 +300,15 @@ private void writeSortedWork(DoubleBlock.Builder builder) { /** * Writes an already deduplicated {@link #work} to a hash. */ - private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) { + private void hashUniquedWork(Ordinator64 ordinator, LongBlock.Builder builder) { if (w == 1) { - hash(builder, hash, work[0]); + hash(builder, ordinator, work[0]); return; } builder.beginPositionEntry(); + // TODO use array flavored add for (int i = 0; i < w; i++) { - hash(builder, hash, work[i]); + hash(builder, ordinator, work[i]); } builder.endPositionEntry(); } @@ -316,18 +316,18 @@ private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) { /** * Writes a sorted {@link #work} to a hash, skipping duplicates. */ - private void hashSortedWork(LongHash hash, LongBlock.Builder builder) { + private void hashSortedWork(Ordinator64 ordinator, LongBlock.Builder builder) { if (w == 1) { - hash(builder, hash, work[0]); + hash(builder, ordinator, work[0]); return; } builder.beginPositionEntry(); double prev = work[0]; - hash(builder, hash, prev); + hash(builder, ordinator, prev); for (int i = 1; i < w; i++) { if (prev != work[i]) { prev = work[i]; - hash(builder, hash, prev); + hash(builder, ordinator, prev); } } builder.endPositionEntry(); @@ -361,7 +361,7 @@ private void grow(int size) { work = ArrayUtil.grow(work, size); } - private void hash(LongBlock.Builder builder, LongHash hash, double v) { - builder.appendLong(BlockHash.hashOrdToGroupNullReserved(hash.add(Double.doubleToLongBits(v)))); + private void hash(LongBlock.Builder builder, Ordinator64 ordinator, double v) { + builder.appendLong(ordinator.add(Double.doubleToLongBits(v))); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeInt.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeInt.java index e8e9f60189f15..bab7be3e56469 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeInt.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeInt.java @@ -8,9 +8,8 @@ package org.elasticsearch.compute.operator; import org.apache.lucene.util.ArrayUtil; -import org.elasticsearch.common.util.LongHash; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; -import org.elasticsearch.compute.aggregation.blockhash.BlockHash; +import org.elasticsearch.compute.aggregation.blockhash.Ordinator64; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; @@ -140,7 +139,7 @@ public IntBlock dedupeToBlockUsingCopyMissing() { * Dedupe values and build a {@link LongBlock} suitable for passing * as the grouping block to a {@link GroupingAggregatorFunction}. 
*/ - public MultivalueDedupe.HashResult hash(LongHash hash) { + public MultivalueDedupe.HashResult hash(Ordinator64 hash) { LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount()); boolean sawNull = false; for (int p = 0; p < block.getPositionCount(); p++) { @@ -301,14 +300,15 @@ private void writeSortedWork(IntBlock.Builder builder) { /** * Writes an already deduplicated {@link #work} to a hash. */ - private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) { + private void hashUniquedWork(Ordinator64 ordinator, LongBlock.Builder builder) { if (w == 1) { - hash(builder, hash, work[0]); + hash(builder, ordinator, work[0]); return; } builder.beginPositionEntry(); + // TODO use array flavored add for (int i = 0; i < w; i++) { - hash(builder, hash, work[i]); + hash(builder, ordinator, work[i]); } builder.endPositionEntry(); } @@ -316,18 +316,18 @@ private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) { /** * Writes a sorted {@link #work} to a hash, skipping duplicates. */ - private void hashSortedWork(LongHash hash, LongBlock.Builder builder) { + private void hashSortedWork(Ordinator64 ordinator, LongBlock.Builder builder) { if (w == 1) { - hash(builder, hash, work[0]); + hash(builder, ordinator, work[0]); return; } builder.beginPositionEntry(); int prev = work[0]; - hash(builder, hash, prev); + hash(builder, ordinator, prev); for (int i = 1; i < w; i++) { if (prev != work[i]) { prev = work[i]; - hash(builder, hash, prev); + hash(builder, ordinator, prev); } } builder.endPositionEntry(); @@ -361,7 +361,7 @@ private void grow(int size) { work = ArrayUtil.grow(work, size); } - private void hash(LongBlock.Builder builder, LongHash hash, int v) { - builder.appendLong(BlockHash.hashOrdToGroupNullReserved(hash.add(v))); + private void hash(LongBlock.Builder builder, Ordinator64 ordinator, int v) { + builder.appendLong(ordinator.add(v)); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeLong.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeLong.java index f334e1bd3f61f..3ae674da91e1a 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeLong.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeLong.java @@ -8,9 +8,8 @@ package org.elasticsearch.compute.operator; import org.apache.lucene.util.ArrayUtil; -import org.elasticsearch.common.util.LongHash; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; -import org.elasticsearch.compute.aggregation.blockhash.BlockHash; +import org.elasticsearch.compute.aggregation.blockhash.Ordinator64; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.LongBlock; @@ -140,7 +139,7 @@ public LongBlock dedupeToBlockUsingCopyMissing() { * Dedupe values and build a {@link LongBlock} suitable for passing * as the grouping block to a {@link GroupingAggregatorFunction}. */ - public MultivalueDedupe.HashResult hash(LongHash hash) { + public MultivalueDedupe.HashResult hash(Ordinator64 hash) { LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount()); boolean sawNull = false; for (int p = 0; p < block.getPositionCount(); p++) { @@ -301,14 +300,15 @@ private void writeSortedWork(LongBlock.Builder builder) { /** * Writes an already deduplicated {@link #work} to a hash. 
*/ - private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) { + private void hashUniquedWork(Ordinator64 ordinator, LongBlock.Builder builder) { if (w == 1) { - hash(builder, hash, work[0]); + hash(builder, ordinator, work[0]); return; } builder.beginPositionEntry(); + // TODO use array flavored add for (int i = 0; i < w; i++) { - hash(builder, hash, work[i]); + hash(builder, ordinator, work[i]); } builder.endPositionEntry(); } @@ -316,18 +316,18 @@ private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) { /** * Writes a sorted {@link #work} to a hash, skipping duplicates. */ - private void hashSortedWork(LongHash hash, LongBlock.Builder builder) { + private void hashSortedWork(Ordinator64 ordinator, LongBlock.Builder builder) { if (w == 1) { - hash(builder, hash, work[0]); + hash(builder, ordinator, work[0]); return; } builder.beginPositionEntry(); long prev = work[0]; - hash(builder, hash, prev); + hash(builder, ordinator, prev); for (int i = 1; i < w; i++) { if (prev != work[i]) { prev = work[i]; - hash(builder, hash, prev); + hash(builder, ordinator, prev); } } builder.endPositionEntry(); @@ -361,7 +361,7 @@ private void grow(int size) { work = ArrayUtil.grow(work, size); } - private void hash(LongBlock.Builder builder, LongHash hash, long v) { - builder.appendLong(BlockHash.hashOrdToGroupNullReserved(hash.add(v))); + private void hash(LongBlock.Builder builder, Ordinator64 ordinator, long v) { + builder.appendLong(ordinator.add(v)); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/module-info.java b/x-pack/plugin/esql/compute/src/main/java/module-info.java index 91e45baa7bda9..e548d1784d290 100644 --- a/x-pack/plugin/esql/compute/src/main/java/module-info.java +++ b/x-pack/plugin/esql/compute/src/main/java/module-info.java @@ -6,6 +6,7 @@ */ module org.elasticsearch.compute { + requires jdk.incubator.vector; requires org.apache.lucene.core; requires org.elasticsearch.base; requires org.elasticsearch.server; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java index 9106508f7e262..329bac156e3f6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java @@ -7,11 +7,13 @@ package org.elasticsearch.compute.aggregation.blockhash; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.BytesRefHash; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.common.util.LongLongHash; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.aggregation.SeenGroupIds; import org.elasticsearch.compute.data.Block; @@ -22,6 +24,7 @@ import org.elasticsearch.core.Releasable; import java.util.List; +import java.util.function.Supplier; /** * A specialized hash table implementation maps values of a {@link Block} to ids (in longs). @@ -59,42 +62,47 @@ public abstract sealed class BlockHash implements Releasable, SeenGroupIds // public abstract BitArray seenGroupIds(BigArrays bigArrays); /** - * Creates a specialized hash table that maps one or more {@link Block}s to ids. 
- * @param emitBatchSize maximum batch size to be emitted when handling combinatorial - * explosion of groups caused by multivalued fields + * Builds {@link BlockHash}es. */ - public static BlockHash build(List groups, BigArrays bigArrays, int emitBatchSize) { - if (groups.size() == 1) { - return newForElementType(groups.get(0).channel(), groups.get(0).elementType(), bigArrays); - } - if (groups.size() == 2) { - var g1 = groups.get(0); - var g2 = groups.get(1); - if (g1.elementType() == ElementType.LONG && g2.elementType() == ElementType.LONG) { - return new LongLongBlockHash(bigArrays, g1.channel(), g2.channel(), emitBatchSize); - } - if (g1.elementType() == ElementType.BYTES_REF && g2.elementType() == ElementType.LONG) { - return new BytesRefLongBlockHash(bigArrays, g1.channel(), g2.channel(), false, emitBatchSize); + public record Factory(BigArrays bigArrays, PageCacheRecycler recycler, Supplier breaker) { + /** + * Creates a specialized hash table that maps one or more {@link Block}s to ids. + * @param emitBatchSize maximum batch size to be emitted when handling combinatorial + * explosion of groups caused by multivalued fields + */ + public BlockHash build(List groups, int emitBatchSize) { + if (groups.size() == 1) { + return newForElementType(groups.get(0).channel(), groups.get(0).elementType()); } - if (g1.elementType() == ElementType.LONG && g2.elementType() == ElementType.BYTES_REF) { - return new BytesRefLongBlockHash(bigArrays, g2.channel(), g1.channel(), true, emitBatchSize); + if (groups.size() == 2) { + var g1 = groups.get(0); + var g2 = groups.get(1); + if (g1.elementType() == ElementType.LONG && g2.elementType() == ElementType.LONG) { + return new LongLongBlockHash(bigArrays, g1.channel(), g2.channel(), emitBatchSize); + } + if (g1.elementType() == ElementType.BYTES_REF && g2.elementType() == ElementType.LONG) { + return new BytesRefLongBlockHash(bigArrays, g1.channel(), g2.channel(), false, emitBatchSize); + } + if (g1.elementType() == ElementType.LONG && g2.elementType() == ElementType.BYTES_REF) { + return new BytesRefLongBlockHash(bigArrays, g2.channel(), g1.channel(), true, emitBatchSize); + } } + return new PackedValuesBlockHash(groups, bigArrays, emitBatchSize); } - return new PackedValuesBlockHash(groups, bigArrays, emitBatchSize); - } - /** - * Creates a specialized hash table that maps a {@link Block} of the given input element type to ids. - */ - private static BlockHash newForElementType(int channel, ElementType type, BigArrays bigArrays) { - return switch (type) { - case BOOLEAN -> new BooleanBlockHash(channel); - case INT -> new IntBlockHash(channel, bigArrays); - case LONG -> new LongBlockHash(channel, bigArrays); - case DOUBLE -> new DoubleBlockHash(channel, bigArrays); - case BYTES_REF -> new BytesRefBlockHash(channel, bigArrays); - default -> throw new IllegalArgumentException("unsupported grouping element type [" + type + "]"); - }; + /** + * Creates a specialized hash table that maps a {@link Block} of the given input element type to ids. 
+ */ + private BlockHash newForElementType(int channel, ElementType type) { + return switch (type) { + case BOOLEAN -> new BooleanBlockHash(channel); + case INT -> new IntBlockHash(recycler, breaker.get(), channel); + case LONG -> new LongBlockHash(recycler, breaker.get(), channel); + case DOUBLE -> new DoubleBlockHash(recycler, breaker.get(), channel); + case BYTES_REF -> new BytesRefBlockHash(channel, bigArrays); + default -> throw new IllegalArgumentException("unsupported grouping element type [" + type + "]"); + }; + } } /** diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator.java new file mode 100644 index 0000000000000..d2b6416d7943b --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator.java @@ -0,0 +1,293 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation.blockhash; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public abstract class Ordinator { + protected final PageCacheRecycler recycler; + protected final CircuitBreaker breaker; + protected final IdSpace idSpace; + + protected int capacity; + protected int mask; + protected int nextGrowSize; + protected int currentSize; + protected int growCount; + + protected Ordinator( + PageCacheRecycler recycler, + CircuitBreaker breaker, + IdSpace idSpace, + int initialCapacity, + float smallCoreFillFactor + ) { + this.recycler = recycler; + this.breaker = breaker; + this.idSpace = idSpace; + + this.capacity = initialCapacity; + this.mask = capacity - 1; + this.nextGrowSize = (int) (capacity * smallCoreFillFactor); + + assert initialCapacity == Integer.highestOneBit(initialCapacity) : "intial capacity is a power of two"; + } + + /** + * How many entries are in the {@link Ordinator64}. + */ + public final int currentSize() { + return currentSize; + } + + /** + * Performance information hopefully useful for debugging. + */ + public abstract Status status(); + + /** + * Build an iterator to walk all values and ids. + */ + public abstract Itr iterator(); + + /** + * Sequence of {@code int}s assigned to ids. These can be shared between + * {@link Ordinator}s. + */ + public static class IdSpace { + private int id; + + /** + * Allocate the next id. + */ + public int next() { + return id++; + } + } + + /** + * Performance information about the {@link Ordinator} hopefully useful for debugging. 
+ */ + public abstract static class Status implements NamedWriteable, ToXContentObject { + private final int growCount; + private final int capacity; + private final int size; + private final int nextGrowSize; + + protected Status(int growCount, int capacity, int size, int nextGrowSize) { + this.growCount = growCount; + this.capacity = capacity; + this.size = size; + this.nextGrowSize = nextGrowSize; + } + + protected Status(StreamInput in) throws IOException { + this(in.readVInt(), in.readVInt(), in.readVInt(), in.readVInt()); + } + + /** + * The number of times this {@link Ordinator} has grown. + */ + public int growCount() { + return growCount; + } + + /** + * The size of the {@link Ordinator}. + */ + public int capacity() { + return capacity; + } + + /** + * Number of entries added to the {@link Ordinator}. + */ + public int size() { + return size; + } + + /** + * When {@link #size} grows to this number the hash will grow again. + */ + public int nextGrowSize() { + return nextGrowSize; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(growCount); + out.writeVInt(capacity); + out.writeVInt(size); + out.writeVInt(nextGrowSize); + } + + @Override + public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("grow_count", growCount); + builder.field("capacity", capacity); + builder.field("size", size); + builder.field("next_grow_size", nextGrowSize); + builder.field("core", getWriteableName()); + toXContentFragment(builder, params); + return builder.endObject(); + } + + protected abstract void toXContentFragment(XContentBuilder builder, Params params) throws IOException; + } + + static class SmallCoreStatus extends Status { + SmallCoreStatus(int growCount, int capacity, int size, int nextGrowSize) { + super(growCount, capacity, size, nextGrowSize); + } + + SmallCoreStatus(StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return "small"; + } + + @Override + protected void toXContentFragment(XContentBuilder builder, Params params) throws IOException {} + } + + static class BigCoreStatus extends Status { + /** + * The number of times an {@link Ordinator64#add} operation needed to probe additional + * entries. If all is right with the world this should be {@code 0}, meaning + * every entry found an empty slot within {@code SIMD_WIDTH} slots from its + * natural position. Such hashes will never have to probe on read. More + * generally, a {@code find} operation should take on average + * {@code insertProbes / size} probes. + */ + private final int insertProbes; + + /** + * The number of {@link PageCacheRecycler#PAGE_SIZE_IN_BYTES} pages allocated for keys. + */ + public final int keyPages; + + /** + * The number of {@link PageCacheRecycler#PAGE_SIZE_IN_BYTES} pages allocated for ids.
+ */ + public final int idPages; + + BigCoreStatus(int growCount, int capacity, int size, int nextGrowSize, int insertProbes, int keyPages, int idPages) { + super(growCount, capacity, size, nextGrowSize); + this.insertProbes = insertProbes; + this.keyPages = keyPages; + this.idPages = idPages; + } + + BigCoreStatus(StreamInput in) throws IOException { + super(in); + insertProbes = in.readVInt(); + keyPages = in.readVInt(); + idPages = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(insertProbes); + out.writeVInt(keyPages); + out.writeVInt(idPages); + } + + /** + * The number of {@link PageCacheRecycler#PAGE_SIZE_IN_BYTES} pages allocated for keys. + */ + public int keyPages() { + return keyPages; + } + + /** + * The number of {@link PageCacheRecycler#PAGE_SIZE_IN_BYTES} pages allocated for ids. + */ + public int idPages() { + return idPages; + } + + @Override + public String getWriteableName() { + return "big"; + } + + @Override + protected void toXContentFragment(XContentBuilder builder, Params params) throws IOException { + builder.field("insert_probes", insertProbes); + builder.field("key_pages", keyPages); + builder.field("id_pages", idPages); + } + } + + /** + * Shared superstructure for hash cores. Basically just page tracking + * and {@link Releasable}. + */ + abstract class Core implements Releasable { + final List toClose = new ArrayList<>(); + + byte[] grabPage() { + breaker.addEstimateBytesAndMaybeBreak(PageCacheRecycler.PAGE_SIZE_IN_BYTES, "ordinator"); + toClose.add(() -> breaker.addWithoutBreaking(-PageCacheRecycler.PAGE_SIZE_IN_BYTES)); + Recycler.V page = recycler.bytePage(false); + toClose.add(page); + return page.v(); + } + + /** + * Build the status for this core. + */ + protected abstract Status status(); + + /** + * Build an iterator for all values in the core. + */ + protected abstract Itr iterator(); + + @Override + public void close() { + Releasables.close(toClose); + } + } + + /** + * Iterates the entries in the {@link Ordinator}. + */ + public abstract class Itr { + protected int slot = -1; + + /** + * Advance to the next entry in the {@link Ordinator}, returning {@code false} + * if there aren't any more entries.. + */ + public abstract boolean next(); + + /** + * The id the iterator is current pointing to. + */ + public abstract int id(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64.java new file mode 100644 index 0000000000000..f3ce8d3ec910c --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64.java @@ -0,0 +1,622 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.aggregation.blockhash; + +import jdk.incubator.vector.ByteVector; +import jdk.incubator.vector.VectorMask; +import jdk.incubator.vector.VectorSpecies; + +import org.apache.lucene.util.hppc.BitMixer; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.nio.ByteOrder; +import java.util.Arrays; + +/** + * Assigns {@code int} ids to {@code long}s, vending them in the order they are added. + *

+ * At its core there are two hash table implementations, a "small core" and + * a "big core". The "small core" is a simple + * open addressed + * hash table with a fixed 60% load factor and a fixed-size table of 2048 entries. It's quite quick + * because it has a fixed size and never grows. + *
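To make that concrete, a lookup in the "small core" is plain open addressing. Here is a minimal sketch of the idea, simplified from `SmallCore.find` later in this patch, with plain arrays standing in for the recycler-backed key and id pages:

```java
// Open-addressed lookup sketch: ids[slot] < 0 marks an empty slot.
static int find(long key, int hash, long[] keys, int[] ids, int mask) {
    int slotIncrement = 0;
    int slot = hash & mask;
    while (true) {
        int id = ids[slot];
        if (id < 0) {
            return -1;                      // empty slot: the key is not present
        }
        if (keys[slot] == key) {
            return id;                      // key found: return its assigned id
        }
        slotIncrement++;                    // probe by triangular numbers: +1, +2, +3, ...
        slot = (slot + slotIncrement) & mask;
    }
}
```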

+ *

+ * When the "small core" has more entries than it's load factor the "small core" + * is replaced with a "big core". The "big core" functions quite similarly to + * a Swisstable, Google's + * fancy SIMD hash table. In this table there's a contiguous array of "control" + * bytes that are either {@code 0b1111_1111} for empty entries or + * {@code 0b0aaa_aaaa} for populated entries, where {@code aaa_aaaa} are the top + * 7 bytes of the hash. To find an entry by key you hash it, grab the top 7 bytes + * or it, and perform a SIMD read of the control array starting at the expected + * slot. We use the widest SIMD instruction the CPU supports, meaning 64 or 32 + * bytes. If any of those match we check the actual key. So instead of scanning + * one slot at a time "small core", we effectively scan a whole bunch at once. + * This allows us to run a much higher load factor (85%) without any performance + * penalty so the extra byte feels super worth it. + *

+ *

+ * When a "big core" fills it's table to the fill factor, we build a new "big core" + * and read all values in the old "big core" into the new one. + *

+ */ +@SuppressWarnings("preview") +public class Ordinator64 extends Ordinator implements Releasable { + private static final VectorSpecies BS = ByteVector.SPECIES_PREFERRED; + + private static final int PAGE_SHIFT = 14; + + private static final int PAGE_MASK = PageCacheRecycler.PAGE_SIZE_IN_BYTES - 1; + + private static final int KEY_SIZE = Long.BYTES; + + private static final int ID_SIZE = Integer.BYTES; + + static final int INITIAL_CAPACITY = PageCacheRecycler.PAGE_SIZE_IN_BYTES / KEY_SIZE; + + static { + if (PageCacheRecycler.PAGE_SIZE_IN_BYTES >> PAGE_SHIFT != 1) { + throw new AssertionError("bad constants"); + } + if (Integer.highestOneBit(KEY_SIZE) != KEY_SIZE) { + throw new AssertionError("not a power of two"); + } + if (Integer.highestOneBit(ID_SIZE) != ID_SIZE) { + throw new AssertionError("not a power of two"); + } + if (Integer.highestOneBit(INITIAL_CAPACITY) != INITIAL_CAPACITY) { + throw new AssertionError("not a power of two"); + } + if (ID_SIZE > KEY_SIZE) { + throw new AssertionError("key too small"); + } + } + + private static final VarHandle longHandle = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.nativeOrder()); + private static final VarHandle intHandle = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.nativeOrder()); + + private SmallCore smallCore; + private BigCore bigCore; + + public Ordinator64(PageCacheRecycler recycler, CircuitBreaker breaker, IdSpace idSpace) { + super(recycler, breaker, idSpace, INITIAL_CAPACITY, Ordinator64.SmallCore.FILL_FACTOR); + this.smallCore = new SmallCore(); + } + + /** + * Find an {@code id} by a {@code key}. + */ + public int find(long key) { + int hash = hash(key); + if (smallCore != null) { + return smallCore.find(key, hash); + } else { + return bigCore.find(key, hash, control(hash)); + } + } + + /** + * Add many {@code key}s at once, putting their {@code id}s into an array. + * If any {@code key} was already present it's previous assigned {@code id} + * will be added to the array. If it wasn't present it'll be assigned a new + * {@code id}. + *

+ * This method tends to be faster than {@link #add(long)}. + *
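A usage sketch of the array-flavored add, assuming a `PageCacheRecycler recycler` and `CircuitBreaker breaker` are already in hand, as they are in the block hashes below:

```java
Ordinator64 ord = new Ordinator64(recycler, breaker, new Ordinator64.IdSpace());
long[] keys = { 10L, 20L, 10L, 30L };
int[] ids = new int[keys.length];
ord.add(keys, ids, keys.length);
// ids is now {0, 1, 0, 2}: the duplicate 10L gets the id assigned on first sight.
```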

+ */ + public void add(long[] keys, int[] ids, int length) { + int i = 0; + for (; i < length; i++) { + if (bigCore != null) { + for (; i < length; i++) { + long k = keys[i]; + ids[i] = bigCore.add(k, hash(k)); + } + return; + } + + ids[i] = add(keys[i]); + } + } + + /** + * Add many {@code key}s at once, putting their {@code id}s into a builder. + * If any {@code key} was already present its previously assigned {@code id} + * will be added to the builder. If it wasn't present it'll be assigned a new + * {@code id}. + *

+ * This method tends to be faster than {@link #add(long)}. + *

+ */ + public void add(long[] keys, LongBlock.Builder ids, int length) { + int i = 0; + for (; i < length; i++) { + if (bigCore != null) { + for (; i < length; i++) { + long k = keys[i]; + ids.appendLong(bigCore.add(k, hash(k))); + } + return; + } + + ids.appendLong(add(keys[i])); + } + } + + /** + * Add a {@code key}, returning its {@code id}s. If it was already present + * it's previous assigned {@code id} will be returned. If it wasn't present + * it'll be assigned a new {@code id}. + */ + public int add(long key) { + int hash = hash(key); + if (smallCore != null) { + if (currentSize < nextGrowSize) { + return smallCore.add(key, hash); + } + smallCore.transitionToBigCore(); + } + return bigCore.add(key, hash); + } + + @Override + public Status status() { + return smallCore != null ? smallCore.status() : bigCore.status(); + } + + public abstract class Itr extends Ordinator.Itr { + /** + * The key the iterator is current pointing to. + */ + public abstract long key(); + } + + @Override + public Itr iterator() { + return smallCore != null ? smallCore.iterator() : bigCore.iterator(); + } + + /** + * Build the control byte for a populated entry out of the hash. + * The control bytes for a populated entry has the high bit clear + * and the remaining 7 bits contain the top 7 bits of the hash. + * So it looks like {@code 0b0xxx_xxxx}. + */ + private byte control(int hash) { + return (byte) (hash >>> (Integer.SIZE - 7)); + } + + @Override + public void close() { + Releasables.close(smallCore, bigCore); + } + + private int growTracking() { + // Juggle constants for the new page size + growCount++; + // TODO what about MAX_INT? + int oldCapacity = capacity; + capacity <<= 1; + mask = capacity - 1; + nextGrowSize = (int) (capacity * BigCore.FILL_FACTOR); + return oldCapacity; + } + + /** + * Open addressed hash table the probes by triangle numbers. Empty + * {@code id}s are encoded as {@code -1}. This hash table can't + * grow, and is instead replaced by a {@link BigCore}. + *
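The `-1` encoding falls out of how the id page is initialized: filling it with `0xff` bytes makes every aligned `int` read back as `-1`. A tiny illustration, reusing this file's `VarHandle` plumbing and imports:

```java
VarHandle intHandle = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.nativeOrder());
byte[] idPage = new byte[PageCacheRecycler.PAGE_SIZE_IN_BYTES];
Arrays.fill(idPage, (byte) 0xff);          // how SmallCore clears its id page
int id = (int) intHandle.get(idPage, 0);   // reads -1: the slot is empty
```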

+ * This uses two pages from the {@link PageCacheRecycler}, one + * for the {@code keys} and one for the {@code ids}. + *

+ */ + class SmallCore extends Core { + static final float FILL_FACTOR = 0.6F; + + private final byte[] keyPage; + private final byte[] idPage; + + private SmallCore() { + boolean success = false; + try { + keyPage = grabPage(); + idPage = grabPage(); + Arrays.fill(idPage, (byte) 0xff); + success = true; + } finally { + if (success == false) { + close(); + } + } + } + + int find(long key, int hash) { + int slotIncrement = 0; + int slot = slot(hash); + while (true) { + int id = id(slot); + if (id < 0) { + // Empty + return -1; + } + if (key(slot) == key) { + return id; + } + + slotIncrement++; + slot = slot(slot + slotIncrement); + } + } + + int add(long key, int hash) { + int slotIncrement = 0; + int slot = slot(hash); + while (true) { + int keyOffset = keyOffset(slot); + int idOffset = idOffset(slot); + long slotKey = (long) longHandle.get(keyPage, keyOffset); + int slotId = (int) intHandle.get(idPage, idOffset); + if (slotId >= 0) { + if (slotKey == key) { + return slotId; + } + } else { + currentSize++; + longHandle.set(keyPage, keyOffset, key); + int id = idSpace.next(); + intHandle.set(idPage, idOffset, id); + return id; + } + + slotIncrement++; + slot = slot(slot + slotIncrement); + } + } + + void transitionToBigCore() { + int oldCapacity = growTracking(); + + try { + bigCore = new BigCore(); + rehash(oldCapacity); + } finally { + close(); + smallCore = null; + } + } + + @Override + protected Status status() { + return new SmallCoreStatus(growCount, capacity, currentSize, nextGrowSize); + } + + @Override + protected Itr iterator() { + return new Itr() { + @Override + public boolean next() { + do { + slot++; + } while (slot < capacity && SmallCore.this.id(slot) < 0); + return slot < capacity; + } + + @Override + public int id() { + return SmallCore.this.id(slot); + } + + @Override + public long key() { + return SmallCore.this.key(slot); + } + }; + } + + private void rehash(int oldCapacity) { + for (int slot = 0; slot < oldCapacity; slot++) { + int id = (int) intHandle.get(idPage, idOffset(slot)); + if (id < 0) { + continue; + } + long key = (long) longHandle.get(keyPage, keyOffset(slot)); + int hash = hash(key); + bigCore.insert(key, hash, control(hash), id); + } + } + + private long key(int slot) { + return (long) longHandle.get(keyPage, keyOffset(slot)); + } + + private int id(int slot) { + return (int) intHandle.get(idPage, idOffset(slot)); + } + } + + /** + * A Swisstable inspired hashtable. + */ + class BigCore extends Core { + static final float FILL_FACTOR = 0.85F; + + private static final byte CONTROL_EMPTY = (byte) 0b1111_1111; + + /** + * The "control" bytes from the Swisstable algorithm. This'll contain + * {@link #CONTROL_EMPTY} for empty entries and {@code 0b0aaa_aaaa} for + * filled entries, where {@code aaa_aaaa} are the top seven bits of the + * hash. These are tests by SIMD instructions as a quick first pass to + * check many entries at once. + *

+ * This array has to be contiguous or we lose too much speed, so it + * isn't managed by the {@link PageCacheRecycler}; instead we + * allocate it directly. + *
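Because this allocation bypasses the recycler it still has to be charged to the circuit breaker by hand; the pattern, condensed from the `BigCore` constructor below:

```java
// Reserve the bytes before allocating and queue the refund for close().
int controlLength = capacity + BS.length();   // capacity plus a SIMD-width tail
breaker.addEstimateBytesAndMaybeBreak(controlLength, "ordinator");
toClose.add(() -> breaker.addWithoutBreaking(-controlLength));
byte[] controlData = new byte[controlLength];
Arrays.fill(controlData, (byte) 0xFF);        // every slot starts EMPTY
```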

+ *

+ * This array contains {@code capacity + SIMD_LANES} entries with the + * first {@code SIMD_LANES} bytes cloned to the end of the array so + * the simd probes for possible matches never had to worry about + * "wrapping" around the array. + */ + private final byte[] controlData; + + /** + * Pages of {@code keys}, vended by the {@link PageCacheRecycler}. It's + * important that the size of keys be a power of two, so we can quickly + * select the appropriate page and keys never span multiple pages. + */ + private final byte[][] keyPages; + + /** + * Pages of {@code ids}, vended by the {@link PageCacheRecycler}. Ids + * are {@code int}s so it's very quick to select the appropriate page + * for each slot. + */ + private final byte[][] idPages; + + /** + * The number of times and {@link #add} operation needed to probe additional + * entries. If all is right with the world this should be {@code 0}, meaning + * every entry found an empty slot within {@code SIMD_WIDTH} slots from its + * natural positions. Such hashes will never have to probe on read. More + * generally, a {@code find} operation should take on average + * {@code insertProbes / size} probes. + */ + private int insertProbes; + + BigCore() { + int controlLength = capacity + BS.length(); + breaker.addEstimateBytesAndMaybeBreak(controlLength, "ordinator"); + toClose.add(() -> breaker.addWithoutBreaking(-controlLength)); + controlData = new byte[controlLength]; + Arrays.fill(controlData, (byte) 0xFF); + + boolean success = false; + try { + int keyPagesNeeded = (capacity * KEY_SIZE - 1) >> PAGE_SHIFT; + keyPagesNeeded++; + keyPages = new byte[keyPagesNeeded][]; + for (int i = 0; i < keyPagesNeeded; i++) { + keyPages[i] = grabPage(); + } + assert keyPages[keyOffset(mask) >> PAGE_SHIFT] != null; + + int idPagesNeeded = (capacity * ID_SIZE - 1) >> PAGE_SHIFT; + idPagesNeeded++; + idPages = new byte[idPagesNeeded][]; + for (int i = 0; i < idPagesNeeded; i++) { + idPages[i] = grabPage(); + } + assert idPages[idOffset(mask) >> PAGE_SHIFT] != null; + success = true; + } finally { + if (false == success) { + close(); + } + } + } + + /** + * Probe chunks for the value. + *

+ * Each probe is: + *

+ *
    + *
+ *   1. Build a bit mask of all matching control values.
+ *   2. If any match, check the actual values. If any of those match, return them.
+ *   3. If no values matched, check the control values for EMPTY flags. If there are any then the value isn't in the hash.
+ *   4. If there aren't any EMPTY flags, this chunk is full, so continue probing.
+ *

+ * We probe via triangular numbers, growing the increment by 1, then 2, then 3, then 4, and so on. That + * helps protect us from clustered hashes, it's simple math, and it's guaranteed to visit every + * bucket of a power-of-two table. + * The probe loop only stops when it finds an EMPTY flag, but it'll always + * find one because we keep the load factor below 100%. + *
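A worked example of the stride, assuming a 32-byte SIMD width: in the "big core" each miss grows the increment by another whole chunk, so the probed chunks start at the natural slot plus 0, 32, 96, 192, 320, ... which are the triangular numbers scaled by the chunk width:

```java
int width = 32;            // stand-in for BS.length()
int slotIncrement = 0;
int offset = 0;
for (int probe = 0; probe < 5; probe++) {
    System.out.println("probe " + probe + " starts at natural slot + " + offset);
    slotIncrement += width;
    offset += slotIncrement;   // offsets: 0, 32, 96, 192, 320, ...
}
```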

+ */ + private int find(long key, int hash, byte control) { + int slotIncrement = 0; + int slot = slot(hash); + while (true) { + VectorMask candidateMatches = controlMatches(slot, control); + // TODO the double checking could be vectorized for some key types. Longs, probably. + + int first; + while ((first = candidateMatches.firstTrue()) < candidateMatches.length()) { + int checkSlot = slot(slot + first); + + if (key(checkSlot) == key) { + return id(checkSlot); + } + // Clear the first set bit and try again + candidateMatches = candidateMatches.indexInRange(-1 - first, candidateMatches.length()); + } + + if (controlMatches(slot, CONTROL_EMPTY).anyTrue()) { + return -1; + } + + slotIncrement += BS.length(); + slot = slot(slot + slotIncrement); + } + } + + int add(long key, int hash) { + byte control = control(hash); + int found = find(key, hash, control); + if (found >= 0) { + return found; + } + + currentSize++; + if (currentSize >= nextGrowSize) { + assert currentSize == nextGrowSize; + grow(); + } + + int id = idSpace.next(); + bigCore.insert(key, hash, control, id); + return id; + } + + /** + * Insert the key into the first empty slot that allows it. Used by {@link #add} + * after we verify that the key isn't in the index. And used by {@link #rehash} + * because we know all keys are unique. + */ + void insert(long key, int hash, byte control, int id) { + int slotIncrement = 0; + int slot = slot(hash); + while (true) { + VectorMask empty = controlMatches(slot, CONTROL_EMPTY); + if (empty.anyTrue()) { + slot = slot(slot + empty.firstTrue()); + int keyOffset = keyOffset(slot); + int idOffset = idOffset(slot); + + longHandle.set(keyPages[keyOffset >> PAGE_SHIFT], keyOffset & PAGE_MASK, key); + intHandle.set(idPages[idOffset >> PAGE_SHIFT], idOffset & PAGE_MASK, id); + controlData[slot] = control; + /* + * Mirror the first BS.length bytes to the end of the array. All + * other positions are just written twice. + */ + controlData[((slot - BS.length()) & mask) + BS.length()] = control; + return; + } + + slotIncrement += BS.length(); + slot = slot(slot + slotIncrement); + insertProbes++; + } + } + + @Override + protected Status status() { + return new BigCoreStatus(growCount, capacity, currentSize, nextGrowSize, insertProbes, keyPages.length, idPages.length); + } + + @Override + protected Itr iterator() { + return new Itr() { + @Override + public boolean next() { + do { + slot++; + } while (slot < capacity && controlData[slot] == CONTROL_EMPTY); + return slot < capacity; + } + + @Override + public int id() { + return BigCore.this.id(slot); + } + + @Override + public long key() { + return BigCore.this.key(slot); + } + }; + } + + private void grow() { + int oldCapacity = growTracking(); + try { + bigCore = new BigCore(); + rehash(oldCapacity); + } finally { + close(); + } + } + + private void rehash(int oldCapacity) { + int slot = 0; + while (slot < oldCapacity) { + VectorMask empty = controlMatches(slot, CONTROL_EMPTY); + // TODO iterate like in find - it's faster. + for (int i = 0; i < empty.length(); i++) { + if (empty.laneIsSet(i)) { + slot++; + continue; + } + long key = key(slot); + int hash = hash(key); + int id = id(slot); + bigCore.insert(key, hash, control(hash), id); + slot++; + } + } + } + + /** + * Checks the control byte at {@code slot} and the next few bytes ahead + * of {@code slot} for the control bits. The extra probed bytes is as + * many as will fit in your widest simd instruction. So, 32 or 64 will + * be common. 
+ */ + private VectorMask controlMatches(int slot, byte control) { + return ByteVector.fromArray(BS, controlData, slot).eq(control); + } + + private long key(int slot) { + int keyOffset = keyOffset(slot); + return (long) longHandle.get(keyPages[keyOffset >> PAGE_SHIFT], keyOffset & PAGE_MASK); + } + + private int id(int slot) { + int idOffset = idOffset(slot); + return (int) intHandle.get(idPages[idOffset >> PAGE_SHIFT], idOffset & PAGE_MASK); + } + } + + int keyOffset(int slot) { + return slot * KEY_SIZE; + } + + int idOffset(int slot) { + return slot * ID_SIZE; + } + + int hash(long v) { + return BitMixer.mix(v); + } + + int slot(int hash) { + return hash & mask; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/X-BlockHash.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/X-BlockHash.java.st new file mode 100644 index 0000000000000..c24240c7f632c --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/X-BlockHash.java.st @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation.blockhash; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; +import org.elasticsearch.compute.aggregation.SeenGroupIds; +import org.elasticsearch.compute.data.Block; +$if(double)$ +import org.elasticsearch.compute.data.DoubleArrayBlock; +import org.elasticsearch.compute.data.DoubleArrayVector; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +$endif$ +$if(int)$ +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntArrayVector; +import org.elasticsearch.compute.data.IntBlock; +$endif$ +import org.elasticsearch.compute.data.IntVector; +$if(long)$ +import org.elasticsearch.compute.data.LongArrayBlock; +$endif$ +import org.elasticsearch.compute.data.LongArrayVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.MultivalueDedupe; +import org.elasticsearch.compute.operator.MultivalueDedupe$Type$; + +import java.util.BitSet; + +/** + * Maps {@link $Type$Block} to group ids. + * This class is generated. Edit {@code X-BlockHash.java.st} instead. + */ +final class $Type$BlockHash extends BlockHash { + private final int channel; + private final Ordinator64 ordinator; + + /** + * Have we seen any {@code null} values? + *

+ * We reserve the 0 ordinal for the {@code null} key so methods like + * {@link #nonEmpty} need to skip 0 if we haven't seen any null values. + *

+ */ + private boolean seenNull; + + $Type$BlockHash(PageCacheRecycler recycler, CircuitBreaker breaker, int channel) { + this.channel = channel; +$if(int)$ + // TODO build and use Ordinator32 +$endif$ + Ordinator64.IdSpace idSpace = new Ordinator64.IdSpace(); + idSpace.next(); // Reserve 0 for nulls. + this.ordinator = new Ordinator64(recycler, breaker, idSpace); + } + + @Override + public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { + $Type$Block block = page.getBlock(channel); + $Type$Vector vector = block.asVector(); + if (vector == null) { + addInput.add(0, add(block)); + } else { + addInput.add(0, add(vector)); + } + } + + private LongVector add($Type$Vector vector) { + long[] groups = new long[vector.getPositionCount()]; + // TODO use the array flavored add + for (int i = 0; i < vector.getPositionCount(); i++) { +$if(double)$ + groups[i] = ordinator.add(Double.doubleToLongBits(vector.get$Type$(i))); +$else$ + groups[i] = ordinator.add(vector.get$Type$(i)); +$endif$ + } + return new LongArrayVector(groups, groups.length); + } + + private LongBlock add($Type$Block block) { + MultivalueDedupe.HashResult result = new MultivalueDedupe$Type$(block).hash(ordinator); + seenNull |= result.sawNull(); + return result.ords(); + } + + @Override + public $Type$Block[] getKeys() { + // TODO call something like takeKeyOwnership to claim the keys array directly + + // If we've seen null we'll store it in 0 + if (seenNull) { + $type$[] keys = new $type$[ordinator.currentSize() + 1]; + for (Ordinator64.Itr itr = ordinator.iterator(); itr.next();) { +$if(int)$ + // TODO build and use Ordinator32 and drop the cast + keys[itr.id()] = Math.toIntExact(itr.key()); +$elseif(double)$ + keys[itr.id()] = Double.longBitsToDouble(itr.key()); +$else$ + keys[itr.id()] = itr.key(); +$endif$ + } + BitSet nulls = new BitSet(1); + nulls.set(0); + return new $Type$Block[] { new $Type$ArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.ASCENDING) }; + } + $type$[] keys = new $type$[ordinator.currentSize() + (seenNull ? 1 : 0)]; + for (Ordinator64.Itr itr = ordinator.iterator(); itr.next();) { + // We reserved the id 0 for null but didn't see it. +$if(int)$ + // TODO build and use Ordinator32 and drop the cast + keys[itr.id() - 1] = Math.toIntExact(itr.key()); +$elseif(double)$ + keys[itr.id() - 1] = Double.longBitsToDouble(itr.key()); +$else$ + keys[itr.id() - 1] = itr.key(); +$endif$ + } + + return new $Type$Block[] { new $Type$ArrayVector(keys, keys.length).asBlock() }; + } + + @Override + public IntVector nonEmpty() { + return IntVector.range(seenNull ? 0 : 1, Math.toIntExact(ordinator.currentSize() + 1)); + } + + @Override + public BitArray seenGroupIds(BigArrays bigArrays) { + return new SeenGroupIds.Range(seenNull ? 
0 : 1, Math.toIntExact(ordinator.currentSize() + 1)).seenGroupIds(bigArrays); + } + + @Override + public void close() { + ordinator.close(); + } + + @Override + public String toString() { + return "$Type$BlockHash{channel=" + channel + ", entries=" + ordinator.currentSize() + ", seenNull=" + seenNull + '}'; + } + + // TODO plumb ordinator.status +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java index 1910cc4ec590c..203e7338fbd96 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java @@ -7,7 +7,6 @@ package org.elasticsearch.compute.operator; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.Describable; import org.elasticsearch.compute.aggregation.GroupingAggregator; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; @@ -36,11 +35,11 @@ public record HashAggregationOperatorFactory( List groups, List aggregators, int maxPageSize, - BigArrays bigArrays + BlockHash.Factory blockHashFactory ) implements OperatorFactory { @Override public Operator get(DriverContext driverContext) { - return new HashAggregationOperator(aggregators, () -> BlockHash.build(groups, bigArrays, maxPageSize), driverContext); + return new HashAggregationOperator(aggregators, () -> blockHashFactory.build(groups, maxPageSize), driverContext); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index 3a1cf5fee3512..bd2859a418429 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -57,12 +57,22 @@ public record OrdinalsGroupingOperatorFactory( String groupingField, List aggregators, int maxPageSize, + BlockHash.Factory blockHashFactory, BigArrays bigArrays ) implements OperatorFactory { @Override public Operator get(DriverContext driverContext) { - return new OrdinalsGroupingOperator(sources, docChannel, groupingField, aggregators, maxPageSize, bigArrays, driverContext); + return new OrdinalsGroupingOperator( + sources, + docChannel, + groupingField, + aggregators, + maxPageSize, + blockHashFactory, + bigArrays, + driverContext + ); } @Override @@ -77,6 +87,7 @@ public String describe() { private final List aggregatorFactories; private final Map ordinalAggregators; + private final BlockHash.Factory blockHashFactory; private final BigArrays bigArrays; private final DriverContext driverContext; @@ -93,6 +104,7 @@ public OrdinalsGroupingOperator( String groupingField, List aggregatorFactories, int maxPageSize, + BlockHash.Factory blockHashFactory, BigArrays bigArrays, DriverContext driverContext ) { @@ -109,6 +121,7 @@ public OrdinalsGroupingOperator( this.aggregatorFactories = aggregatorFactories; this.ordinalAggregators = new HashMap<>(); this.maxPageSize = maxPageSize; + this.blockHashFactory = blockHashFactory; this.bigArrays = bigArrays; this.driverContext = driverContext; } @@ -166,7 +179,7 @@ public void addInput(Page page) { channelIndex, aggregatorFactories, 
maxPageSize, - bigArrays, + blockHashFactory, driverContext ); } @@ -416,13 +429,13 @@ private static class ValuesAggregator implements Releasable { int channelIndex, List aggregatorFactories, int maxPageSize, - BigArrays bigArrays, + BlockHash.Factory blockHashFactory, DriverContext driverContext ) { this.extractor = new ValuesSourceReaderOperator(sources, docChannel, groupingField); this.aggregator = new HashAggregationOperator( aggregatorFactories, - () -> BlockHash.build(List.of(new GroupSpec(channelIndex, sources.get(0).elementType())), bigArrays, maxPageSize), + () -> blockHashFactory.build(List.of(new GroupSpec(channelIndex, sources.get(0).elementType())), maxPageSize), driverContext ); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/X-MultivalueDedupe.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/X-MultivalueDedupe.java.st index 7c4fdb7bebdec..c148ccbd7653b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/X-MultivalueDedupe.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/X-MultivalueDedupe.java.st @@ -11,11 +11,12 @@ import org.apache.lucene.util.ArrayUtil; $if(BytesRef)$ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.util.BytesRefHash; -$else$ -import org.elasticsearch.common.util.LongHash; -$endif$ import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; +$else$ +import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; +import org.elasticsearch.compute.aggregation.blockhash.Ordinator64; +$endif$ $if(long)$ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.LongBlock; @@ -24,8 +25,8 @@ $else$ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.$Type$Block; import org.elasticsearch.compute.data.LongBlock; -$endif$ +$endif$ import java.util.Arrays; /** @@ -179,7 +180,7 @@ $endif$ $if(BytesRef)$ public MultivalueDedupe.HashResult hash(BytesRefHash hash) { $else$ - public MultivalueDedupe.HashResult hash(LongHash hash) { + public MultivalueDedupe.HashResult hash(Ordinator64 hash) { $endif$ LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount()); boolean sawNull = false; @@ -389,17 +390,18 @@ $endif$ * Writes an already deduplicated {@link #work} to a hash. */ $if(BytesRef)$ - private void hashUniquedWork(BytesRefHash hash, LongBlock.Builder builder) { + private void hashUniquedWork(BytesRefHash ordinator, LongBlock.Builder builder) { $else$ - private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) { + private void hashUniquedWork(Ordinator64 ordinator, LongBlock.Builder builder) { $endif$ if (w == 1) { - hash(builder, hash, work[0]); + hash(builder, ordinator, work[0]); return; } builder.beginPositionEntry(); + // TODO use array flavored add for (int i = 0; i < w; i++) { - hash(builder, hash, work[i]); + hash(builder, ordinator, work[i]); } builder.endPositionEntry(); } @@ -408,17 +410,17 @@ $endif$ * Writes a sorted {@link #work} to a hash, skipping duplicates. 
*/ $if(BytesRef)$ - private void hashSortedWork(BytesRefHash hash, LongBlock.Builder builder) { + private void hashSortedWork(BytesRefHash ordinator, LongBlock.Builder builder) { $else$ - private void hashSortedWork(LongHash hash, LongBlock.Builder builder) { + private void hashSortedWork(Ordinator64 ordinator, LongBlock.Builder builder) { $endif$ if (w == 1) { - hash(builder, hash, work[0]); + hash(builder, ordinator, work[0]); return; } builder.beginPositionEntry(); $type$ prev = work[0]; - hash(builder, hash, prev); + hash(builder, ordinator, prev); for (int i = 1; i < w; i++) { $if(BytesRef)$ if (false == prev.equals(work[i])) { @@ -426,7 +428,7 @@ $else$ if (prev != work[i]) { $endif$ prev = work[i]; - hash(builder, hash, prev); + hash(builder, ordinator, prev); } } builder.endPositionEntry(); @@ -487,12 +489,14 @@ $endif$ $if(BytesRef)$ private void hash(LongBlock.Builder builder, BytesRefHash hash, BytesRef v) { $else$ - private void hash(LongBlock.Builder builder, LongHash hash, $type$ v) { + private void hash(LongBlock.Builder builder, Ordinator64 ordinator, $type$ v) { $endif$ $if(double)$ - builder.appendLong(BlockHash.hashOrdToGroupNullReserved(hash.add(Double.doubleToLongBits(v)))); -$else$ + builder.appendLong(ordinator.add(Double.doubleToLongBits(v))); +$elseif(BytesRef)$ builder.appendLong(BlockHash.hashOrdToGroupNullReserved(hash.add(v))); +$else$ + builder.appendLong(ordinator.add(v)); $endif$ } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 114576b7bed7e..adfd28c9ab943 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.compute.aggregation.CountAggregatorFunction; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; @@ -70,6 +71,7 @@ import org.elasticsearch.index.fielddata.plain.SortedNumericIndexFieldData; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -324,7 +326,10 @@ public void testGroupingWithOrdinals() throws Exception { } writer.commit(); Map actualCounts = new HashMap<>(); - BigArrays bigArrays = bigArrays(); + PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); + CircuitBreakerService breakerService = new NoneCircuitBreakerService(); + BigArrays bigArrays = new MockBigArrays(recycler, breakerService); + BlockHash.Factory blockHashFactory = new BlockHash.Factory(bigArrays, recycler, () -> breakerService.getBreaker("test")); boolean shuffleDocs = randomBoolean(); Operator shuffleDocsOperator = new AbstractPageMappingOperator() { @Override @@ -402,14 +407,14 @@ public String toString() { gField, List.of(CountAggregatorFunction.supplier(bigArrays, List.of(1)).groupingAggregatorFactory(INITIAL)), 
randomPageSize(), + blockHashFactory, bigArrays, driverContext ), new HashAggregationOperator( List.of(CountAggregatorFunction.supplier(bigArrays, List.of(1, 2)).groupingAggregatorFactory(FINAL)), - () -> BlockHash.build( + () -> blockHashFactory.build( List.of(new HashAggregationOperator.GroupSpec(0, ElementType.BYTES_REF)), - bigArrays, randomPageSize() ), driverContext @@ -543,13 +548,6 @@ public BytesRef nextValue() throws IOException { }; } - /** - * Creates a {@link BigArrays} that tracks releases but doesn't throw circuit breaking exceptions. - */ - private BigArrays bigArrays() { - return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); - } - public static void assertDriverContext(DriverContext driverContext) { assertTrue(driverContext.isFinished()); assertThat(driverContext.getSnapshot().releasables(), empty()); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java index ac3edc4c61a88..b4940b25f513b 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java @@ -8,9 +8,13 @@ package org.elasticsearch.compute.aggregation; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.aggregation.blockhash.BlockHash; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; @@ -68,11 +72,12 @@ protected final Operator.OperatorFactory simpleWithMode(BigArrays bigArrays, Agg if (randomBoolean()) { supplier = chunkGroups(emitChunkSize, supplier); } + // TOOD pass PageCacheRecycler and Breaker too return new HashAggregationOperator.HashAggregationOperatorFactory( List.of(new HashAggregationOperator.GroupSpec(0, ElementType.LONG)), List.of(supplier.groupingAggregatorFactory(mode)), randomPageSize(), - bigArrays + new BlockHash.Factory(bigArrays, new PageCacheRecycler(Settings.EMPTY), () -> new NoopCircuitBreaker("test")) ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashRandomizedTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashRandomizedTests.java index c585108a89fd0..780e0586ddca8 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashRandomizedTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashRandomizedTests.java @@ -10,14 +10,10 @@ import com.carrotsearch.randomizedtesting.annotations.Name; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; -import org.elasticsearch.common.util.MockBigArrays; -import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.data.BasicBlockTests; import org.elasticsearch.compute.data.Block; import 
org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.operator.HashAggregationOperator; import org.elasticsearch.compute.operator.MultivalueDedupeTests; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ListMatcher; @@ -93,7 +89,7 @@ public void test() { int pageCount = between(1, 10); int positionCount = 100; int emitBatchSize = 100; - try (BlockHash blockHash = newBlockHash(emitBatchSize, types)) { + try (BlockHash blockHash = BlockHashTests.newBlockHash(forcePackedHash, emitBatchSize, types)) { /* * Only the long/long, long/bytes_ref, and bytes_ref/long implementations don't collect nulls. */ @@ -160,17 +156,6 @@ public void test() { } } - private BlockHash newBlockHash(int emitBatchSize, List types) { - List specs = new ArrayList<>(types.size()); - for (int c = 0; c < types.size(); c++) { - specs.add(new HashAggregationOperator.GroupSpec(c, types.get(c))); - } - MockBigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()); - return forcePackedHash - ? new PackedValuesBlockHash(specs, bigArrays, emitBatchSize) - : BlockHash.build(specs, bigArrays, emitBatchSize); - } - private static class KeyComparator implements Comparator> { @Override public int compare(List lhs, List rhs) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java index 7c56691a3ae41..b1a8ab29044d4 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.inject.name.Named; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; @@ -20,6 +21,7 @@ import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.DoubleArrayVector; import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntArrayVector; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; @@ -28,6 +30,7 @@ import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.HashAggregationOperator; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; @@ -1024,17 +1027,22 @@ private OrdsAndKeys hash(Block... values) { return result[0]; } - private void hash(Consumer callback, int emitBatchSize, Block... 
values) { - List specs = new ArrayList<>(values.length); - for (int c = 0; c < values.length; c++) { - specs.add(new HashAggregationOperator.GroupSpec(c, values[c].elementType())); + static BlockHash newBlockHash(boolean forcePackedHash, int emitBatchSize, List types) { + List specs = new ArrayList<>(types.size()); + for (int c = 0; c < types.size(); c++) { + specs.add(new HashAggregationOperator.GroupSpec(c, types.get(c))); } - MockBigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()); - try ( - BlockHash blockHash = forcePackedHash - ? new PackedValuesBlockHash(specs, bigArrays, emitBatchSize) - : BlockHash.build(specs, bigArrays, emitBatchSize) - ) { + PageCacheRecycler recycler = new PageCacheRecycler(Settings.EMPTY); + CircuitBreakerService breakerService = new NoneCircuitBreakerService(); + MockBigArrays bigArrays = new MockBigArrays(recycler, breakerService); + return forcePackedHash + ? new PackedValuesBlockHash(specs, bigArrays, emitBatchSize) + : new BlockHash.Factory(bigArrays, recycler, () -> breakerService.getBreaker("test")).build(specs, emitBatchSize); + } + + private void hash(Consumer callback, int emitBatchSize, Block... values) { + List types = Arrays.stream(values).map(Block::elementType).toList(); + try (BlockHash blockHash = newBlockHash(forcePackedHash, emitBatchSize, types)) { hash(true, blockHash, callback, values); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64Tests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64Tests.java new file mode 100644 index 0000000000000..7bb84dadb5019 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64Tests.java @@ -0,0 +1,396 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.aggregation.blockhash; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.data.LongArrayVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.IntUnaryOperator; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.nullValue; + +public class Ordinator64Tests extends ESTestCase { + @ParametersFactory + public static List params() { + List params = new ArrayList<>(); + for (AddType addType : AddType.values()) { + params.add(new Object[] { addType, "tiny", 5, 0, 1, 1 }); + params.add(new Object[] { addType, "small", Ordinator64.INITIAL_CAPACITY / 2, 0, 1, 1 }); + params.add(new Object[] { addType, "two key pages", PageCacheRecycler.PAGE_SIZE_IN_BYTES / Long.BYTES, 1, 2, 1 }); + params.add(new Object[] { addType, "two id pages", PageCacheRecycler.PAGE_SIZE_IN_BYTES / Integer.BYTES, 2, 4, 2 }); + params.add(new Object[] { addType, "many", PageCacheRecycler.PAGE_SIZE_IN_BYTES, 4, 16, 8 }); + params.add(new Object[] { addType, "huge", 100_000, 6, 64, 32 }); + } + return params; + } + + private enum AddType { + SINGLE_VALUE, + ARRAY, + BUILDER; + } + + private final AddType addType; + private final String name; + private final int count; + private final int expectedGrowCount; + private final int expectedKeyPageCount; + private final int expectedIdPageCount; + + public Ordinator64Tests( + @Name("addType") AddType addType, + @Name("name") String name, + @Name("count") int count, + @Name("expectedGrowCount") int expectedGrowCount, + @Name("expectedKeyPageCount") int expectedKeyPageCount, + @Name("expectedIdPageCount") int expectedIdPageCount + ) { + this.addType = addType; + this.name = name; + this.count = count; + this.expectedGrowCount = expectedGrowCount; + this.expectedKeyPageCount = expectedKeyPageCount; + this.expectedIdPageCount = expectedIdPageCount; + } + + public void testValues() { + Set values = randomValues(count); + long[] v = values.stream().mapToLong(Long::longValue).toArray(); + + TestRecycler recycler = new TestRecycler(); + CircuitBreaker breaker = new NoopCircuitBreaker("test"); + try (Ordinator64 ord = new Ordinator64(recycler, breaker, new Ordinator64.IdSpace())) { + assertThat(ord.currentSize(), equalTo(0)); + + switch (addType) { + case SINGLE_VALUE -> { + for (int i = 0; i < v.length; i++) { + assertThat(ord.add(v[i]), equalTo(i)); + assertThat(ord.currentSize(), equalTo(i + 1)); + assertThat(ord.add(v[i]), equalTo(i)); + assertThat(ord.currentSize(), equalTo(i + 1)); + } + for (int i = 0; i < v.length; i++) { + assertThat(ord.add(v[i]), equalTo(i)); + } + assertThat(ord.currentSize(), equalTo(v.length)); + } + case ARRAY 
-> { + int[] target = new int[v.length]; + ord.add(v, target, v.length); + assertThat(target, equalTo(IntStream.range(0, count).toArray())); + assertThat(ord.currentSize(), equalTo(v.length)); + + Arrays.fill(target, 0); + ord.add(v, target, v.length); + assertThat(target, equalTo(IntStream.range(0, count).toArray())); + assertThat(ord.currentSize(), equalTo(v.length)); + } + case BUILDER -> { + LongBlock.Builder target = LongBlock.newBlockBuilder(count); + ord.add(v, target, v.length); + assertThat(target.build(), equalTo(new LongArrayVector(LongStream.range(0, count).toArray(), count).asBlock())); + assertThat(ord.currentSize(), equalTo(v.length)); + + target = LongBlock.newBlockBuilder(count); + ord.add(v, target, v.length); + assertThat(target.build(), equalTo(new LongArrayVector(LongStream.range(0, count).toArray(), count).asBlock())); + assertThat(ord.currentSize(), equalTo(v.length)); + } + default -> throw new IllegalArgumentException(); + } + for (int i = 0; i < v.length; i++) { + assertThat(ord.find(v[i]), equalTo(i)); + } + assertThat(ord.currentSize(), equalTo(v.length)); + assertThat(ord.find(randomValueOtherThanMany(values::contains, ESTestCase::randomLong)), equalTo(-1)); + + assertStatus(ord); + assertThat("Only currently used pages are open", recycler.open, hasSize(expectedKeyPageCount + expectedIdPageCount)); + + Long[] iterated = new Long[count]; + for (Ordinator64.Itr itr = ord.iterator(); itr.next();) { + assertThat(iterated[itr.id()], nullValue()); + iterated[itr.id()] = itr.key(); + } + for (int i = 0; i < v.length; i++) { + assertThat(iterated[i], equalTo(v[i])); + } + } + assertThat(recycler.open, hasSize(0)); + } + + public void testSharedIdSpace() { + Set leftValues = randomValues(count); + Set rightValues = randomValues(count); + long[] left = leftValues.stream().mapToLong(Long::longValue).toArray(); + long[] right = rightValues.stream().mapToLong(Long::longValue).toArray(); + + TestRecycler recycler = new TestRecycler(); + CircuitBreaker breaker = new NoopCircuitBreaker("test"); + Ordinator64.IdSpace idSpace = new Ordinator64.IdSpace(); + try ( + Ordinator64 leftOrd = new Ordinator64(recycler, breaker, idSpace); + Ordinator64 rightOrd = new Ordinator64(recycler, breaker, idSpace); + ) { + assertThat(leftOrd.currentSize(), equalTo(0)); + assertThat(rightOrd.currentSize(), equalTo(0)); + + IntUnaryOperator leftMap, rightMap, leftMapInverse, rightMapInverse; + switch (addType) { + case SINGLE_VALUE -> { + leftMap = i -> 2 * i; + rightMap = i -> 2 * i + 1; + leftMapInverse = i -> i / 2; + rightMapInverse = i -> (i - 1) / 2; + + for (int i = 0; i < count; i++) { + assertThat(leftOrd.add(left[i]), equalTo(2 * i)); + assertThat(leftOrd.currentSize(), equalTo(i + 1)); + assertThat(rightOrd.add(right[i]), equalTo(2 * i + 1)); + assertThat(rightOrd.currentSize(), equalTo(i + 1)); + + assertThat(leftOrd.add(left[i]), equalTo(2 * i)); + assertThat(leftOrd.currentSize(), equalTo(i + 1)); + assertThat(rightOrd.add(right[i]), equalTo(2 * i + 1)); + assertThat(rightOrd.currentSize(), equalTo(i + 1)); + } + for (int i = 0; i < count; i++) { + assertThat(leftOrd.add(left[i]), equalTo(2 * i)); + assertThat(leftOrd.currentSize(), equalTo(count)); + assertThat(rightOrd.add(right[i]), equalTo(2 * i + 1)); + assertThat(rightOrd.currentSize(), equalTo(count)); + } + assertThat(leftOrd.currentSize(), equalTo(count)); + assertThat(rightOrd.currentSize(), equalTo(count)); + } + case ARRAY -> { + leftMap = i -> i; + rightMap = i -> count + i; + leftMapInverse = i -> i; + rightMapInverse = 
i -> i - count; + + int[] target = new int[count]; + + leftOrd.add(left, target, count); + assertThat(target, equalTo(IntStream.range(0, count).toArray())); + assertThat(leftOrd.currentSize(), equalTo(count)); + + Arrays.fill(target, 0); + rightOrd.add(right, target, count); + assertThat(target, equalTo(IntStream.range(count, 2 * count).toArray())); + assertThat(leftOrd.currentSize(), equalTo(count)); + + Arrays.fill(target, 0); + leftOrd.add(left, target, count); + assertThat(target, equalTo(IntStream.range(0, count).toArray())); + assertThat(leftOrd.currentSize(), equalTo(count)); + + Arrays.fill(target, 0); + rightOrd.add(right, target, count); + assertThat(target, equalTo(IntStream.range(count, 2 * count).toArray())); + assertThat(leftOrd.currentSize(), equalTo(count)); + + for (int i = 0; i < count; i++) { + assertThat(leftOrd.find(left[i]), equalTo(i)); + assertThat(rightOrd.find(right[i]), equalTo(count + i)); + } + } + case BUILDER -> { + leftMap = i -> i; + rightMap = i -> count + i; + leftMapInverse = i -> i; + rightMapInverse = i -> i - count; + + LongBlock.Builder target = LongBlock.newBlockBuilder(count); + + leftOrd.add(left, target, count); + assertThat(target.build(), equalTo(new LongArrayVector(LongStream.range(0, count).toArray(), count).asBlock())); + assertThat(leftOrd.currentSize(), equalTo(count)); + + target = LongBlock.newBlockBuilder(count); + rightOrd.add(right, target, count); + assertThat(target.build(), equalTo(new LongArrayVector(LongStream.range(count, 2 * count).toArray(), count).asBlock())); + assertThat(leftOrd.currentSize(), equalTo(count)); + + target = LongBlock.newBlockBuilder(count); + leftOrd.add(left, target, count); + assertThat(target.build(), equalTo(new LongArrayVector(LongStream.range(0, count).toArray(), count).asBlock())); + assertThat(leftOrd.currentSize(), equalTo(count)); + + target = LongBlock.newBlockBuilder(count); + rightOrd.add(right, target, count); + assertThat(target.build(), equalTo(new LongArrayVector(LongStream.range(count, 2 * count).toArray(), count).asBlock())); + assertThat(leftOrd.currentSize(), equalTo(count)); + + for (int i = 0; i < count; i++) { + assertThat(leftOrd.find(left[i]), equalTo(i)); + assertThat(rightOrd.find(right[i]), equalTo(count + i)); + } + } + default -> throw new IllegalArgumentException(); + } + for (int i = 0; i < count; i++) { + assertThat(leftOrd.find(left[i]), equalTo(leftMap.applyAsInt(i))); + assertThat(rightOrd.find(right[i]), equalTo(rightMap.applyAsInt(i))); + } + + assertStatus(leftOrd); + assertStatus(rightOrd); + assertThat("Only currently used pages are open", recycler.open, hasSize(2 * (expectedKeyPageCount + expectedIdPageCount))); + + Long[] iterated = new Long[count]; + for (Ordinator64.Itr itr = leftOrd.iterator(); itr.next();) { + int id = leftMapInverse.applyAsInt(itr.id()); + assertThat(iterated[id], nullValue()); + iterated[id] = itr.key(); + } + for (int i = 0; i < left.length; i++) { + assertThat(iterated[i], equalTo(left[i])); + } + + iterated = new Long[count]; + for (Ordinator64.Itr itr = rightOrd.iterator(); itr.next();) { + int id = rightMapInverse.applyAsInt(itr.id()); + assertThat(iterated[id], nullValue()); + iterated[id] = itr.key(); + } + for (int i = 0; i < right.length; i++) { + assertThat(iterated[i], equalTo(right[i])); + } + } + assertThat(recycler.open, hasSize(0)); + } + + public void testBreaker() { + Set values = randomValues(count); + long[] v = values.stream().mapToLong(Long::longValue).toArray(); + + TestRecycler recycler = new TestRecycler(); + long 
breakAt = (expectedIdPageCount + expectedKeyPageCount) * PageCacheRecycler.PAGE_SIZE_IN_BYTES; + if (expectedGrowCount == 0) { + breakAt -= 10; + } + CircuitBreaker breaker = new MockBigArrays.LimitedBreaker("test", ByteSizeValue.ofBytes(breakAt)); + Exception e = expectThrows(CircuitBreakingException.class, () -> { + try (Ordinator64 ord = new Ordinator64(recycler, breaker, new Ordinator64.IdSpace())) { + switch (addType) { + case SINGLE_VALUE -> { + for (int i = 0; i < v.length; i++) { + assertThat(ord.add(v[i]), equalTo(i)); + } + } + case ARRAY -> { + int[] target = new int[v.length]; + ord.add(v, target, v.length); + } + case BUILDER -> { + LongBlock.Builder target = LongBlock.newBlockBuilder(count); + ord.add(v, target, v.length); + } + default -> throw new IllegalArgumentException(); + } + } + }); + assertThat(e.getMessage(), equalTo("over test limit")); + assertThat(recycler.open, hasSize(0)); + } + + private void assertStatus(Ordinator64 ord) { + Ordinator.Status status = ord.status(); + assertThat(status.size(), equalTo(count)); + if (expectedGrowCount == 0) { + assertThat(status.growCount(), equalTo(0)); + assertThat(status.capacity(), equalTo(Ordinator64.INITIAL_CAPACITY)); + assertThat(status.nextGrowSize(), equalTo((int) (Ordinator64.INITIAL_CAPACITY * Ordinator64.SmallCore.FILL_FACTOR))); + } else { + assertThat(status.growCount(), equalTo(expectedGrowCount)); + assertThat(status.capacity(), equalTo(Ordinator64.INITIAL_CAPACITY << expectedGrowCount)); + assertThat( + status.nextGrowSize(), + equalTo((int) ((Ordinator64.INITIAL_CAPACITY << expectedGrowCount) * Ordinator64.BigCore.FILL_FACTOR)) + ); + + Ordinator.BigCoreStatus s = (Ordinator.BigCoreStatus) status; + assertThat(s.keyPages(), equalTo(expectedKeyPageCount)); + assertThat(s.idPages(), equalTo(expectedIdPageCount)); + } + } + + private Set randomValues(int count) { + Set values = new HashSet<>(); + while (values.size() < count) { + values.add(randomLong()); + } + return values; + } + + static class TestRecycler extends PageCacheRecycler { + private final List> open = new ArrayList<>(); + + TestRecycler() { + super(Settings.EMPTY); + } + + @Override + public Recycler.V bytePage(boolean clear) { + return new MyV<>(super.bytePage(clear)); + } + + @Override + public Recycler.V objectPage() { + return new MyV<>(super.objectPage()); + } + + class MyV implements Recycler.V { + private final Recycler.V delegate; + + MyV(Recycler.V delegate) { + this.delegate = delegate; + open.add(this); + } + + @Override + public T v() { + return delegate.v(); + } + + @Override + public boolean isRecycled() { + return delegate.isRecycled(); + } + + @Override + public void close() { + open.remove(this); + delegate.close(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java index 954a1f179f259..180b23c3caf92 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java @@ -7,8 +7,12 @@ package org.elasticsearch.compute.operator; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import 
org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.compute.aggregation.MaxLongAggregatorFunction; import org.elasticsearch.compute.aggregation.MaxLongAggregatorFunctionSupplier; @@ -16,6 +20,7 @@ import org.elasticsearch.compute.aggregation.SumLongAggregatorFunction; import org.elasticsearch.compute.aggregation.SumLongAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.SumLongGroupingAggregatorFunctionTests; +import org.elasticsearch.compute.aggregation.blockhash.BlockHash; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; @@ -48,6 +53,9 @@ protected Operator.OperatorFactory simpleWithMode(BigArrays bigArrays, Aggregato sumChannels = maxChannels = List.of(1); } + // TODO pass this stuff into the method + PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); + BlockHash.Factory blockHashFactory = new BlockHash.Factory(bigArrays, recycler, () -> new NoopCircuitBreaker("test")); return new HashAggregationOperator.HashAggregationOperatorFactory( List.of(new HashAggregationOperator.GroupSpec(0, ElementType.LONG)), List.of( @@ -55,7 +63,7 @@ protected Operator.OperatorFactory simpleWithMode(BigArrays bigArrays, Aggregato new MaxLongAggregatorFunctionSupplier(bigArrays, maxChannels).groupingAggregatorFactory(mode) ), randomPageSize(), - bigArrays + blockHashFactory ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MultivalueDedupeTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MultivalueDedupeTests.java index b616b9f9bff7e..74ec0195067fa 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MultivalueDedupeTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MultivalueDedupeTests.java @@ -7,13 +7,18 @@ package org.elasticsearch.compute.operator; +import com.carrotsearch.randomizedtesting.annotations.Name; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BytesRefHash; -import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.aggregation.blockhash.Ordinator; +import org.elasticsearch.compute.aggregation.blockhash.Ordinator64; import org.elasticsearch.compute.data.BasicBlockTests; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockTestUtils; @@ -29,8 +34,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.Set; import java.util.TreeSet; @@ -81,13 +88,13 @@ public static List params() { private final int maxDupsPerPosition; public MultivalueDedupeTests( - ElementType elementType, - int positionCount, - boolean nullAllowed, - int minValuesPerPosition, - int maxValuesPerPosition, - int minDupsPerPosition, - int maxDupsPerPosition + @Name("elementType") ElementType 
elementType, + @Name("positionCount") int positionCount, + @Name("nullAllowed") boolean nullAllowed, + @Name("minValuesPerPosition") int minValuesPerPosition, + @Name("maxValuesPerPosition") int maxValuesPerPosition, + @Name("minDupsPerPosition") int minDupsPerPosition, + @Name("maxDupesPerPosition") int maxDupsPerPosition ) { this.elementType = elementType; this.positionCount = positionCount; @@ -253,14 +260,14 @@ private void assertBooleanHash(Set previousValues, BasicBlockTests.Rand everSeen[2] = true; } LongBlock hashes = new MultivalueDedupeBoolean((BooleanBlock) b.block()).hash(everSeen); - List hashedValues = new ArrayList<>(); + Map hashedValues = new HashMap<>(); if (everSeen[1]) { - hashedValues.add(false); + hashedValues.put(1, false); } if (everSeen[2]) { - hashedValues.add(true); + hashedValues.put(2, true); } - assertHash(b, hashes, hashedValues.size(), previousValues, i -> hashedValues.get((int) i)); + assertHash(b, hashes, previousValues, hashedValues); } private void assertBytesRefHash(Set previousValues, BasicBlockTests.RandomBlock b) { @@ -268,66 +275,94 @@ private void assertBytesRefHash(Set previousValues, BasicBlockTests.Ra previousValues.stream().forEach(hash::add); MultivalueDedupe.HashResult hashes = new MultivalueDedupeBytesRef((BytesRefBlock) b.block()).hash(hash); assertThat(hashes.sawNull(), equalTo(b.values().stream().anyMatch(v -> v == null))); - assertHash(b, hashes.ords(), hash.size(), previousValues, i -> hash.get(i, new BytesRef())); + assertHash(b, hashes.ords(), previousValues, hashedValues(hash)); + } + + private Map hashedValues(BytesRefHash hash) { + Map result = new HashMap<>(); + for (int i = 0; i < hash.size(); i++) { + result.put(i + 1, hash.get(i, new BytesRef())); + } + return result; } private void assertIntHash(Set previousValues, BasicBlockTests.RandomBlock b) { - LongHash hash = new LongHash(1, BigArrays.NON_RECYCLING_INSTANCE); - previousValues.stream().forEach(hash::add); - MultivalueDedupe.HashResult hashes = new MultivalueDedupeInt((IntBlock) b.block()).hash(hash); + Ordinator64 ordinator = new Ordinator64( + new PageCacheRecycler(Settings.EMPTY), + new NoopCircuitBreaker("test"), + new Ordinator.IdSpace() + ); + previousValues.stream().forEach(ordinator::add); + MultivalueDedupe.HashResult hashes = new MultivalueDedupeInt((IntBlock) b.block()).hash(ordinator); assertThat(hashes.sawNull(), equalTo(b.values().stream().anyMatch(v -> v == null))); - assertHash(b, hashes.ords(), hash.size(), previousValues, i -> (int) hash.get(i)); + assertHash(b, hashes.ords(), previousValues, hashedValues(ordinator, Math::toIntExact)); } private void assertLongHash(Set previousValues, BasicBlockTests.RandomBlock b) { - LongHash hash = new LongHash(1, BigArrays.NON_RECYCLING_INSTANCE); - previousValues.stream().forEach(hash::add); - MultivalueDedupe.HashResult hashes = new MultivalueDedupeLong((LongBlock) b.block()).hash(hash); + Ordinator64 ordinator = new Ordinator64( + new PageCacheRecycler(Settings.EMPTY), + new NoopCircuitBreaker("test"), + new Ordinator.IdSpace() + ); + previousValues.stream().forEach(ordinator::add); + MultivalueDedupe.HashResult hashes = new MultivalueDedupeLong((LongBlock) b.block()).hash(ordinator); assertThat(hashes.sawNull(), equalTo(b.values().stream().anyMatch(v -> v == null))); - assertHash(b, hashes.ords(), hash.size(), previousValues, i -> hash.get(i)); + assertHash(b, hashes.ords(), previousValues, hashedValues(ordinator, i -> i)); } private void assertDoubleHash(Set previousValues, BasicBlockTests.RandomBlock b) { - 
LongHash hash = new LongHash(1, BigArrays.NON_RECYCLING_INSTANCE); - previousValues.stream().forEach(d -> hash.add(Double.doubleToLongBits(d))); - MultivalueDedupe.HashResult hashes = new MultivalueDedupeDouble((DoubleBlock) b.block()).hash(hash); + Ordinator64 ordinator = new Ordinator64( + new PageCacheRecycler(Settings.EMPTY), + new NoopCircuitBreaker("test"), + new Ordinator.IdSpace() + ); + previousValues.stream().forEach(d -> ordinator.add(Double.doubleToLongBits(d))); + MultivalueDedupe.HashResult hashes = new MultivalueDedupeDouble((DoubleBlock) b.block()).hash(ordinator); assertThat(hashes.sawNull(), equalTo(b.values().stream().anyMatch(v -> v == null))); - assertHash(b, hashes.ords(), hash.size(), previousValues, i -> Double.longBitsToDouble(hash.get(i))); + assertHash(b, hashes.ords(), previousValues, hashedValues(ordinator, Double::longBitsToDouble)); + } + + private Map hashedValues(Ordinator64 ordinator, LongFunction key) { + Map result = new HashMap<>(); + for (Ordinator64.Itr itr = ordinator.iterator(); itr.next();) { + result.put(itr.id(), key.apply(itr.key())); + } + return result; } private void assertHash( BasicBlockTests.RandomBlock b, - LongBlock hashes, - long hashSize, + LongBlock ordinals, Set previousValues, - LongFunction lookup + Map hashedValues ) { Set allValues = new HashSet<>(); allValues.addAll(previousValues); for (int p = 0; p < b.block().getPositionCount(); p++) { - assertThat(hashes.isNull(p), equalTo(false)); - int count = hashes.getValueCount(p); - int start = hashes.getFirstValueIndex(p); + assertThat(ordinals.isNull(p), equalTo(false)); + int count = ordinals.getValueCount(p); + int start = ordinals.getFirstValueIndex(p); List v = b.values().get(p); if (v == null) { assertThat(count, equalTo(1)); - assertThat(hashes.getLong(start), equalTo(0L)); + assertThat(ordinals.getLong(start), equalTo(0L)); return; } List actualValues = new ArrayList<>(count); int end = start + count; for (int i = start; i < end; i++) { - actualValues.add(lookup.apply(hashes.getLong(i) - 1)); + int id = Math.toIntExact(ordinals.getLong(i)); + Object key = hashedValues.get(id); + if (key == null) { + fail("can't find key for [" + id + "]"); + } + actualValues.add(key); } assertThat(actualValues, containsInAnyOrder(v.stream().collect(Collectors.toSet()).stream().sorted().toArray())); allValues.addAll(v); } - Set hashedValues = new HashSet<>((int) hashSize); - for (long i = 0; i < hashSize; i++) { - hashedValues.add(lookup.apply(i)); - } - assertThat(hashedValues, equalTo(allValues)); + assertThat(new HashSet<>(hashedValues.values()), equalTo(allValues)); } private int assertEncodedPosition(BasicBlockTests.RandomBlock b, BatchEncoder encoder, int position, int offset, int valueOffset) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java index ae443e552d725..dca60c9a79a55 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java @@ -156,7 +156,7 @@ else if (mode == AggregateExec.Mode.PARTIAL) { groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(), aggregatorFactories, context.pageSize(aggregateExec.estimatedRowSize()), - context.bigArrays() + context.blockHashFactory() ); } } diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 3a72bff0d0c82..ad79f6b6fc9fa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -173,6 +173,7 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory( attrSource.name(), aggregatorFactories, context.pageSize(aggregateExec.estimatedRowSize()), + context.blockHashFactory(), context.bigArrays() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 5240bba21b017..01c9345be02b8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -7,9 +7,12 @@ package org.elasticsearch.xpack.esql.planner; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.compute.Describable; +import org.elasticsearch.compute.aggregation.blockhash.BlockHash; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; @@ -41,6 +44,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator.ExchangeSourceOperatorFactory; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.EsqlUnsupportedOperationException; @@ -107,6 +111,8 @@ public class LocalExecutionPlanner { private final String sessionId; private final CancellableTask parentTask; private final BigArrays bigArrays; + private final PageCacheRecycler recycler; + private final CircuitBreakerService breakerService; private final EsqlConfiguration configuration; private final ExchangeSourceHandler exchangeSourceHandler; private final ExchangeSinkHandler exchangeSinkHandler; @@ -117,6 +123,8 @@ public LocalExecutionPlanner( String sessionId, CancellableTask parentTask, BigArrays bigArrays, + PageCacheRecycler recycler, + CircuitBreakerService breakerService, EsqlConfiguration configuration, ExchangeSourceHandler exchangeSourceHandler, ExchangeSinkHandler exchangeSinkHandler, @@ -126,6 +134,8 @@ public LocalExecutionPlanner( this.sessionId = sessionId; this.parentTask = parentTask; this.bigArrays = bigArrays; + this.recycler = recycler; + this.breakerService = breakerService; this.exchangeSourceHandler = exchangeSourceHandler; this.exchangeSinkHandler = exchangeSinkHandler; this.enrichLookupService = enrichLookupService; @@ -143,7 +153,9 @@ public LocalExecutionPlan plan(PhysicalPlan node) { configuration.pragmas().taskConcurrency(), configuration.pragmas().dataPartitioning(), configuration.pragmas().pageSize(), - bigArrays + bigArrays, + recycler, + breakerService ); PhysicalOperation physicalOperation = 
plan(node, context); @@ -652,7 +664,9 @@ public record LocalExecutionPlannerContext( int taskConcurrency, DataPartitioning dataPartitioning, int configuredPageSize, - BigArrays bigArrays + BigArrays bigArrays, + PageCacheRecycler recycler, + CircuitBreakerService breakerService ) { void addDriverFactory(DriverFactory driverFactory) { driverFactories.add(driverFactory); @@ -674,6 +688,13 @@ int pageSize(Integer estimatedRowSize) { } return Math.max(SourceOperator.MIN_TARGET_PAGE_SIZE, SourceOperator.TARGET_PAGE_SIZE / estimatedRowSize); } + + /** + * Builder {@link BlockHash} implementations for grouping grouping aggregations. + */ + BlockHash.Factory blockHashFactory() { + return new BlockHash.Factory(bigArrays, recycler, () -> breakerService.getBreaker(CircuitBreaker.REQUEST)); + } } record DriverSupplier(BigArrays bigArrays, PhysicalOperation physicalOperation) implements Function, Describable { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index b7b9b3b4021dc..2514dd73eba0a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.compute.data.Page; @@ -41,6 +42,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchService; @@ -85,6 +87,8 @@ public class ComputeService { private static final Logger LOGGER = LogManager.getLogger(ComputeService.class); private final SearchService searchService; private final BigArrays bigArrays; + private final PageCacheRecycler recycler; + private final CircuitBreakerService breakerService; private final TransportService transportService; private final Executor esqlExecutor; private final DriverTaskRunner driverRunner; @@ -97,11 +101,15 @@ public ComputeService( ExchangeService exchangeService, EnrichLookupService enrichLookupService, ThreadPool threadPool, - BigArrays bigArrays + BigArrays bigArrays, + PageCacheRecycler recycler, + CircuitBreakerService breakerService ) { this.searchService = searchService; this.transportService = transportService; this.bigArrays = bigArrays.withCircuitBreaking(); + this.recycler = recycler; + this.breakerService = breakerService; transportService.registerRequestHandler( DATA_ACTION_NAME, ESQL_THREAD_POOL_NAME, @@ -239,6 +247,8 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, context.sessionId, task, bigArrays, + recycler, + breakerService, context.configuration, context.exchangeSource(), context.exchangeSink(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 
614277e9d7216..22c005eae00a8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -15,7 +15,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.SearchService; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; @@ -56,7 +58,9 @@ public TransportEsqlQueryAction( ExchangeService exchangeService, ClusterService clusterService, ThreadPool threadPool, - BigArrays bigArrays + BigArrays bigArrays, + PageCacheRecycler recycler, + CircuitBreakerService breakerService ) { // TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916 super(EsqlQueryAction.NAME, transportService, actionFilters, EsqlQueryRequest::new, ThreadPool.Names.SAME); @@ -72,7 +76,9 @@ public TransportEsqlQueryAction( exchangeService, enrichLookupService, threadPool, - bigArrays + bigArrays, + recycler, + breakerService ); this.settings = settings; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 003dbe47486c9..7a3ea1a105912 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -13,6 +13,8 @@ import org.elasticsearch.common.logging.HeaderWarning; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Driver; @@ -20,6 +22,8 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.Tuple; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.search.internal.SearchContext; @@ -324,13 +328,18 @@ private ActualResults executePlan() throws Exception { var parsed = parser.createStatement(testCase.query); var testDataset = testsDataset(parsed); + PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); + CircuitBreakerService breakerService = new NoneCircuitBreakerService(); + BigArrays bigArrays = new BigArrays(recycler, breakerService, "bigarrays"); String sessionId = "csv-test"; ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(between(1, 64), threadPool.executor(ESQL_THREAD_POOL_NAME)); ExchangeSinkHandler exchangeSink = new ExchangeSinkHandler(between(1, 64), threadPool::relativeTimeInMillis); LocalExecutionPlanner executionPlanner = new LocalExecutionPlanner( sessionId, new CancellableTask(1, "transport", "esql", null, TaskId.EMPTY_TASK_ID, Map.of()), - BigArrays.NON_RECYCLING_INSTANCE, + bigArrays, + recycler, + 
breakerService, configuration, exchangeSource, exchangeSink, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index dab7b3ee41922..8f83f6677e3ab 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -19,11 +19,15 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.mapper.MapperServiceTestCase; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.TestSearchContext; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; @@ -109,10 +113,14 @@ private Matcher maxPageSizeMatcher(boolean estimatedRowSizeIsHuge, int } private LocalExecutionPlanner planner() throws IOException { + PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); + CircuitBreakerService breakerService = new NoneCircuitBreakerService(); return new LocalExecutionPlanner( "test", null, BigArrays.NON_RECYCLING_INSTANCE, + recycler, + breakerService, config(), null, null, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java index c088cae6f20c9..97db1dea1755b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java @@ -8,7 +8,9 @@ package org.elasticsearch.xpack.esql.planner; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.Describable; import org.elasticsearch.compute.aggregation.GroupingAggregator; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; @@ -82,6 +84,8 @@ public Operator.OperatorFactory ordinalGroupingOperatorFactory( aggregatorFactories, groupElementType, context.bigArrays(), + context.recycler(), + () -> context.breakerService().getBreaker("test"), attrSource.name() ); } @@ -233,6 +237,8 @@ private class TestOrdinalsGroupingAggregationOperatorFactory implements Operator private List aggregators; private ElementType groupElementType; private BigArrays bigArrays; + private PageCacheRecycler recycler; + private Supplier breaker; private String columnName; TestOrdinalsGroupingAggregationOperatorFactory( @@ -240,12 +246,16 @@ private class TestOrdinalsGroupingAggregationOperatorFactory implements Operator List aggregatorFactories, ElementType groupElementType, BigArrays bigArrays, + 
PageCacheRecycler recycler, + Supplier breaker, String name ) { this.groupByChannel = channelIndex; this.aggregators = aggregatorFactories; this.groupElementType = groupElementType; this.bigArrays = bigArrays; + this.recycler = recycler; + this.breaker = breaker; this.columnName = name; } @@ -255,9 +265,8 @@ public Operator get(DriverContext driverContext) { int pageSize = random.nextBoolean() ? randomIntBetween(random, 1, 16) : randomIntBetween(random, 1, 10 * 1024); return new TestHashAggregationOperator( aggregators, - () -> BlockHash.build( + () -> new BlockHash.Factory(bigArrays, recycler, breaker).build( List.of(new HashAggregationOperator.GroupSpec(groupByChannel, groupElementType)), - bigArrays, pageSize ), columnName, From cd77d2c8df70f3d0aad23318644786f7c9f7b214 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 22 Aug 2023 13:59:36 -0400 Subject: [PATCH 2/2] More docs --- .../compute/aggregation/blockhash/Ordinator.java | 4 ++++ .../compute/aggregation/blockhash/Ordinator64.java | 9 +++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator.java index d2b6416d7943b..74a6f3947f8c1 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator.java @@ -22,6 +22,10 @@ import java.util.ArrayList; import java.util.List; +/** + * Superclass of table to assign {@code int} ids to various key types, + * vending the ids in order they are added. + */ public abstract class Ordinator { protected final PageCacheRecycler recycler; protected final CircuitBreaker breaker; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64.java index f3ce8d3ec910c..4fc29ab207ddf 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/Ordinator64.java @@ -24,7 +24,7 @@ import java.util.Arrays; /** - * Assigns {@code int} ids to {@code long}s, vending the in order they are added. + * Assigns {@code int} ids to {@code long}s, vending the ids in order they are added. *
<p>
* At it's core there are two hash table implementations, a "small core" and * a "big core". The "small core" is a simple @@ -348,7 +348,12 @@ private int id(int slot) { } /** - * A Swisstable inspired hashtable. + * A Swisstable inspired hashtable. This differs from the normal swisstable + * because it's adapted to Elasticsearch's {@link PageCacheRecycler}. + * The keys and ids are stored in many {@link PageCacheRecycler#PAGE_SIZE_IN_BYTES} + * sized arrays, with the keys separated from the values. This is mostly so that + * we can be sure the array and the offset into the array can be calculated by + * right shifts. */ class BigCore extends Core { static final float FILL_FACTOR = 0.85F;
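The paged key storage described in that javadoc can be made concrete with a small sketch. This is not the BigCore code itself, only an illustration of why page-sized key arrays let the page index and the in-page offset fall out of a shift and a mask instead of a division; the 16 KB page size and the class and method names here are assumptions for the example.

```java
/**
 * Minimal sketch (not the real BigCore) of storing keys in page-sized arrays.
 * Assumes 16 KB pages of 8-byte longs, so the per-page key count is a power of two.
 */
public class PagedKeysSketch {
    static final int PAGE_SIZE_IN_BYTES = 16 * 1024;                   // assumed page size
    static final int KEYS_PER_PAGE = PAGE_SIZE_IN_BYTES / Long.BYTES;  // 2048
    static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(KEYS_PER_PAGE); // 11
    static final int OFFSET_MASK = KEYS_PER_PAGE - 1;

    private final long[][] keyPages;

    PagedKeysSketch(int pageCount) {
        keyPages = new long[pageCount][KEYS_PER_PAGE];
    }

    void setKey(int slot, long key) {
        // page index and in-page offset are a shift and a mask, no division needed
        keyPages[slot >>> PAGE_SHIFT][slot & OFFSET_MASK] = key;
    }

    long getKey(int slot) {
        return keyPages[slot >>> PAGE_SHIFT][slot & OFFSET_MASK];
    }

    public static void main(String[] args) {
        PagedKeysSketch keys = new PagedKeysSketch(4);
        keys.setKey(3000, 42L);                 // lands in page 1, offset 952
        System.out.println(keys.getKey(3000));  // prints 42
    }
}
```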
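The Ordinator64Tests earlier in this patch lean on a simple contract: adding a new key vends the next sequential id starting at zero, re-adding an existing key returns the id it already has, and find looks a key up without inserting. The throwaway map-backed model below is only a sketch of that contract, not the hash table itself; the -1 "missing" sentinel returned by find is an assumption for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the id-vending contract the tests exercise; not the real Ordinator64. */
public class OrdinatorContractSketch {
    private final Map<Long, Integer> ids = new HashMap<>();

    /** Returns the existing id for the key, or assigns the next sequential id. */
    public int add(long key) {
        return ids.computeIfAbsent(key, k -> ids.size());
    }

    /** Returns the key's id, or -1 if it was never added (sentinel is an assumption). */
    public int find(long key) {
        return ids.getOrDefault(key, -1);
    }

    public static void main(String[] args) {
        OrdinatorContractSketch ord = new OrdinatorContractSketch();
        System.out.println(ord.add(100L));  // 0 - first new key
        System.out.println(ord.add(200L));  // 1 - next new key
        System.out.println(ord.add(100L));  // 0 - duplicate keeps its original id
        System.out.println(ord.find(200L)); // 1
        System.out.println(ord.find(999L)); // -1 - never added
    }
}
```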