Skip to content

Commit

Permalink
Improve de-correlation of grouped distinct aggregations
Browse files Browse the repository at this point in the history
A grouped distinct aggregation is planned as two AggregationNodes:
one being the "distinct operator", and one doing the actual aggregation.

This change improves query performance in the case when two such
AggregationNodes are present in a correlated subquery.

Before this change, both AggregationNodes were extracted from the subquery,
and moved on top of the de-correlated JoinNode. After this change,
the "distinct operator" is not moved from the subquery if it can be
de-correlated by the PlanNodeDecorrelator. Leaving the "distinct operator"
in the subquery unblocks other optimizations, e.g. `PushAggregationThroughOuterJoin`.

This change affects the following rules:
`TransformCorrelatedGroupedAggregationWithProjection`,
`TransformCorrelatedGroupedAggregationWithoutProjection`.
A similar change was introduced for global distint aggregations, in rules:
`TransformCorrelatedGlobalAggregationWithProjection`.
`TransformCorrelatedGlobalAggregationWithoutProjection`.
  • Loading branch information
kasiafi committed May 31, 2022
1 parent d9bb461 commit 68e32d1
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@
/**
* This rule decorrelates a correlated subquery of INNER correlated join with:
* - single grouped aggregation, or
* - grouped aggregation over distinct operator (grouped aggregation with no aggregation assignments)
* - grouped aggregation over distinct operator (grouped aggregation with no aggregation assignments),
* in case when the distinct operator cannot be de-correlated by PlanNodeDecorrelator
* <p>
* In the case of single aggregation, it transforms:
* <pre>
Expand Down Expand Up @@ -141,19 +142,24 @@ public Pattern<CorrelatedJoinNode> getPattern()
@Override
public Result apply(CorrelatedJoinNode correlatedJoinNode, Captures captures, Context context)
{
// if there is another aggregation below the AggregationNode, handle both
PlanNode source = captures.get(SOURCE);

// if we fail to decorrelate the nested plan, and it contains a distinct operator, we can extract and special-handle the distinct operator
AggregationNode distinct = null;
if (isDistinctOperator(source)) {
distinct = (AggregationNode) source;
source = distinct.getSource();
}

// decorrelate nested plan
PlanNodeDecorrelator decorrelator = new PlanNodeDecorrelator(plannerContext, context.getSymbolAllocator(), context.getLookup());
Optional<PlanNodeDecorrelator.DecorrelatedNode> decorrelatedSource = decorrelator.decorrelateFilters(source, correlatedJoinNode.getCorrelation());
if (decorrelatedSource.isEmpty()) {
return Result.empty();
// we failed to decorrelate the nested plan, so check if we can extract a distinct operator from the nested plan
if (isDistinctOperator(source)) {
distinct = (AggregationNode) source;
source = distinct.getSource();
decorrelatedSource = decorrelator.decorrelateFilters(source, correlatedJoinNode.getCorrelation());
}
if (decorrelatedSource.isEmpty()) {
return Result.empty();
}
}

source = decorrelatedSource.get().getNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
/**
* This rule decorrelates a correlated subquery of INNER correlated join with:
* - single grouped aggregation, or
* - grouped aggregation over distinct operator (grouped aggregation with no aggregation assignments)
* - grouped aggregation over distinct operator (grouped aggregation with no aggregation assignments),
* in case when the distinct operator cannot be de-correlated by PlanNodeDecorrelator
* It is similar to TransformCorrelatedGroupedAggregationWithProjection rule, but does not support projection over aggregation in the subquery
* <p>
* In the case of single aggregation, it transforms:
Expand Down Expand Up @@ -132,19 +133,24 @@ public Pattern<CorrelatedJoinNode> getPattern()
@Override
public Result apply(CorrelatedJoinNode correlatedJoinNode, Captures captures, Context context)
{
// if there is another aggregation below the AggregationNode, handle both
PlanNode source = captures.get(SOURCE);

// if we fail to decorrelate the nested plan, and it contains a distinct operator, we can extract and special-handle the distinct operator
AggregationNode distinct = null;
if (isDistinctOperator(source)) {
distinct = (AggregationNode) source;
source = distinct.getSource();
}

// decorrelate nested plan
PlanNodeDecorrelator decorrelator = new PlanNodeDecorrelator(plannerContext, context.getSymbolAllocator(), context.getLookup());
Optional<PlanNodeDecorrelator.DecorrelatedNode> decorrelatedSource = decorrelator.decorrelateFilters(source, correlatedJoinNode.getCorrelation());
if (decorrelatedSource.isEmpty()) {
return Result.empty();
// we failed to decorrelate the nested plan, so check if we can extract a distinct operator from the nested plan
if (isDistinctOperator(source)) {
distinct = (AggregationNode) source;
source = distinct.getSource();
decorrelatedSource = decorrelator.decorrelateFilters(source, correlatedJoinNode.getCorrelation());
}
if (decorrelatedSource.isEmpty()) {
return Result.empty();
}
}

source = decorrelatedSource.get().getNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,38 @@ public void testCorrelatedDistinctAggregationRewriteToLeftOuterJoin()
anyTree(node(ValuesNode.class))))));
}

@Test
public void testCorrelatedDistinctGropuedAggregationRewriteToLeftOuterJoin()
{
assertPlan(
"SELECT (SELECT count(DISTINCT o.orderkey) FROM orders o WHERE c.custkey = o.custkey GROUP BY o.orderstatus), c.custkey FROM customer c",
output(
project(filter(
"(CASE \"is_distinct\" WHEN true THEN true ELSE CAST(fail(28, 'Scalar sub-query has returned multiple rows') AS boolean) END)",
project(markDistinct(
"is_distinct",
ImmutableList.of("unique"),
join(
LEFT,
ImmutableList.of(equiJoinClause("c_custkey", "o_custkey")),
project(assignUniqueId(
"unique",
tableScan("customer", ImmutableMap.of("c_custkey", "custkey")))),
project(aggregation(
singleGroupingSet("o_orderstatus", "o_custkey"),
ImmutableMap.of(Optional.of("count"), functionCall("count", ImmutableList.of("o_orderkey"))),
Optional.empty(),
SINGLE,
project(aggregation(
singleGroupingSet("o_orderstatus", "o_orderkey", "o_custkey"),
ImmutableMap.of(),
Optional.empty(),
FINAL,
anyTree(tableScan(
"orders",
ImmutableMap.of("o_orderkey", "orderkey", "o_orderstatus", "orderstatus", "o_custkey", "custkey"))))))))))))));
}

@Test
public void testRemovesTrivialFilters()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,50 @@ public void rewritesOnSubqueryWithDistinct()
"true",
values("a", "b")))))));
}

@Test
public void rewritesOnSubqueryWithDecorrelatableDistinct()
{
// distinct aggregation can be decorrelated in the subquery by PlanNodeDecorrelator
// because the correlated predicate is equality comparison
tester().assertThat(new TransformCorrelatedGroupedAggregationWithProjection(tester().getPlannerContext()))
.on(p -> p.correlatedJoin(
ImmutableList.of(p.symbol("corr")),
p.values(p.symbol("corr")),
INNER,
PlanBuilder.expression("true"),
p.project(
Assignments.of(p.symbol("expr_sum"), PlanBuilder.expression("sum + 1"), p.symbol("expr_count"), PlanBuilder.expression("count - 1")),
p.aggregation(outerBuilder -> outerBuilder
.singleGroupingSet(p.symbol("a"))
.addAggregation(p.symbol("sum"), PlanBuilder.expression("sum(a)"), ImmutableList.of(BIGINT))
.addAggregation(p.symbol("count"), PlanBuilder.expression("count()"), ImmutableList.of())
.source(p.aggregation(innerBuilder -> innerBuilder
.singleGroupingSet(p.symbol("a"))
.source(p.filter(
PlanBuilder.expression("b = corr"),
p.values(p.symbol("a"), p.symbol("b"))))))))))
.matches(
project(ImmutableMap.of("corr", expression("corr"), "expr_sum", expression("sum_agg + 1"), "expr_count", expression("count_agg - 1")),
aggregation(
singleGroupingSet("corr", "unique", "a"),
ImmutableMap.of(Optional.of("sum_agg"), functionCall("sum", ImmutableList.of("a")), Optional.of("count_agg"), functionCall("count", ImmutableList.of())),
Optional.empty(),
SINGLE,
join(
JoinNode.Type.INNER,
ImmutableList.of(),
Optional.of("b = corr"),
assignUniqueId(
"unique",
values("corr")),
aggregation(
singleGroupingSet("a", "b"),
ImmutableMap.of(),
Optional.empty(),
SINGLE,
filter(
"true",
values("a", "b")))))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,48 @@ public void rewritesOnSubqueryWithDistinct()
"true",
values("a", "b")))))));
}

@Test
public void rewritesOnSubqueryWithDecorrelatableDistinct()
{
// distinct aggregation can be decorrelated in the subquery by PlanNodeDecorrelator
// because the correlated predicate is equality comparison
tester().assertThat(new TransformCorrelatedGroupedAggregationWithoutProjection(tester().getPlannerContext()))
.on(p -> p.correlatedJoin(
ImmutableList.of(p.symbol("corr")),
p.values(p.symbol("corr")),
INNER,
PlanBuilder.expression("true"),
p.aggregation(outerBuilder -> outerBuilder
.singleGroupingSet(p.symbol("a"))
.addAggregation(p.symbol("sum"), PlanBuilder.expression("sum(a)"), ImmutableList.of(BIGINT))
.addAggregation(p.symbol("count"), PlanBuilder.expression("count()"), ImmutableList.of())
.source(p.aggregation(innerBuilder -> innerBuilder
.singleGroupingSet(p.symbol("a"))
.source(p.filter(
PlanBuilder.expression("b = corr"),
p.values(p.symbol("a"), p.symbol("b")))))))))
.matches(
project(ImmutableMap.of("corr", expression("corr"), "sum_agg", expression("sum_agg"), "count_agg", expression("count_agg")),
aggregation(
singleGroupingSet("corr", "unique", "a"),
ImmutableMap.of(Optional.of("sum_agg"), functionCall("sum", ImmutableList.of("a")), Optional.of("count_agg"), functionCall("count", ImmutableList.of())),
Optional.empty(),
SINGLE,
join(
JoinNode.Type.INNER,
ImmutableList.of(),
Optional.of("b = corr"),
assignUniqueId(
"unique",
values("corr")),
aggregation(
singleGroupingSet("a", "b"),
ImmutableMap.of(),
Optional.empty(),
SINGLE,
filter(
"true",
values("a", "b")))))));
}
}

0 comments on commit 68e32d1

Please sign in to comment.