Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: xform: use ordering from LIMIT as a hint for streaming group-by #94603

Merged
merged 2 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3348,6 +3348,10 @@ func (m *sessionDataMutator) SetOptimizerUseImprovedDisjunctionStats(val bool) {
m.data.OptimizerUseImprovedDisjunctionStats = val
}

// SetOptimizerUseLimitOrderingForStreamingGroupBy stores val in the
// OptimizerUseLimitOrderingForStreamingGroupBy field of the mutator's session
// data.
func (m *sessionDataMutator) SetOptimizerUseLimitOrderingForStreamingGroupBy(val bool) {
m.data.OptimizerUseLimitOrderingForStreamingGroupBy = val
}

// Utility functions related to scrubbing sensitive information on SQL Stats.

// quantizeCounts ensures that the Count field in the
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -4760,6 +4760,7 @@ optimizer on
optimizer_use_forecasts on
optimizer_use_histograms on
optimizer_use_improved_disjunction_stats off
optimizer_use_limit_ordering_for_streaming_group_by off
optimizer_use_multicol_stats on
optimizer_use_not_visible_indexes off
override_multi_region_zone_config off
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2806,6 +2806,7 @@ opt_split_scan_limit 2048 NULL
optimizer_use_forecasts on NULL NULL NULL string
optimizer_use_histograms on NULL NULL NULL string
optimizer_use_improved_disjunction_stats off NULL NULL NULL string
optimizer_use_limit_ordering_for_streaming_group_by off NULL NULL NULL string
optimizer_use_multicol_stats on NULL NULL NULL string
optimizer_use_not_visible_indexes off NULL NULL NULL string
override_multi_region_zone_config off NULL NULL NULL string
Expand Down Expand Up @@ -2943,6 +2944,7 @@ opt_split_scan_limit 2048 NULL
optimizer_use_forecasts on NULL user NULL on on
optimizer_use_histograms on NULL user NULL on on
optimizer_use_improved_disjunction_stats off NULL user NULL off off
optimizer_use_limit_ordering_for_streaming_group_by off NULL user NULL off off
optimizer_use_multicol_stats on NULL user NULL on on
optimizer_use_not_visible_indexes off NULL user NULL off off
override_multi_region_zone_config off NULL user NULL off off
Expand Down Expand Up @@ -3078,6 +3080,7 @@ optimizer NULL NULL NULL
optimizer_use_forecasts NULL NULL NULL NULL NULL
optimizer_use_histograms NULL NULL NULL NULL NULL
optimizer_use_improved_disjunction_stats NULL NULL NULL NULL NULL
optimizer_use_limit_ordering_for_streaming_group_by NULL NULL NULL NULL NULL
optimizer_use_multicol_stats NULL NULL NULL NULL NULL
optimizer_use_not_visible_indexes NULL NULL NULL NULL NULL
override_multi_region_zone_config NULL NULL NULL NULL NULL
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ opt_split_scan_limit 2048
optimizer_use_forecasts on
optimizer_use_histograms on
optimizer_use_improved_disjunction_stats off
optimizer_use_limit_ordering_for_streaming_group_by off
optimizer_use_multicol_stats on
optimizer_use_not_visible_indexes off
override_multi_region_zone_config off
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/opt/memo/memo.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type Memo struct {
enforceHomeRegion bool
variableInequalityLookupJoinEnabled bool
useImprovedDisjunctionStats bool
useLimitOrderingForStreamingGroupBy bool

// curRank is the highest currently in-use scalar expression rank.
curRank opt.ScalarRank
Expand Down Expand Up @@ -213,6 +214,7 @@ func (m *Memo) Init(evalCtx *eval.Context) {
enforceHomeRegion: evalCtx.SessionData().EnforceHomeRegion,
variableInequalityLookupJoinEnabled: evalCtx.SessionData().VariableInequalityLookupJoinEnabled,
useImprovedDisjunctionStats: evalCtx.SessionData().OptimizerUseImprovedDisjunctionStats,
useLimitOrderingForStreamingGroupBy: evalCtx.SessionData().OptimizerUseLimitOrderingForStreamingGroupBy,
}
m.metadata.Init()
m.logPropsBuilder.init(evalCtx, m)
Expand Down Expand Up @@ -350,7 +352,8 @@ func (m *Memo) IsStale(
m.testingOptimizerDisableRuleProbability != evalCtx.SessionData().TestingOptimizerDisableRuleProbability ||
m.enforceHomeRegion != evalCtx.SessionData().EnforceHomeRegion ||
m.variableInequalityLookupJoinEnabled != evalCtx.SessionData().VariableInequalityLookupJoinEnabled ||
m.useImprovedDisjunctionStats != evalCtx.SessionData().OptimizerUseImprovedDisjunctionStats {
m.useImprovedDisjunctionStats != evalCtx.SessionData().OptimizerUseImprovedDisjunctionStats ||
m.useLimitOrderingForStreamingGroupBy != evalCtx.SessionData().OptimizerUseLimitOrderingForStreamingGroupBy {
return true, nil
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt/memo/memo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,12 @@ func TestMemoIsStale(t *testing.T) {
evalCtx.SessionData().VariableInequalityLookupJoinEnabled = false
notStale()

// Stale use limit ordering for streaming group by.
evalCtx.SessionData().OptimizerUseLimitOrderingForStreamingGroupBy = true
stale()
evalCtx.SessionData().OptimizerUseLimitOrderingForStreamingGroupBy = false
notStale()

// Stale testing_optimizer_random_seed.
evalCtx.SessionData().TestingOptimizerRandomSeed = 100
stale()
Expand Down
18 changes: 18 additions & 0 deletions pkg/sql/opt/norm/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,24 @@ func (f *Factory) DisableOptimizations() {
f.NotifyOnMatchedRule(func(opt.RuleName) bool { return false })
}

// DisableOptimizationRules installs a matched-rule callback (via
// NotifyOnMatchedRule) that rejects every rule contained in disabledRules and
// accepts all other rules.
func (f *Factory) DisableOptimizationRules(disabledRules util.FastIntSet) {
	allowRule := func(name opt.RuleName) bool {
		isDisabled := disabledRules.Contains(int(name))
		return !isDisabled
	}
	f.NotifyOnMatchedRule(allowRule)
}

// DisableOptimizationRulesTemporarily disables a specific set of
// transformation rules during the execution of the given function fn. A
// MatchedRuleFunc previously set by NotifyOnMatchedRule is not invoked during
// execution of fn, but will be invoked for future rule matches after fn
// returns. The previous MatchedRuleFunc is restored even if fn panics, so a
// failure inside fn cannot leave the rules disabled on this factory.
func (f *Factory) DisableOptimizationRulesTemporarily(disabledRules util.FastIntSet, fn func()) {
	originalMatchedRule := f.matchedRule
	// Restore through defer rather than after fn() so that the original
	// callback is reinstated on every exit path, including a panic in fn.
	defer func() { f.matchedRule = originalMatchedRule }()
	f.DisableOptimizationRules(disabledRules)
	fn()
}

// DisableOptimizationsTemporarily disables all transformation rules during the
// execution of the given function fn. A MatchedRuleFunc previously set by
// NotifyOnMatchedRule is not invoked during execution of fn, but will be
Expand Down
128 changes: 128 additions & 0 deletions pkg/sql/opt/xform/groupby_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/opt/ordering"
"github.com/cockroachdb/cockroach/pkg/sql/opt/props"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -172,6 +173,133 @@ func (c *CustomFuncs) MakeProjectFromPassthroughAggs(
}, grp)
}

// GroupingColsClosureOverlappingOrdering checks whether the columns of the
// given `ordering` overlap the closure of the grouping columns in `private`,
// where the closure is computed from the functional dependencies of the
// `input` relation of the grouped aggregation. On success it returns the
// grouping columns expanded with the overlapping ordering columns
// (`groupingCols`), the ordering to use for the aggregation (`newOrdering`),
// and `ok`=true. In every other case it returns zero values and `ok`=false.
func (c *CustomFuncs) GroupingColsClosureOverlappingOrdering(
	input memo.RelExpr, private *memo.GroupingPrivate, ordering props.OrderingChoice,
) (groupingCols opt.ColSet, newOrdering props.OrderingChoice, ok bool) {
	// A limit without an ordering offers no ordering hint to exploit.
	if ordering.Any() {
		return opt.ColSet{}, props.OrderingChoice{}, false
	}

	// Collect every column that is functionally determined by the grouping
	// columns, then keep only those that also appear in the limit's ordering.
	closure := input.Relational().FuncDeps.ComputeClosure(private.GroupingCols)
	overlap := closure.Intersection(ordering.ColSet())
	if overlap.Empty() {
		// Nothing would be added to the grouping columns, so the rewrite would
		// be a no-op.
		return opt.ColSet{}, props.OrderingChoice{}, false
	}

	// Adding the overlapping columns to the grouping columns is safe: the
	// original grouping columns determine every column of the closure, so each
	// group has exactly one combination of values for them. With these columns
	// in the grouping set, the streaming group-by may be able to supply the
	// required ordering itself, possibly making a TopK operator unnecessary.
	expanded := private.GroupingCols.Union(overlap)
	inExpandedGrouping := func(id opt.ColumnID) bool { return expanded.Contains(id) }

	prefix, fullPrefix, found := getPrefixFromOrdering(
		ordering.ToOrdering(), private.Ordering, input, inExpandedGrouping,
	)
	if !(found && fullPrefix) {
		return opt.ColSet{}, props.OrderingChoice{}, false
	}
	return expanded, prefix, true
}

// GenerateStreamingGroupByLimitOrderingHint adds to the memo group of `grp` a
// LimitExpr whose input is a GroupBy or DistinctOn expression (the same
// operator as `aggregation`) built with the expanded `groupingCols` and with
// `newOrdering` as its ordering. Both values are expected to come from
// `GroupingColsClosureOverlappingOrdering`, which expands the original
// grouping columns with the columns of the limit ordering that fall within
// their closure. Argument `private` is expected to be a canonical group-by.
//
// Nothing is generated when the session setting
// OptimizerUseLimitOrderingForStreamingGroupBy is off, when a const aggregate
// on a grouping column references a different column than it produces, or when
// the output columns of the new aggregation would differ from the original.
func (c *CustomFuncs) GenerateStreamingGroupByLimitOrderingHint(
	grp memo.RelExpr,
	limitExpr *memo.LimitExpr,
	aggregation memo.RelExpr,
	input memo.RelExpr,
	aggs memo.AggregationsExpr,
	private *memo.GroupingPrivate,
	groupingCols opt.ColSet,
	newOrdering props.OrderingChoice,
) {
	if enabled := c.e.evalCtx.SessionData().OptimizerUseLimitOrderingForStreamingGroupBy; !enabled {
		// The session setting explicitly turns this transformation off.
		return
	}

	// Work on a copy of the grouping private so the original is left untouched.
	expandedPrivate := *private
	expandedPrivate.GroupingCols = groupingCols
	expandedPrivate.Ordering = newOrdering

	// A column must be produced either as a grouping column or as an
	// aggregate, never both. Drop const aggregates over a plain variable whose
	// column is now part of the grouping columns; keep everything else.
	keptAggs := make(memo.AggregationsExpr, 0, len(aggs))
	for i := range aggs {
		item := aggs[i]
		if groupingCols.Contains(item.Col) {
			if constAgg, isConstAgg := item.Agg.(*memo.ConstAggExpr); isConstAgg {
				if variable, isVariable := constAgg.Input.(*memo.VariableExpr); isVariable {
					if variable.Col != item.Col {
						// Removing the aggregate is only safe when it passes
						// through the very column it outputs.
						return
					}
					// Redundant with the grouping column: omit it.
					continue
				}
			}
		}
		keptAggs = append(keptAggs, item)
	}

	// The new aggregation outputs its grouping columns plus its aggregate
	// columns. Abandon the optimization unless that set is exactly what the
	// original aggregation produced.
	expectedCols := groupingCols.Copy()
	for _, kept := range keptAggs {
		expectedCols.Add(kept.Col)
	}
	if !aggregation.Relational().OutputCols.Equals(expectedCols) {
		return
	}

	// ReduceGroupingCols has to be switched off while constructing the new
	// aggregation; otherwise it would strip the ordering columns right back
	// out of the grouping columns. That rule already ran when the aggregation
	// was first built, and the columns are deliberately being re-added here to
	// obtain a better plan.
	var ruleBlocklist util.FastIntSet
	ruleBlocklist.Add(int(opt.ReduceGroupingCols))

	var expandedAggregation memo.RelExpr
	c.e.f.DisableOptimizationRulesTemporarily(ruleBlocklist, func() {
		expandedAggregation = c.e.f.DynamicConstruct(
			aggregation.Op(),
			input,
			&keptAggs,
			&expandedPrivate,
		).(memo.RelExpr)
	})

	// Wrap the new aggregation in a Limit identical to the matched one and add
	// it to the same memo group as an alternative.
	grp.Memo().AddLimitToGroup(
		&memo.LimitExpr{
			Input:    expandedAggregation,
			Limit:    limitExpr.Limit,
			Ordering: limitExpr.Ordering,
		},
		grp,
	)
}

// GenerateStreamingGroupBy generates variants of a GroupBy, DistinctOn,
// EnsureDistinctOn, UpsertDistinctOn, or EnsureUpsertDistinctOn expression
// with more specific orderings on the grouping columns, using the interesting
Expand Down
36 changes: 36 additions & 0 deletions pkg/sql/opt/xform/rules/limit.opt
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,39 @@
(TopK $input:* $private:*)
=>
(GeneratePartialOrderTopK $input $private)

# GenerateStreamingGroupByLimitOrderingHint generates streaming group-by and
# distinct-on aggregations with an ordering matching the ordering specified in
# the Limit Op. The goal is to eliminate the need for a TopK operation.
#
# The rule matches a Limit over a GroupBy or DistinctOn when all of the
# following hold:
#   - the grouping private passes IsCanonicalGroupBy,
#   - the limit is a Const that passes IsPositiveInt, and
#   - GroupingColsClosureOverlappingOrdering, given the aggregation input, the
#     grouping private and the limit ordering, returns $ok.
# The $groupingCols and $newOrdering values returned by that function are
# forwarded, together with the matched Limit expression (Root), to the
# generator function.
[GenerateStreamingGroupByLimitOrderingHint, Explore]
(Limit
$aggregation:(GroupBy | DistinctOn
$input:*
$aggs:*
$private:* & (IsCanonicalGroupBy $private)
)
(Const $limit:* & (IsPositiveInt $limit))
$ordering:* &
(Let
(
$groupingCols
$newOrdering
$ok
):(GroupingColsClosureOverlappingOrdering
$input
$private
$ordering
)
$ok
)
)
=>
(GenerateStreamingGroupByLimitOrderingHint
(Root)
$aggregation
$input
$aggs
$private
$groupingCols
$newOrdering
)
Loading