Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Disable optimizations with bad null handling #99434

Merged
merged 2 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ private static Operator operator(String grouping, String op, String dataType) {
};
return new HashAggregationOperator(
List.of(supplier(op, dataType, groups.size()).groupingAggregatorFactory(AggregatorMode.SINGLE)),
() -> BlockHash.build(groups, BIG_ARRAYS, 16 * 1024),
() -> BlockHash.build(groups, BIG_ARRAYS, 16 * 1024, false),
new DriverContext()
);
}
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/99434.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 99434
summary: "ESQL: Disable optimizations with bad null handling"
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,21 @@ public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
* Creates a specialized hash table that maps one or more {@link Block}s to ids.
* @param emitBatchSize maximum batch size to be emitted when handling combinatorial
* explosion of groups caused by multivalued fields
 * @param allowBrokenOptimizations true to allow optimizations with bad null handling. We will fix their
* null handling and remove this flag, but we need to disable these in
* production until we can. And this lets us continue to compile and
* test them.
*/
public static BlockHash build(List<HashAggregationOperator.GroupSpec> groups, BigArrays bigArrays, int emitBatchSize) {
public static BlockHash build(
List<HashAggregationOperator.GroupSpec> groups,
BigArrays bigArrays,
int emitBatchSize,
boolean allowBrokenOptimizations
) {
if (groups.size() == 1) {
return newForElementType(groups.get(0).channel(), groups.get(0).elementType(), bigArrays);
}
if (groups.size() == 2) {
if (allowBrokenOptimizations && groups.size() == 2) {
var g1 = groups.get(0);
var g2 = groups.get(1);
if (g1.elementType() == ElementType.LONG && g2.elementType() == ElementType.LONG) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public record HashAggregationOperatorFactory(
) implements OperatorFactory {
@Override
public Operator get(DriverContext driverContext) {
return new HashAggregationOperator(aggregators, () -> BlockHash.build(groups, bigArrays, maxPageSize), driverContext);
return new HashAggregationOperator(aggregators, () -> BlockHash.build(groups, bigArrays, maxPageSize, false), driverContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ private static class ValuesAggregator implements Releasable {
this.extractor = new ValuesSourceReaderOperator(sources, docChannel, groupingField);
this.aggregator = new HashAggregationOperator(
aggregatorFactories,
() -> BlockHash.build(List.of(new GroupSpec(channelIndex, sources.get(0).elementType())), bigArrays, maxPageSize),
() -> BlockHash.build(List.of(new GroupSpec(channelIndex, sources.get(0).elementType())), bigArrays, maxPageSize, false),
driverContext
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ public String toString() {
() -> BlockHash.build(
List.of(new HashAggregationOperator.GroupSpec(0, ElementType.BYTES_REF)),
bigArrays,
randomPageSize()
randomPageSize(),
false
),
driverContext
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private BlockHash newBlockHash(int emitBatchSize, List<ElementType> types) {
MockBigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService());
return forcePackedHash
? new PackedValuesBlockHash(specs, bigArrays, emitBatchSize)
: BlockHash.build(specs, bigArrays, emitBatchSize);
: BlockHash.build(specs, bigArrays, emitBatchSize, true);
}

private static class KeyComparator implements Comparator<List<?>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ private void hash(Consumer<OrdsAndKeys> callback, int emitBatchSize, Block... va
try (
BlockHash blockHash = forcePackedHash
? new PackedValuesBlockHash(specs, bigArrays, emitBatchSize)
: BlockHash.build(specs, bigArrays, emitBatchSize)
: BlockHash.build(specs, bigArrays, emitBatchSize, true)
) {
hash(true, blockHash, callback, values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,23 +290,35 @@ null
;

byStringAndLong
from employees | eval trunk_worked_seconds = avg_worked_seconds / 100000000 * 100000000 | stats c = count(gender) by gender, trunk_worked_seconds | sort c desc;
FROM employees
| EVAL trunk_worked_seconds = avg_worked_seconds / 100000000 * 100000000
| STATS c = COUNT(gender) by gender, trunk_worked_seconds
| SORT c desc;

c:long | gender:keyword | trunk_worked_seconds:long
30 | M | 300000000
27 | M | 200000000
22 | F | 300000000
11 | F | 200000000
30 | M | 300000000
27 | M | 200000000
22 | F | 300000000
11 | F | 200000000
0 | null | 200000000
0 | null | 300000000
;

byStringAndLongWithAlias
from employees | eval trunk_worked_seconds = avg_worked_seconds / 100000000 * 100000000 | rename gender as g, trunk_worked_seconds as tws | keep g, tws | stats c = count(g) by g, tws | sort c desc;
FROM employees
| EVAL trunk_worked_seconds = avg_worked_seconds / 100000000 * 100000000
| RENAME gender as g, trunk_worked_seconds as tws
| KEEP g, tws
| STATS c = count(g) by g, tws
| SORT c desc;

c:long | g:keyword | tws:long
30 | M | 300000000
27 | M | 200000000
22 | F | 300000000
11 | F | 200000000
30 | M | 300000000
27 | M | 200000000
22 | F | 300000000
11 | F | 200000000
0 | null | 200000000
0 | null | 300000000
;

byStringAndString
Expand All @@ -324,35 +336,45 @@ c:long | gender:keyword | hire_year_str:keyword
;

byLongAndLong
from employees | eval trunk_worked_seconds = avg_worked_seconds / 100000000 * 100000000 | stats c = count(languages.long) by languages.long, trunk_worked_seconds | sort c desc, languages.long, trunk_worked_seconds;
FROM employees
| EVAL trunk_worked_seconds = avg_worked_seconds / 100000000 * 100000000
| STATS c = COUNT(languages.long) BY languages.long, trunk_worked_seconds
| SORT c DESC, languages.long, trunk_worked_seconds;

c:long | languages.long:long | trunk_worked_seconds:long
15 |5 |300000000
11 |2 |300000000
10 |4 |300000000
9 |3 |200000000
8 |1 |200000000
8 |2 |200000000
8 |3 |300000000
8 |4 |200000000
7 |1 |300000000
6 |5 |200000000
15 |5 |300000000
11 |2 |300000000
10 |4 |300000000
9 |3 |200000000
8 |1 |200000000
8 |2 |200000000
8 |3 |300000000
8 |4 |200000000
7 |1 |300000000
6 |5 |200000000
0 |null |200000000
0 |null |300000000
;

byUnmentionedLongAndLong
from employees | eval trunk_worked_seconds = avg_worked_seconds / 100000000 * 100000000 | stats c = count(gender) by languages.long, trunk_worked_seconds | sort c desc, trunk_worked_seconds;
FROM employees
| EVAL trunk_worked_seconds = avg_worked_seconds / 100000000 * 100000000
| STATS c = count(gender) by languages.long, trunk_worked_seconds
| SORT c desc, trunk_worked_seconds;

c:long | languages.long:long | trunk_worked_seconds:long
13 |5 |300000000
10 |2 |300000000
9 |3 |200000000
9 |4 |300000000
8 |4 |200000000
8 |3 |300000000
7 |1 |200000000
6 |2 |200000000
6 |1 |300000000
4 |5 |200000000
13 |5 |300000000
10 |2 |300000000
9 |3 |200000000
9 |4 |300000000
8 |4 |200000000
8 |3 |300000000
7 |1 |200000000
6 |2 |200000000
6 |null |300000000
6 |1 |300000000
4 |null |200000000
4 |5 |200000000
;

byUnmentionedIntAndLong
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ public Operator get(DriverContext driverContext) {
() -> BlockHash.build(
List.of(new HashAggregationOperator.GroupSpec(groupByChannel, groupElementType)),
bigArrays,
pageSize
pageSize,
false
),
columnName,
driverContext
Expand Down