Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: New operator InsertionSelection to adhere to the operator model #14286

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 45 additions & 13 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,53 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op ops.Operator) (
return transformFkCascade(ctx, op)
case *operators.FkVerify:
return transformFkVerify(ctx, op)
case *operators.InsertSelection:
return transformInsertionSelection(ctx, op)
}

return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op))
}

func transformInsertionSelection(ctx *plancontext.PlanningContext, op *operators.InsertSelection) (logicalPlan, error) {
rb, isRoute := op.InsertionOp.(*operators.Route)
if !isRoute {
return nil, vterrors.VT13001(fmt.Sprintf("Incorrect type encountered: %T (transformInsertionSelection)", op.InsertionOp))
}

stmt, dmlOp, err := operators.ToSQL(ctx, rb.Source)
if err != nil {
return nil, err
}

if stmtWithComments, ok := stmt.(sqlparser.Commented); ok && rb.Comments != nil {
stmtWithComments.SetComments(rb.Comments.GetComments())
}

ins := dmlOp.(*operators.Insert)
eins := &engine.Insert{
Opcode: mapToInsertOpCode(rb.Routing.OpCode(), true),
Keyspace: rb.Routing.Keyspace(),
TableName: ins.VTable.Name.String(),
Ignore: ins.Ignore,
ForceNonStreaming: op.ForceNonStreaming,
Generate: autoIncGenerate(ins.AutoIncrement),
ColVindexes: ins.ColVindexes,
VindexValues: ins.VindexValues,
VindexValueOffset: ins.VindexValueOffset,
}
lp := &insert{eInsert: eins}

eins.Prefix, eins.Mid, eins.Suffix = generateInsertShardedQuery(ins.AST)

selectionPlan, err := transformToLogicalPlan(ctx, op.SelectionOp)
if err != nil {
return nil, err
}
lp.source = selectionPlan

return lp, nil
}

// transformFkCascade transforms a FkCascade operator into a logical plan.
func transformFkCascade(ctx *plancontext.PlanningContext, fkc *operators.FkCascade) (logicalPlan, error) {
// We convert the parent operator to a logical plan.
Expand Down Expand Up @@ -460,11 +502,10 @@ func buildRouteLogicalPlan(ctx *plancontext.PlanningContext, op *operators.Route
func buildInsertLogicalPlan(ctx *plancontext.PlanningContext, rb *operators.Route, op ops.Operator, stmt *sqlparser.Insert) (logicalPlan, error) {
ins := op.(*operators.Insert)
eins := &engine.Insert{
Opcode: mapToInsertOpCode(rb.Routing.OpCode(), ins.Input != nil),
Opcode: mapToInsertOpCode(rb.Routing.OpCode(), false),
Keyspace: rb.Routing.Keyspace(),
TableName: ins.VTable.Name.String(),
Ignore: ins.Ignore,
ForceNonStreaming: ins.ForceNonStreaming,
Generate: autoIncGenerate(ins.AutoIncrement),
ColVindexes: ins.ColVindexes,
VindexValues: ins.VindexValues,
Expand All @@ -474,20 +515,11 @@ func buildInsertLogicalPlan(ctx *plancontext.PlanningContext, rb *operators.Rout

// we would need to generate the query on the fly. The only exception here is
// when unsharded query with autoincrement for that there is no input operator.
if eins.Opcode != engine.InsertUnsharded || ins.Input != nil {
if eins.Opcode != engine.InsertUnsharded {
eins.Prefix, eins.Mid, eins.Suffix = generateInsertShardedQuery(ins.AST)
}

if ins.Input == nil {
eins.Query = generateQuery(stmt)
} else {
newSrc, err := transformToLogicalPlan(ctx, ins.Input)
if err != nil {
return nil, err
}
lp.source = newSrc
}

eins.Query = generateQuery(stmt)
return lp, nil
}

Expand Down
49 changes: 13 additions & 36 deletions go/vt/vtgate/planbuilder/operators/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ type Insert struct {
AutoIncrement *Generate
// Ignore specifies whether to ignore duplicate key errors during insertion.
Ignore bool
// ForceNonStreaming when true, select first then insert, this is to avoid locking rows by select for insert.
ForceNonStreaming bool

// ColVindexes are the vindexes that will use the VindexValues or VindexValueOffset
ColVindexes []*vindexes.ColumnVindex
Expand All @@ -53,26 +51,11 @@ type Insert struct {
// that will appear in the result set of the select query.
VindexValueOffset [][]int

// Insert using select query will have select plan as input operator for the insert operation.
Input ops.Operator

noInputs
noColumns
noPredicates
}

func (i *Insert) Inputs() []ops.Operator {
if i.Input == nil {
return nil
}
return []ops.Operator{i.Input}
}

func (i *Insert) SetInputs(inputs []ops.Operator) {
if len(inputs) > 0 {
i.Input = inputs[0]
}
}

// Generate represents an auto-increment generator for the insert operation.
type Generate struct {
// Keyspace represents the keyspace information for the table.
Expand Down Expand Up @@ -102,18 +85,12 @@ func (i *Insert) GetOrdering(*plancontext.PlanningContext) []ops.OrderBy {

var _ ops.Operator = (*Insert)(nil)

func (i *Insert) Clone(inputs []ops.Operator) ops.Operator {
var input ops.Operator
if len(inputs) > 0 {
input = inputs[0]
}
func (i *Insert) Clone([]ops.Operator) ops.Operator {
return &Insert{
Input: input,
VTable: i.VTable,
AST: i.AST,
AutoIncrement: i.AutoIncrement,
Ignore: i.Ignore,
ForceNonStreaming: i.ForceNonStreaming,
ColVindexes: i.ColVindexes,
VindexValues: i.VindexValues,
VindexValueOffset: i.VindexValueOffset,
Expand Down Expand Up @@ -211,15 +188,12 @@ func createInsertOperator(ctx *plancontext.PlanningContext, insStmt *sqlparser.I
return nil, err
}
case sqlparser.SelectStatement:
route.Source, err = insertSelectPlan(ctx, insOp, insStmt, rows)
if err != nil {
return nil, err
}
return insertSelectPlan(ctx, insOp, route, insStmt, rows)
}
return route, nil
}

func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, ins *sqlparser.Insert, sel sqlparser.SelectStatement) (*Insert, error) {
func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, routeOp *Route, ins *sqlparser.Insert, sel sqlparser.SelectStatement) (*InsertSelection, error) {
if columnMismatch(insOp.AutoIncrement, ins, sel) {
return nil, vterrors.VT03006()
}
Expand All @@ -229,23 +203,26 @@ func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, ins *sqlp
return nil, err
}

// select plan will be taken as input to insert rows into the table.
insOp.Input = selOp
// output of the select plan will be used to insert rows into the table.
insertSelect := &InsertSelection{
SelectionOp: selOp,
InsertionOp: routeOp,
}

// When the table you are steaming data from and table you are inserting from are same.
// When the table you are streaming data from and table you are inserting from are same.
// Then due to locking of the index range on the table we might not be able to insert into the table.
// Therefore, instead of streaming, this flag will ensure the records are first read and then inserted.
insertTbl := insOp.TablesUsed()[0]
selTables := TablesUsed(selOp)
for _, tbl := range selTables {
if insertTbl == tbl {
insOp.ForceNonStreaming = true
insertSelect.ForceNonStreaming = true
break
}
}

if len(insOp.ColVindexes) == 0 {
return insOp, nil
return insertSelect, nil
}

colVindexes := insOp.ColVindexes
Expand All @@ -266,7 +243,7 @@ func insertSelectPlan(ctx *plancontext.PlanningContext, insOp *Insert, ins *sqlp
}
}
insOp.VindexValueOffset = vv
return insOp, nil
return insertSelect, nil
}

func columnMismatch(gen *Generate, ins *sqlparser.Insert, sel sqlparser.SelectStatement) bool {
Expand Down
61 changes: 61 additions & 0 deletions go/vt/vtgate/planbuilder/operators/insert_selection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operators

import (
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

// InsertSelection operator represents an INSERT into SELECT FROM query.
// It holds the operators for running the selection and insertion.
type InsertSelection struct {
SelectionOp ops.Operator
InsertionOp ops.Operator

// ForceNonStreaming when true, select first then insert, this is to avoid locking rows by select for insert.
ForceNonStreaming bool

noColumns
noPredicates
}

func (is *InsertSelection) Clone(inputs []ops.Operator) ops.Operator {
return &InsertSelection{
SelectionOp: inputs[0],
InsertionOp: inputs[1],
}
}

func (is *InsertSelection) Inputs() []ops.Operator {
return []ops.Operator{is.SelectionOp, is.InsertionOp}
}

func (is *InsertSelection) SetInputs(inputs []ops.Operator) {
is.SelectionOp = inputs[0]
is.InsertionOp = inputs[1]
}

func (is *InsertSelection) ShortDescription() string {
return ""
}

func (is *InsertSelection) GetOrdering(*plancontext.PlanningContext) []ops.OrderBy {
return nil
}

var _ ops.Operator = (*InsertSelection)(nil)