Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: support trace decorrelate solver #31007

Merged
merged 8 commits into from
Dec 30, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions planner/core/logical_plan_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,33 @@ func (s *testPlanSuite) TestSingleRuleTraceStep(c *C) {
assertRuleName string
assertRuleSteps []assertTraceStep
}{
{
sql: "select * from t as t1 where t1.a < (select sum(t2.a) from t as t2 where t2.b = t1.b);",
flags: []uint64{flagDecorrelate, flagBuildKeyInfo, flagPrunColumns},
assertRuleName: "decorrelate",
assertRuleSteps: []assertTraceStep{
{
assertAction: "MaxOneRow_7 removed from plan tree",
assertReason: "",
},
{
assertAction: "Selection_4 removed from plan tree",
assertReason: "Selection_4's conditions have been pushed into Apply_8",
},
{
assertAction: "Apply_8 simplified into Join_8",
assertReason: "Join_8 hasn't any corelated column, thus the inner plan is non-correlated",
},
{
assertAction: "Aggregation_5 pulled up as Join_8's parent, and Join_8's join type becomes left outer join",
assertReason: "Aggregation_5's functions haven't any group by items and Join_8's join type isn't inner join or left outer join, and hasn't any conditions",
},
{
assertAction: "Projection_6 is moved as Aggregation_5's parent",
assertReason: "Join_8's join type is left outer join, not semi join",
},
},
},
{
sql: "select * from t as t1 join t as t2 on t1.a = t2.a where t1.a < 1;",
flags: []uint64{flagPredicatePushDown, flagBuildKeyInfo, flagPrunColumns},
Expand Down
145 changes: 144 additions & 1 deletion planner/core/rule_decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
package core

import (
"bytes"
"context"
"fmt"
"math"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/plancodec"
)

// canPullUpAgg checks if an apply can pull an aggregation up.
Expand Down Expand Up @@ -128,7 +131,9 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo
// If the inner plan is non-correlated, the apply will be simplified to join.
join := &apply.LogicalJoin
join.self = join
join.tp = plancodec.TypeJoin
p = join
appendApplySimplifiedTraceStep(apply, join, opt)
} else if sel, ok := innerPlan.(*LogicalSelection); ok {
// If the inner plan is a selection, we add this condition to join predicates.
// Notice that no matter what kind of join is, it's always right.
Expand All @@ -139,11 +144,13 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo
apply.AttachOnConds(newConds)
innerPlan = sel.children[0]
apply.SetChildren(outerPlan, innerPlan)
appendRemoveSelectionTraceStep(apply, sel, opt)
return s.optimize(ctx, p, opt)
} else if m, ok := innerPlan.(*LogicalMaxOneRow); ok {
if m.children[0].MaxOneRow() {
innerPlan = m.children[0]
apply.SetChildren(outerPlan, innerPlan)
appendRemoveMaxOneRowTraceStep(m, opt)
return s.optimize(ctx, p, opt)
}
} else if proj, ok := innerPlan.(*LogicalProjection); ok {
Expand All @@ -162,8 +169,10 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo
return nil, err
}
proj.SetChildren(np)
appendMoveProjTraceStep(apply, np, proj, opt)
return proj, nil
}
appendRemoveProjTraceStep(apply, proj, opt)
return s.optimize(ctx, p, opt)
} else if agg, ok := innerPlan.(*LogicalAggregation); ok {
if apply.canPullUpAgg() && agg.canPullUp() {
Expand All @@ -188,7 +197,6 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo
}
apply.SetSchema(expression.MergeSchema(expression.NewSchema(outerColsInSchema...), innerPlan.Schema()))
resetNotNullFlag(apply.schema, outerPlan.Schema().Len(), apply.schema.Len())

for i, aggFunc := range agg.AggFuncs {
aggArgs := make([]expression.Expression, 0, len(aggFunc.Args))
for _, arg := range aggFunc.Args {
Expand Down Expand Up @@ -219,6 +227,7 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo
return nil, err
}
agg.SetChildren(np)
appendPullUpAggTraceStep(apply, np, agg, opt)
// TODO: Add a Projection if any argument of aggregate funcs or group by items are scalar functions.
// agg.buildProjectionIfNecessary()
return agg, nil
Expand All @@ -245,6 +254,9 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo
// There's no other correlated column.
groupByCols := expression.NewSchema(agg.GetGroupByCols()...)
if len(apply.CorCols) == 0 {
appendedGroupByCols := expression.NewSchema()
var appendedAggFuncs []*aggregation.AggFuncDesc

join := &apply.LogicalJoin
join.EqualConditions = append(join.EqualConditions, eqCondWithCorCol...)
for _, eqCond := range eqCondWithCorCol {
Expand All @@ -258,16 +270,19 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo
agg.AggFuncs = append(agg.AggFuncs, newFunc)
agg.schema.Append(clonedCol)
agg.schema.Columns[agg.schema.Len()-1].RetType = newFunc.RetTp
appendedAggFuncs = append(appendedAggFuncs, newFunc)
}
// If group by cols don't contain the join key, add it into this.
if !groupByCols.Contains(clonedCol) {
agg.GroupByItems = append(agg.GroupByItems, clonedCol)
groupByCols.Append(clonedCol)
appendedGroupByCols.Append(clonedCol)
}
}
// The selection may be useless, check and remove it.
if len(sel.Conditions) == 0 {
agg.SetChildren(sel.children[0])
appendRemoveSelectionTraceStep(agg, sel, opt)
}
defaultValueMap := s.aggDefaultValueMap(agg)
// We should use it directly, rather than building a projection.
Expand All @@ -282,7 +297,9 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo
}
proj.SetChildren(apply)
p = proj
appendAddProjTraceStep(apply, proj, opt)
}
appendModifyAggTraceStep(outerPlan, apply, agg, sel, appendedGroupByCols, appendedAggFuncs, eqCondWithCorCol, opt)
return s.optimize(ctx, p, opt)
}
sel.Conditions = originalExpr
Expand All @@ -294,6 +311,7 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo
// the top level Sort has no effect on the subquery's result.
innerPlan = sort.children[0]
apply.SetChildren(outerPlan, innerPlan)
appendRemoveSortTraceStep(sort, opt)
return s.optimize(ctx, p, opt)
}
}
Expand All @@ -312,3 +330,128 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo
// name returns the identifier of this optimization rule, "decorrelate".
func (*decorrelateSolver) name() string {
	const ruleName = "decorrelate"
	return ruleName
}

// appendApplySimplifiedTraceStep appends a step to opt describing that the
// apply p has been simplified into the join j. The step is keyed by p's ID and
// TP; the action and reason texts are supplied as closures so that they are
// only formatted when the receiver of the step calls them.
func appendApplySimplifiedTraceStep(p *LogicalApply, j *LogicalJoin, opt *logicalOptimizeOp) {
	const (
		actionFormat = "%v_%v simplified into %v_%v"
		reasonFormat = "%v_%v hasn't any corelated column, thus the inner plan is non-correlated"
	)
	describeReason := func() string {
		return fmt.Sprintf(reasonFormat, p.TP(), p.ID())
	}
	describeAction := func() string {
		return fmt.Sprintf(actionFormat, plancodec.TypeApply, p.ID(), plancodec.TypeJoin, j.ID())
	}
	opt.appendStepToCurrent(p.ID(), p.TP(), describeReason, describeAction)
}

// appendRemoveSelectionTraceStep appends a step to opt describing that the
// selection s has been removed from the plan tree because its conditions were
// pushed into plan p. The step is keyed by s's ID and TP.
func appendRemoveSelectionTraceStep(p LogicalPlan, s *LogicalSelection, opt *logicalOptimizeOp) {
	const (
		actionFormat = "%v_%v removed from plan tree"
		reasonFormat = "%v_%v's conditions have been pushed into %v_%v"
	)
	describeReason := func() string {
		return fmt.Sprintf(reasonFormat, s.TP(), s.ID(), p.TP(), p.ID())
	}
	describeAction := func() string {
		return fmt.Sprintf(actionFormat, s.TP(), s.ID())
	}
	opt.appendStepToCurrent(s.ID(), s.TP(), describeReason, describeAction)
}

// appendRemoveMaxOneRowTraceStep appends a step to opt describing that the
// max-one-row operator m has been removed from the plan tree. No reason text
// is recorded for this step (the reason closure yields an empty string).
func appendRemoveMaxOneRowTraceStep(m *LogicalMaxOneRow, opt *logicalOptimizeOp) {
	noReason := func() string { return "" }
	describeAction := func() string {
		const actionFormat = "%v_%v removed from plan tree"
		return fmt.Sprintf(actionFormat, m.TP(), m.ID())
	}
	opt.appendStepToCurrent(m.ID(), m.TP(), noReason, describeAction)
}

// appendRemoveProjTraceStep appends a step to opt describing that the
// projection proj has been removed from the plan tree after its columns were
// substituted into the apply p. The step is keyed by proj's ID and TP.
func appendRemoveProjTraceStep(p *LogicalApply, proj *LogicalProjection, opt *logicalOptimizeOp) {
	const (
		actionFormat = "%v_%v removed from plan tree"
		reasonFormat = "%v_%v's columns all substituted into %v_%v"
	)
	describeReason := func() string {
		return fmt.Sprintf(reasonFormat, proj.TP(), proj.ID(), p.TP(), p.ID())
	}
	describeAction := func() string {
		return fmt.Sprintf(actionFormat, proj.TP(), proj.ID())
	}
	opt.appendStepToCurrent(proj.ID(), proj.TP(), describeReason, describeAction)
}

// appendMoveProjTraceStep appends a step to opt describing that the projection
// proj has been moved to become the parent of np. The recorded reason reports
// the join type of the apply p. The step is keyed by proj's ID and TP.
func appendMoveProjTraceStep(p *LogicalApply, np LogicalPlan, proj *LogicalProjection, opt *logicalOptimizeOp) {
	const (
		actionFormat = "%v_%v is moved as %v_%v's parent"
		reasonFormat = "%v_%v's join type is %v, not semi join"
	)
	describeReason := func() string {
		return fmt.Sprintf(reasonFormat, p.TP(), p.ID(), p.JoinType.String())
	}
	describeAction := func() string {
		return fmt.Sprintf(actionFormat, proj.TP(), proj.ID(), np.TP(), np.ID())
	}
	opt.appendStepToCurrent(proj.ID(), proj.TP(), describeReason, describeAction)
}

// appendRemoveSortTraceStep appends a step to opt describing that the sort
// operator has been removed from the plan tree. No reason text is recorded
// for this step (the reason closure yields an empty string).
func appendRemoveSortTraceStep(sort *LogicalSort, opt *logicalOptimizeOp) {
	noReason := func() string { return "" }
	describeAction := func() string {
		const actionFormat = "%v_%v removed from plan tree"
		return fmt.Sprintf(actionFormat, sort.TP(), sort.ID())
	}
	opt.appendStepToCurrent(sort.ID(), sort.TP(), noReason, describeAction)
}

// appendPullUpAggTraceStep appends a step to opt describing that the
// aggregation agg has been pulled up to become the parent of np, together with
// the join type that the apply p has at the time the text is formatted.
// The step is keyed by agg's ID and TP.
func appendPullUpAggTraceStep(p *LogicalApply, np LogicalPlan, agg *LogicalAggregation, opt *logicalOptimizeOp) {
	const (
		actionFormat = "%v_%v pulled up as %v_%v's parent, and %v_%v's join type becomes %v"
		reasonFormat = "%v_%v's functions haven't any group by items and %v_%v's join type isn't %v or %v, and hasn't any conditions"
	)
	describeReason := func() string {
		return fmt.Sprintf(reasonFormat,
			agg.TP(), agg.ID(), p.TP(), p.ID(), InnerJoin.String(), LeftOuterJoin.String())
	}
	describeAction := func() string {
		return fmt.Sprintf(actionFormat,
			agg.TP(), agg.ID(), np.TP(), np.ID(), p.TP(), p.ID(), p.JoinType.String())
	}
	opt.appendStepToCurrent(agg.ID(), agg.TP(), describeReason, describeAction)
}

// appendAddProjTraceStep appends a step to opt describing that the projection
// proj has been added as the parent of the apply p. No reason text is recorded
// for this step (the reason closure yields an empty string).
func appendAddProjTraceStep(p *LogicalApply, proj *LogicalProjection, opt *logicalOptimizeOp) {
	noReason := func() string { return "" }
	describeAction := func() string {
		const actionFormat = "%v_%v is added as %v_%v's parent"
		return fmt.Sprintf(actionFormat, proj.TP(), proj.ID(), p.TP(), p.ID())
	}
	opt.appendStepToCurrent(proj.ID(), proj.TP(), noReason, describeAction)
}

// appendModifyAggTraceStep appends a step to opt describing how the
// aggregation agg and the apply p were modified.
//
// The action text lists, in order, the group-by columns in
// appendedGroupByCols, the aggregate functions in appendedAggFuncs, and the
// conditions in eqCondWithCorCol that were added to p. The reason text lists
// the same conditions as equal conditions of sel that are correlated to
// outerPlan and pulled up as p's join key. Each list is comma-separated
// without spaces and enclosed in square brackets. The step is keyed by agg's
// ID and TP.
func appendModifyAggTraceStep(outerPlan LogicalPlan, p *LogicalApply, agg *LogicalAggregation, sel *LogicalSelection,
	appendedGroupByCols *expression.Schema, appendedAggFuncs []*aggregation.AggFuncDesc,
	eqCondWithCorCol []*expression.ScalarFunction, opt *logicalOptimizeOp) {
	// writeEqConds emits the comma-separated equal conditions; both the action
	// and the reason text contain this same list.
	writeEqConds := func(out *bytes.Buffer) {
		for idx, eq := range eqCondWithCorCol {
			if idx != 0 {
				out.WriteString(",")
			}
			out.WriteString(eq.String())
		}
	}
	describeAction := func() string {
		var out bytes.Buffer
		fmt.Fprintf(&out, "%v_%v's groupby items added [", agg.TP(), agg.ID())
		for idx, groupByCol := range appendedGroupByCols.Columns {
			if idx != 0 {
				out.WriteString(",")
			}
			out.WriteString(groupByCol.String())
		}
		out.WriteString("], and functions added [")
		for idx, aggFunc := range appendedAggFuncs {
			if idx != 0 {
				out.WriteString(",")
			}
			out.WriteString(aggFunc.String())
		}
		fmt.Fprintf(&out, "], and %v_%v's conditions added [", p.TP(), p.ID())
		writeEqConds(&out)
		out.WriteString("]")
		return out.String()
	}
	describeReason := func() string {
		var out bytes.Buffer
		fmt.Fprintf(&out, "%v_%v's equal conditions [", sel.TP(), sel.ID())
		writeEqConds(&out)
		fmt.Fprintf(&out, "] are correlated to %v_%v and pulled up as %v_%v's join key",
			outerPlan.TP(), outerPlan.ID(), p.TP(), p.ID())
		return out.String()
	}
	opt.appendStepToCurrent(agg.ID(), agg.TP(), describeReason, describeAction)
}