Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move horizon planning to operators #11622

Merged
merged 8 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 6 additions & 26 deletions go/vt/vtgate/planbuilder/gen4_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,28 +202,18 @@ func newBuildSelectPlan(
return nil, nil, err
}

logical, err := operators.CreateLogicalOperatorFromAST(ctx, selStmt)
op, err := operators.PlanQuery(ctx, selStmt)
if err != nil {
return nil, nil, err
}

physOp, err := operators.TransformToPhysical(ctx, logical)
if err != nil {
return nil, nil, err
}

plan, err := transformToLogicalPlan(ctx, physOp, true)
plan, err := transformToLogicalPlan(ctx, op, true)
if err != nil {
return nil, nil, err
}

plan = optimizePlan(plan)

plan, err = planHorizon(ctx, plan, selStmt, true)
if err != nil {
return nil, nil, err
}

sel, isSel := selStmt.(*sqlparser.Select)
if isSel {
if err := setMiscFunc(plan, sel); err != nil {
Expand Down Expand Up @@ -312,17 +302,12 @@ func gen4UpdateStmtPlanner(

ctx := plancontext.NewPlanningContext(reservedVars, semTable, vschema, version)

logical, err := operators.CreateLogicalOperatorFromAST(ctx, updStmt)
op, err := operators.PlanQuery(ctx, updStmt)
if err != nil {
return nil, err
}

physOp, err := operators.TransformToPhysical(ctx, logical)
if err != nil {
return nil, err
}

plan, err := transformToLogicalPlan(ctx, physOp, true)
plan, err := transformToLogicalPlan(ctx, op, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -395,17 +380,12 @@ func gen4DeleteStmtPlanner(
}

ctx := plancontext.NewPlanningContext(reservedVars, semTable, vschema, version)
logical, err := operators.CreateLogicalOperatorFromAST(ctx, deleteStmt)
if err != nil {
return nil, err
}

physOp, err := operators.TransformToPhysical(ctx, logical)
op, err := operators.PlanQuery(ctx, deleteStmt)
if err != nil {
return nil, err
}

plan, err := transformToLogicalPlan(ctx, physOp, true)
plan, err := transformToLogicalPlan(ctx, op, true)
if err != nil {
return nil, err
}
Expand Down
72 changes: 51 additions & 21 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,53 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op operators.Opera
ASTPredicate: ast,
},
}, nil
case *operators.Horizon:
return transformHorizon(ctx, op, isRoot)
}

return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unknown type encountered: %T (transformToLogicalPlan)", op)
}

func transformHorizon(ctx *plancontext.PlanningContext, op *operators.Horizon, isRoot bool) (logicalPlan, error) {
source, err := transformToLogicalPlan(ctx, op.Source, isRoot)
if err != nil {
return nil, err
}
switch node := op.Select.(type) {
case *sqlparser.Select:
hp := horizonPlanning{
sel: node,
}

replaceSubQuery(ctx, node)
plan, err := hp.planHorizon(ctx, source, true)
if err != nil {
return nil, err
}
return planLimit(node.Limit, plan)
case *sqlparser.Union:
var err error
rb, isRoute := source.(*routeGen4)
if !isRoute && ctx.SemTable.NotSingleRouteErr != nil {
return nil, ctx.SemTable.NotSingleRouteErr
}
var plan logicalPlan
if isRoute && rb.isSingleShard() {
err = planSingleShardRoutePlan(node, rb)
plan = rb
} else {
plan, err = planOrderByOnUnion(ctx, source, node)
}
if err != nil {
return nil, err
}

return planLimit(node.Limit, plan)
default:
panic("only SELECT and UNION implement the SelectStatement interface")
}
}

func transformApplyJoinPlan(ctx *plancontext.PlanningContext, n *operators.ApplyJoin) (logicalPlan, error) {
lhs, err := transformToLogicalPlan(ctx, n.LHS, false)
if err != nil {
Expand Down Expand Up @@ -433,10 +475,10 @@ func pushWeightStringForDistinct(ctx *plancontext.PlanningContext, plan logicalP
}

func transformAndMerge(ctx *plancontext.PlanningContext, op *operators.Union) (sources []logicalPlan, err error) {
for i, source := range op.Sources {
for _, source := range op.Sources {
// first we go over all the operator inputs and turn them into logical plans,
// including horizon planning
plan, err := createLogicalPlan(ctx, source, op.SelectStmts[i])
plan, err := transformToLogicalPlan(ctx, source, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -481,7 +523,7 @@ func transformAndMerge(ctx *plancontext.PlanningContext, op *operators.Union) (s
func transformAndMergeInOrder(ctx *plancontext.PlanningContext, op *operators.Union) (sources []logicalPlan, err error) {
// We go over all the input operators and turn them into logical plans
for i, source := range op.Sources {
plan, err := createLogicalPlan(ctx, source, op.SelectStmts[i])
plan, err := transformToLogicalPlan(ctx, source, false)
if err != nil {
return nil, err
}
Expand All @@ -504,27 +546,15 @@ func transformAndMergeInOrder(ctx *plancontext.PlanningContext, op *operators.Un
return sources, nil
}

func createLogicalPlan(ctx *plancontext.PlanningContext, source operators.Operator, selStmt *sqlparser.Select) (logicalPlan, error) {
plan, err := transformToLogicalPlan(ctx, source, false)
if err != nil {
return nil, err
}
if selStmt != nil {
plan, err = planHorizon(ctx, plan, selStmt, true)
if err != nil {
return nil, err
}
if err := setMiscFunc(plan, selStmt); err != nil {
return nil, err
}
}
return plan, nil
}

func getCollationsFor(ctx *plancontext.PlanningContext, n *operators.Union) []collations.ID {
// TODO: coerce selects' select expressions' collations
var colls []collations.ID
for _, expr := range n.SelectStmts[0].SelectExprs {

sel, err := n.GetSelectFor(0)
if err != nil {
return nil
}
for _, expr := range sel.SelectExprs {
aliasedE, ok := expr.(*sqlparser.AliasedExpr)
if !ok {
return nil
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewApplyJoin(lhs, rhs Operator, predicate sqlparser.Expr, leftOuterJoin boo
func (a *ApplyJoin) IPhysical() {}

// Clone implements the Operator interface
func (a *ApplyJoin) Clone(inputs []Operator) Operator {
func (a *ApplyJoin) clone(inputs []Operator) Operator {
Comment on lines 70 to +71
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can also change the comment Clone -> clone

checkSize(inputs, 2)
return &ApplyJoin{
LHS: inputs[0],
Expand All @@ -86,8 +86,8 @@ func (a *ApplyJoin) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparse
return addPredicate(a, ctx, expr, false)
}

// Inputs implements the Operator interface
func (a *ApplyJoin) Inputs() []Operator {
// inputs implements the Operator interface
func (a *ApplyJoin) inputs() []Operator {
return []Operator{a.LHS, a.RHS}
}

Expand Down
18 changes: 10 additions & 8 deletions go/vt/vtgate/planbuilder/operators/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,18 @@ func (f *Filter) compact(*plancontext.PlanningContext) (Operator, bool, error) {

func (u *Union) compact(*plancontext.PlanningContext) (Operator, bool, error) {
var newSources []Operator
var newSels []*sqlparser.Select
anythingChanged := false
for i, source := range u.Sources {
other, isUnion := source.(*Union)
if !isUnion {
for _, source := range u.Sources {
var other *Union
horizon, ok := source.(*Horizon)
if ok {
union, ok := horizon.Source.(*Union)
if ok {
other = union
}
}
if other == nil {
newSources = append(newSources, source)
newSels = append(newSels, u.SelectStmts[i])
continue
}
anythingChanged = true
Expand All @@ -65,16 +70,13 @@ func (u *Union) compact(*plancontext.PlanningContext) (Operator, bool, error) {
case u.Distinct:
// if the current UNION is a DISTINCT, we can safely ignore everything from children UNIONs, except LIMIT
newSources = append(newSources, other.Sources...)
newSels = append(newSels, other.SelectStmts...)

default:
newSources = append(newSources, other)
newSels = append(newSels, nil)
}
}
if anythingChanged {
u.Sources = newSources
u.SelectStmts = newSels
}
return u, anythingChanged, nil
}
Expand Down
16 changes: 8 additions & 8 deletions go/vt/vtgate/planbuilder/operators/correlated_subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ var _ PhysicalOperator = (*CorrelatedSubQueryOp)(nil)
// IPhysical implements the PhysicalOperator interface
func (s *SubQueryOp) IPhysical() {}

// Clone implements the Operator interface
func (s *SubQueryOp) Clone(inputs []Operator) Operator {
// clone implements the Operator interface
func (s *SubQueryOp) clone(inputs []Operator) Operator {
checkSize(inputs, 2)
result := &SubQueryOp{
Outer: inputs[0],
Expand All @@ -62,16 +62,16 @@ func (s *SubQueryOp) Clone(inputs []Operator) Operator {
return result
}

// Inputs implements the Operator interface
func (s *SubQueryOp) Inputs() []Operator {
// inputs implements the Operator interface
func (s *SubQueryOp) inputs() []Operator {
return []Operator{s.Outer, s.Inner}
}

// IPhysical implements the PhysicalOperator interface
func (c *CorrelatedSubQueryOp) IPhysical() {}

// Clone implements the Operator interface
func (c *CorrelatedSubQueryOp) Clone(inputs []Operator) Operator {
// clone implements the Operator interface
func (c *CorrelatedSubQueryOp) clone(inputs []Operator) Operator {
checkSize(inputs, 2)
columns := make([]*sqlparser.ColName, len(c.LHSColumns))
copy(columns, c.LHSColumns)
Expand All @@ -90,7 +90,7 @@ func (c *CorrelatedSubQueryOp) Clone(inputs []Operator) Operator {
return result
}

// Inputs implements the Operator interface
func (c *CorrelatedSubQueryOp) Inputs() []Operator {
// inputs implements the Operator interface
func (c *CorrelatedSubQueryOp) inputs() []Operator {
return []Operator{c.Outer, c.Inner}
}
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func (d *Delete) Introduces() semantics.TableSet {
// IPhysical implements the PhysicalOperator interface
func (d *Delete) IPhysical() {}

// Clone implements the Operator interface
func (d *Delete) Clone(inputs []Operator) Operator {
// clone implements the Operator interface
func (d *Delete) clone(inputs []Operator) Operator {
checkSize(inputs, 0)
return &Delete{
QTable: d.QTable,
Expand Down
23 changes: 13 additions & 10 deletions go/vt/vtgate/planbuilder/operators/derived.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package operators

import (
"golang.org/x/exp/slices"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand All @@ -42,15 +44,16 @@ var _ PhysicalOperator = (*Derived)(nil)
func (d *Derived) IPhysical() {}

// Clone implements the Operator interface
func (d *Derived) Clone(inputs []Operator) Operator {
func (d *Derived) clone(inputs []Operator) Operator {
checkSize(inputs, 1)
clone := *d
clone.Source = inputs[0]
clone.ColumnAliases = sqlparser.CloneColumns(d.ColumnAliases)
clone.Columns = append([]*sqlparser.ColName{}, d.Columns...)
clone.ColumnsOffset = make([]int, 0, len(d.ColumnsOffset))
copy(clone.ColumnsOffset, d.ColumnsOffset)
return &clone
return &Derived{
Source: inputs[0],
Query: d.Query,
Alias: d.Alias,
ColumnAliases: sqlparser.CloneColumns(d.ColumnAliases),
Columns: slices.Clone(d.Columns),
ColumnsOffset: slices.Clone(d.ColumnsOffset),
}
}

// findOutputColumn returns the index on which the given name is found in the slice of
Expand Down Expand Up @@ -98,8 +101,8 @@ func (d *Derived) IsMergeable(ctx *plancontext.PlanningContext) bool {
return isMergeable(ctx, d.Query, d)
}

// Inputs implements the Operator interface
func (d *Derived) Inputs() []Operator {
// inputs implements the Operator interface
func (d *Derived) inputs() []Operator {
return []Operator{d.Source}
}

Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func newFilter(op Operator, expr ...sqlparser.Expr) Operator {
// IPhysical implements the PhysicalOperator interface
func (f *Filter) IPhysical() {}

// Clone implements the Operator interface
func (f *Filter) Clone(inputs []Operator) Operator {
// clone implements the Operator interface
func (f *Filter) clone(inputs []Operator) Operator {
checkSize(inputs, 1)
predicatesClone := make([]sqlparser.Expr, len(f.Predicates))
copy(predicatesClone, f.Predicates)
Expand All @@ -49,8 +49,8 @@ func (f *Filter) Clone(inputs []Operator) Operator {
}
}

// Inputs implements the Operator interface
func (f *Filter) Inputs() []Operator {
// inputs implements the Operator interface
func (f *Filter) inputs() []Operator {
return []Operator{f.Source}
}

Expand Down
Loading