Skip to content

Commit

Permalink
push down of semiJoin and uncorrelated subqueries
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Aug 9, 2023
1 parent 17c2d84 commit 14d62cb
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 15 deletions.
3 changes: 2 additions & 1 deletion go/vt/sqlparser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sqlparser
import (
"vitess.io/vitess/go/mysql/datetime"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vtgate/engine/opcode"
)

/*
Expand Down Expand Up @@ -2488,7 +2489,7 @@ type (
// CAUTION: you should only change argName and hasValuesArg through the setter methods
ExtractedSubquery struct {
Original Expr // original expression that was replaced by this ExtractedSubquery
OpCode int // this should really be engine.PulloutOpCode, but we cannot depend on engine :(
OpCode opcode.PulloutOpcode
Subquery *Subquery
OtherSide Expr // represents the side of the comparison, this field will be nil if Original is not a comparison
Merged bool // tells whether we need to rewrite this subquery to Original or not
Expand Down
4 changes: 2 additions & 2 deletions go/vt/sqlparser/ast_equals.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package operators
import (
"fmt"

"golang.org/x/exp/slices"

"vitess.io/vitess/go/slice"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine/opcode"
Expand All @@ -43,6 +46,11 @@ func tryPushingDownAggregator(ctx *plancontext.PlanningContext, aggregator *Aggr
if ctx.DelegateAggregation {
output, applyResult, err = pushDownAggregationThroughFilter(ctx, aggregator, src)
}

case *SemiJoin:
if ctx.DelegateAggregation {
output, applyResult, err = pushDownAggregationThroughSemiJoin(aggregator, src)
}
default:
return aggregator, rewrite.SameTree, nil
}
Expand Down Expand Up @@ -215,6 +223,36 @@ withNextColumn:
return aggregator, rewrite.NewTree("push aggregation under filter - keep original", aggregator), nil
}

// pushDownAggregationThroughSemiJoin moves the aggregation below a SemiJoin by wrapping
// the join's LHS in a new Aggregator. Compared to pushDownAggregationThroughJoin this is
// the simple case: nothing is read from the RHS, so no aggregations or groupings are
// ever sent to that side.
//
// The LHS columns used by the subquery (join.LHSColumns) are added in front of the
// existing groupings. The pushed-down aggregation is therefore grouped by the columns
// that decide whether a row should be included in the result set or not.
//
// It returns the root aggregator together with a NewTree apply result; the error
// return is always nil.
func pushDownAggregationThroughSemiJoin(rootAggr *Aggregator, join *SemiJoin) (ops.Operator, *rewrite.ApplyResult, error) {
	// toGrouping wraps an LHS column in a GroupBy with both offsets set to -1.
	toGrouping := func(col *sqlparser.ColName) GroupBy {
		var gb GroupBy
		gb.Inner = col
		gb.SimplifiedExpr = col
		gb.ColOffset = -1
		gb.WSOffset = -1
		return gb
	}

	// Subquery columns come first, followed by the groupings the root already has.
	pushedGrouping := slice.Map(join.LHSColumns, toGrouping)
	pushedGrouping = append(pushedGrouping, rootAggr.Grouping...)

	// The new aggregator sits between the semi join and its previous LHS. It works on
	// copies of the root's aggregations and columns so the two do not share slices.
	pushed := &Aggregator{
		Source:   join.LHS,
		QP:       rootAggr.QP,
		Grouping: pushedGrouping,
	}
	pushed.Aggregations = slices.Clone(rootAggr.Aggregations)
	pushed.Columns = slices.Clone(rootAggr.Columns)
	join.LHS = pushed

	rootAggr.aggregateTheAggregates()

	result := rewrite.NewTree("push Aggregation under semiJoin", rootAggr)
	return rootAggr, result, nil
}

func collectColNamesNeeded(ctx *plancontext.PlanningContext, f *Filter) (columnsNeeded []*sqlparser.ColName) {
for _, p := range f.Predicates {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/planbuilder/operators/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ func tryPushingDownProjection(
return p, rewrite.SameTree, nil
}
return pushDownProjectionInApplyJoin(ctx, p, src)
case *UncorrelatedSubQuery:
src.LHS, p.Source = p, src.LHS
return src, rewrite.NewTree("pushed projection into uncorrelated subQuery", p), nil
case *SemiJoin:
src.LHS, p.Source = p, src.LHS
return src, rewrite.NewTree("pushed projection into semiJoin", p), nil
case *Vindex:
return pushDownProjectionInVindex(ctx, p, src)
default:
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (i *Insert) ShortDescription() string {
}

func (i *Insert) GetOrdering() ([]ops.OrderBy, error) {
panic("does not expect insert operator to receive get ordering call")
return nil, nil
}

var _ ops.Operator = (*Insert)(nil)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (s *UncorrelatedSubQuery) SetInputs(ops []ops.Operator) {
}

func (s *UncorrelatedSubQuery) ShortDescription() string {
return ""
return s.Extracted.OpCode.String()
}

func (s *UncorrelatedSubQuery) AddColumns(ctx *plancontext.PlanningContext, reuseExisting bool, addToGroupBy []bool, exprs []*sqlparser.AliasedExpr) ([]int, error) {
Expand Down
14 changes: 11 additions & 3 deletions go/vt/vtgate/planbuilder/operators/subquery_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func optimizeSubQuery(ctx *plancontext.PlanningContext, op *SubQuery, ts semanti
continue
}

if inner.ExtractedSubquery.OpCode == int(popcode.PulloutExists) {
if inner.ExtractedSubquery.OpCode == popcode.PulloutExists {
correlatedTree, err := createSemiJoin(ctx, innerOp, outer, preds, inner.ExtractedSubquery)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -389,9 +389,17 @@ func createSemiJoin(
return nil, err
}
}

// We just need a single row from the RHS
limitInner := &Limit{
Source: innerOp,
AST: &sqlparser.Limit{Rowcount: sqlparser.NewIntLiteral("1")},
Pushed: false,
}

return &SemiJoin{
LHS: newOuter,
RHS: innerOp,
RHS: limitInner,
Extracted: extractedSubquery,
Vars: vars,
LHSColumns: lhsCols,
Expand All @@ -404,7 +412,7 @@ func createSemiJoin(
func canMergeSubqueryOnColumnSelection(ctx *plancontext.PlanningContext, a, b *Route, predicate *sqlparser.ExtractedSubquery) bool {
left := predicate.OtherSide
opCode := predicate.OpCode
if opCode != int(popcode.PulloutValue) && opCode != int(popcode.PulloutIn) {
if opCode != popcode.PulloutValue && opCode != popcode.PulloutIn {
return false
}

Expand Down
8 changes: 5 additions & 3 deletions go/vt/vtgate/planbuilder/testdata/filter_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -3200,9 +3200,11 @@
"QueryType": "SELECT",
"Original": "select distinct user.id, user.col from user where user.col in (select id from music where col2 = 'a')",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Ordered",
"GroupBy": "(0|2), 1",
"OperatorType": "Distinct",
"Collations": [
"(0:2)",
"1"
],
"ResultColumns": 2,
"Inputs": [
{
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/semantics/binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,16 @@ func (b *binder) createExtractedSubquery(cursor *sqlparser.Cursor, currScope *sc
sq := &sqlparser.ExtractedSubquery{
Subquery: subq,
Original: subq,
OpCode: int(opcode.PulloutValue),
OpCode: opcode.PulloutValue,
}

switch par := cursor.Parent().(type) {
case *sqlparser.ComparisonExpr:
switch par.Operator {
case sqlparser.InOp:
sq.OpCode = int(opcode.PulloutIn)
sq.OpCode = opcode.PulloutIn
case sqlparser.NotInOp:
sq.OpCode = int(opcode.PulloutNotIn)
sq.OpCode = opcode.PulloutNotIn
}
subq, exp := GetSubqueryAndOtherSide(par)
sq.Original = &sqlparser.ComparisonExpr{
Expand All @@ -247,7 +247,7 @@ func (b *binder) createExtractedSubquery(cursor *sqlparser.Cursor, currScope *sc
}
sq.OtherSide = exp
case *sqlparser.ExistsExpr:
sq.OpCode = int(opcode.PulloutExists)
sq.OpCode = opcode.PulloutExists
sq.Original = par
}
return sq, nil
Expand Down

0 comments on commit 14d62cb

Please sign in to comment.