Skip to content

Commit

Permalink
xform: use ordering from LIMIT as a hint for streaming group by
Browse files Browse the repository at this point in the history
Fixes #93410

A query with a grouped aggregation, a LIMIT and an ORDER BY
may not always explore the best-cost query plan.

Due to the existence of unique constraints on a table, the set of
grouping columns may be reduced during normalization via rule
ReduceGroupingCols such that it no longer includes columns
present in the ORDER BY clause. This eliminates possibly cheap
plans from consideration. For example, if the input to the
aggregation is a lookup join, it may be cheaper to sort the
input to the lookup join on the ORDER BY columns if they overlap
with the grouping columns, so that a streaming group-by with no
TopK operator can be used and a full scan of the inputs to
the join is avoided.

This fix adds a new exploration rule which ensures that a grouped
aggregation with a LIMIT and ORDER BY clause considers using
streaming group-by with no TopK when possible.

Release note (bug fix): This patch fixes join queries involving
tables with unique constraints using LIMIT, GROUP BY and ORDER BY
clauses to ensure the optimizer considers streaming group-by with
no TopK operation, when possible. This is often the most efficient
query plan.
  • Loading branch information
Mark Sirek committed Dec 20, 2022
1 parent 9890516 commit 04eb457
Show file tree
Hide file tree
Showing 10 changed files with 1,054 additions and 414 deletions.
19 changes: 9 additions & 10 deletions pkg/sql/opt/exec/execbuilder/testdata/tpch_vec
Original file line number Diff line number Diff line change
Expand Up @@ -20906,17 +20906,16 @@ EXPLAIN (VEC) SELECT c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, s
----
└ Node 1
└ *colexec.topKSorter
└ *colexec.limitOp
└ *colexec.hashAggregator
└ *colexecjoin.hashJoiner
├ *colexecjoin.mergeJoinInnerOp
│ ├ *colfetcher.ColBatchScan
│ └ *colexecjoin.mergeJoinLeftSemiOp
│ ├ *colfetcher.ColBatchScan
│ └ *colexecsel.selGTFloat64Float64ConstOp
│ └ *colexec.orderedAggregator
│ └ *colfetcher.ColBatchScan
└ *colfetcher.ColBatchScan
└ *rowexec.joinReader
└ *rowexec.joinReader
└ *colexec.sortOp
└ *colexecjoin.mergeJoinLeftSemiOp
├ *colfetcher.ColBatchScan
└ *colexecsel.selGTFloat64Float64ConstOp
└ *colexec.orderedAggregator
└ *colfetcher.ColBatchScan

# Query 19
query T
Expand Down
272 changes: 129 additions & 143 deletions pkg/sql/opt/memo/testdata/stats_quality/tpch/q18

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions pkg/sql/opt/norm/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,28 @@ func (f *Factory) DisableOptimizations() {
f.NotifyOnMatchedRule(func(opt.RuleName) bool { return false })
}

// DisableOptimizationRules disables a specific set of transformation rules.
// It registers, via NotifyOnMatchedRule, a callback that returns false for
// every rule whose RuleName (converted to int) is a member of disabledRules,
// and true for every other rule.
func (f *Factory) DisableOptimizationRules(disabledRules intsets.Fast) {
	f.NotifyOnMatchedRule(func(rule opt.RuleName) bool {
		// A rule is allowed exactly when it is not in the disabled set.
		return !disabledRules.Contains(int(rule))
	})
}

// DisableOptimizationRulesTemporarily disables a specific set of
// transformation rules during the execution of the given function fn. A
// MatchedRuleFunc previously set by NotifyOnMatchedRule is not invoked during
// execution of fn, but will be invoked for future rule matches after fn
// returns. The previous MatchedRuleFunc is also restored if fn panics, so a
// recovered panic does not leave the rules disabled on this Factory.
func (f *Factory) DisableOptimizationRulesTemporarily(disabledRules intsets.Fast, fn func()) {
	originalMatchedRule := f.matchedRule
	// Restore in a defer so that the original callback is reinstated on every
	// exit path out of fn, including a panic.
	defer func() { f.matchedRule = originalMatchedRule }()
	f.DisableOptimizationRules(disabledRules)
	fn()
}

// DisableOptimizationsTemporarily disables all transformation rules during the
// execution of the given function fn. A MatchedRuleFunc previously set by
// NotifyOnMatchedRule is not invoked during execution of fn, but will be
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/testutils/opttester/testfixtures/rewrite_stats.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ DATABASE=$2
FILENAME=$3

tables=($($COCKROACH_BINARY sql --insecure --format=tsv \
-e "USE $DATABASE; SELECT table_name FROM [SHOW TABLES] ORDER BY table_name;"))
-e "USE $DATABASE; SELECT table_name FROM [SHOW TABLES] ORDER BY table_name;" | tail -n +2))
tables=("${tables[@]:1}")

echo "Writing statistics to $FILENAME"
Expand All @@ -38,7 +38,7 @@ do
# 4. Remove the double quote at the end of the JSON.
# 5. Append '; to the last line.
$COCKROACH_BINARY sql --insecure --format=tsv \
-e "USE $DATABASE; SELECT jsonb_pretty(statistics) FROM [SHOW STATISTICS USING JSON FOR TABLE \"$table\"];" \
-e "USE $DATABASE; SELECT jsonb_pretty(statistics) FROM [SHOW STATISTICS USING JSON FOR TABLE \"$table\"];" | tail -n +2 \
| sed '1d' | sed 's/""/"/g' | sed 's/^"\[/\[/g' | sed 's/\]"$/\]/g' | sed '$ s/$/'\'';/' >> $FILENAME
echo "----" >> $FILENAME
echo "" >> $FILENAME
Expand Down
124 changes: 124 additions & 0 deletions pkg/sql/opt/xform/groupby_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/opt/ordering"
"github.com/cockroachdb/cockroach/pkg/sql/opt/props"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -172,6 +173,129 @@ func (c *CustomFuncs) MakeProjectFromPassthroughAggs(
}, grp)
}

// GroupingColsClosureOverlappingOrdering checks whether the columns of the
// given `ordering` overlap with the closure of the grouping columns in
// `private`, where the closure is computed from the functional dependencies of
// `input` (the input relation of the grouped aggregation). On success it
// returns the grouping columns expanded with the overlapping ordering columns
// as `groupingCols`, the ordering derived for the aggregation as
// `newOrdering`, and `ok`=true. Otherwise it returns zero values and
// `ok`=false.
func (c *CustomFuncs) GroupingColsClosureOverlappingOrdering(
	input memo.RelExpr, private *memo.GroupingPrivate, ordering props.OrderingChoice,
) (groupingCols opt.ColSet, newOrdering props.OrderingChoice, ok bool) {
	// Without a required ordering on the limit there is no hint to exploit.
	if ordering.Any() {
		return opt.ColSet{}, props.OrderingChoice{}, false
	}

	// Every column functionally determined by the grouping columns has a single
	// value per group, so adding such a column to the grouping columns does not
	// change the groups. That makes it safe to pull ordering columns from the
	// closure into the grouping columns, which lets a streaming group-by
	// provide the required ordering and may make a TopK operator unnecessary.
	closure := input.Relational().FuncDeps.ComputeClosure(private.GroupingCols)
	overlap := closure.Intersection(ordering.ColSet())
	if overlap.Empty() {
		// No ordering column lies in the closure, so there is nothing to add.
		return opt.ColSet{}, props.OrderingChoice{}, false
	}

	expanded := private.GroupingCols.Union(overlap)
	inExpanded := func(col opt.ColumnID) bool { return expanded.Contains(col) }
	prefix, isFullPrefix, found := getPrefixFromOrdering(
		ordering.ToOrdering(), private.Ordering, input, inExpanded,
	)
	// Only succeed when a prefix was found and it is reported as a full prefix.
	if found && isFullPrefix {
		return expanded, prefix, true
	}
	return opt.ColSet{}, props.OrderingChoice{}, false
}

// GenerateStreamingGroupByLimitOrderingHint adds to the memo group of `grp` a
// LimitExpr whose input is a newly constructed aggregation of the same
// operator as `aggregation` (the matching rule restricts this to GroupBy or
// DistinctOn). The new aggregation uses the expanded `groupingCols` and the
// `newOrdering` produced by `GroupingColsClosureOverlappingOrdering`, i.e. the
// original grouping columns plus the ordering columns of `limitExpr` found in
// their closure. Argument `private` is expected to be a canonical group-by.
// The function gives up without adding anything when a removable aggregate
// refers to a different column than it outputs, or when the new aggregation
// would not have the same output columns as the original one.
func (c *CustomFuncs) GenerateStreamingGroupByLimitOrderingHint(
	grp memo.RelExpr,
	limitExpr *memo.LimitExpr,
	aggregation memo.RelExpr,
	input memo.RelExpr,
	aggs memo.AggregationsExpr,
	private *memo.GroupingPrivate,
	groupingCols opt.ColSet,
	newOrdering props.OrderingChoice,
) {
	// A column must be either a grouping column or an aggregate output, never
	// both. Drop each ConstAgg over a plain variable whose output column is now
	// a grouping column; keep every other aggregate untouched.
	keptAggs := make(memo.AggregationsExpr, 0, len(aggs))
	for i := range aggs {
		item := &aggs[i]
		if groupingCols.Contains(item.Col) {
			if constAgg, isConstAgg := item.Agg.(*memo.ConstAggExpr); isConstAgg {
				if variable, isVariable := constAgg.Input.(*memo.VariableExpr); isVariable {
					if variable.Col != item.Col {
						// The aggregate can only be dropped safely when it
						// passes through the very column it outputs.
						return
					}
					continue
				}
			}
		}
		keptAggs = append(keptAggs, *item)
	}

	// The new aggregation outputs the grouping columns plus the columns of the
	// remaining aggregates. Bail out unless that equals the original output.
	outputCols := groupingCols.Copy()
	for _, item := range keptAggs {
		outputCols.Add(item.Col)
	}
	if !aggregation.Relational().OutputCols.Equals(outputCols) {
		return
	}

	newPrivate := *private
	newPrivate.GroupingCols = groupingCols
	newPrivate.Ordering = newOrdering

	// ReduceGroupingCols already shrank the grouping columns when the original
	// aggregation was built. It has to stay off while the replacement is
	// constructed, otherwise it would strip the ordering columns that were just
	// added back to obtain a better plan.
	var disabledRules intsets.Fast
	disabledRules.Add(int(opt.ReduceGroupingCols))

	var rebuilt memo.RelExpr
	c.e.f.DisableOptimizationRulesTemporarily(disabledRules, func() {
		rebuilt = c.e.f.DynamicConstruct(
			aggregation.Op(),
			input,
			&keptAggs,
			&newPrivate,
		).(memo.RelExpr)
	})

	grp.Memo().AddLimitToGroup(
		&memo.LimitExpr{
			Input:    rebuilt,
			Limit:    limitExpr.Limit,
			Ordering: limitExpr.Ordering,
		},
		grp,
	)
}

// GenerateStreamingGroupBy generates variants of a GroupBy, DistinctOn,
// EnsureDistinctOn, UpsertDistinctOn, or EnsureUpsertDistinctOn expression
// with more specific orderings on the grouping columns, using the interesting
Expand Down
36 changes: 36 additions & 0 deletions pkg/sql/opt/xform/rules/limit.opt
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,39 @@
(TopK $input:* $private:*)
=>
(GeneratePartialOrderTopK $input $private)

# GenerateStreamingGroupByLimitOrderingHint generates streaming group-by and
# distinct-on aggregations with an ordering matching the ordering specified in
# the Limit Op. The goal is to eliminate the need for a TopK operation.
#
# The rule matches a Limit whose input is a GroupBy or DistinctOn accepted by
# IsCanonicalGroupBy, whose limit is a constant accepted by IsPositiveInt, and
# whose ordering makes GroupingColsClosureOverlappingOrdering return $ok=true.
# That function also yields $groupingCols and $newOrdering, which are passed
# through to the replace function together with the matched Limit (Root) and
# the pieces of the matched aggregation.
[GenerateStreamingGroupByLimitOrderingHint, Explore]
(Limit
$aggregation:(GroupBy | DistinctOn
$input:*
$aggs:*
$private:* & (IsCanonicalGroupBy $private)
)
(Const $limit:* & (IsPositiveInt $limit))
$ordering:* &
(Let
(
$groupingCols
$newOrdering
$ok
):(GroupingColsClosureOverlappingOrdering
$input
$private
$ordering
)
$ok
)
)
=>
(GenerateStreamingGroupByLimitOrderingHint
(Root)
$aggregation
$input
$aggs
$private
$groupingCols
$newOrdering
)
Loading

0 comments on commit 04eb457

Please sign in to comment.