Skip to content

Commit

Permalink
Gen4 Operator Refactoring: ORDER BY & Debugging Enhancements (#12954)
Browse files Browse the repository at this point in the history
* refactor: Update operators to use AliasedExprs

Signed-off-by: Andres Taylor <[email protected]>

* feat: add ORDER BY to the operator planning

Signed-off-by: Andres Taylor <[email protected]>

* feat: support arbitrary expressions for ORDER BY

Signed-off-by: Andres Taylor <[email protected]>

* refactor: simplify the operator code

Signed-off-by: Andres Taylor <[email protected]>

* refactor: simplify 'haveMatchingVindex'

Signed-off-by: Andres Taylor <[email protected]>

* refactor: simplify pushDownProjectionInApplyJoin

Signed-off-by: Andres Taylor <[email protected]>

* debug: added tree printing to the operators

Signed-off-by: Andres Taylor <[email protected]>

* go mod tidy

Signed-off-by: Andres Taylor <[email protected]>

---------

Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay authored Apr 25, 2023
1 parent d469008 commit edb702b
Show file tree
Hide file tree
Showing 45 changed files with 1,581 additions and 1,532 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ require (
github.com/kr/pretty v0.3.1
github.com/kr/text v0.2.0
github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249
github.com/xlab/treeprint v1.2.0
golang.org/x/exp v0.0.0-20230131160201-f062dba9d201
golang.org/x/sync v0.1.0
modernc.org/sqlite v1.20.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,8 @@ github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v1.2.0 h1:HzHnuAF1plUN2zGlAFHbSQP2qJ0ZAD3XF5XD7OesXRQ=
github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M=
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM=
Expand Down
29 changes: 15 additions & 14 deletions go/vt/vtgate/planbuilder/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"vitess.io/vitess/go/sqltypes"
popcode "vitess.io/vitess/go/vt/vtgate/engine/opcode"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"

"vitess.io/vitess/go/vt/vtgate/semantics"
Expand Down Expand Up @@ -266,7 +267,7 @@ func (hp *horizonPlanning) planAggrUsingOA(
groupByKeys: make([]*engine.GroupByParams, 0, len(grouping)),
}

var order []operators.OrderBy
var order []ops.OrderBy
if hp.qp.CanPushDownSorting {
hp.qp.AlignGroupByAndOrderBy(ctx)
// the grouping order might have changed, so we reload the grouping expressions
Expand Down Expand Up @@ -599,7 +600,7 @@ func hasUniqueVindex(semTable *semantics.SemTable, groupByExprs []operators.Grou
return false
}

func (hp *horizonPlanning) planOrderBy(ctx *plancontext.PlanningContext, orderExprs []operators.OrderBy, plan logicalPlan) (logicalPlan, error) {
func (hp *horizonPlanning) planOrderBy(ctx *plancontext.PlanningContext, orderExprs []ops.OrderBy, plan logicalPlan) (logicalPlan, error) {
switch plan := plan.(type) {
case *routeGen4:
return planOrderByForRoute(ctx, orderExprs, plan, hp.qp.HasStar)
Expand All @@ -609,7 +610,7 @@ func (hp *horizonPlanning) planOrderBy(ctx *plancontext.PlanningContext, orderEx
return hp.planOrderByForHashJoin(ctx, orderExprs, plan)
case *orderedAggregate:
// remove ORDER BY NULL from the list of order by expressions since we will be doing the ordering on vtgate level so NULL is not useful
var orderExprsWithoutNils []operators.OrderBy
var orderExprsWithoutNils []ops.OrderBy
for _, expr := range orderExprs {
if sqlparser.IsNull(expr.Inner.Expr) {
continue
Expand Down Expand Up @@ -659,15 +660,15 @@ func (hp *horizonPlanning) planOrderBy(ctx *plancontext.PlanningContext, orderEx
return nil, vterrors.VT13001(fmt.Sprintf("ORDER BY in complex query %T", plan))
}

func isSpecialOrderBy(o operators.OrderBy) bool {
// isSpecialOrderBy reports whether the ordering expression is one of the two
// forms singled out here: a NULL expression (as judged by sqlparser.IsNull),
// or a function call whose lowered name is "rand".
func isSpecialOrderBy(o ops.OrderBy) bool {
	expr := o.Inner.Expr
	if sqlparser.IsNull(expr) {
		return true
	}
	if fn, ok := expr.(*sqlparser.FuncExpr); ok {
		return fn.Name.Lowered() == "rand"
	}
	return false
}

func planOrderByForRoute(ctx *plancontext.PlanningContext, orderExprs []operators.OrderBy, plan *routeGen4, hasStar bool) (logicalPlan, error) {
func planOrderByForRoute(ctx *plancontext.PlanningContext, orderExprs []ops.OrderBy, plan *routeGen4, hasStar bool) (logicalPlan, error) {
for _, order := range orderExprs {
err := checkOrderExprCanBePlannedInScatter(ctx, plan, order, hasStar)
if err != nil {
Expand Down Expand Up @@ -698,7 +699,7 @@ func planOrderByForRoute(ctx *plancontext.PlanningContext, orderExprs []operator

// checkOrderExprCanBePlannedInScatter verifies that the given order by expression can be planned.
// It checks if the expression exists in the plan's select list when the query is a scatter.
func checkOrderExprCanBePlannedInScatter(ctx *plancontext.PlanningContext, plan *routeGen4, order operators.OrderBy, hasStar bool) error {
func checkOrderExprCanBePlannedInScatter(ctx *plancontext.PlanningContext, plan *routeGen4, order ops.OrderBy, hasStar bool) error {
if !hasStar {
return nil
}
Expand Down Expand Up @@ -759,7 +760,7 @@ func weightStringFor(expr sqlparser.Expr) sqlparser.Expr {
return &sqlparser.WeightStringFuncExpr{Expr: expr}
}

func (hp *horizonPlanning) planOrderByForHashJoin(ctx *plancontext.PlanningContext, orderExprs []operators.OrderBy, plan *hashJoin) (logicalPlan, error) {
func (hp *horizonPlanning) planOrderByForHashJoin(ctx *plancontext.PlanningContext, orderExprs []ops.OrderBy, plan *hashJoin) (logicalPlan, error) {
if len(orderExprs) == 1 && isSpecialOrderBy(orderExprs[0]) {
rhs, err := hp.planOrderBy(ctx, orderExprs, plan.Right)
if err != nil {
Expand All @@ -783,7 +784,7 @@ func (hp *horizonPlanning) planOrderByForHashJoin(ctx *plancontext.PlanningConte
return sortPlan, nil
}

func (hp *horizonPlanning) planOrderByForJoin(ctx *plancontext.PlanningContext, orderExprs []operators.OrderBy, plan *joinGen4) (logicalPlan, error) {
func (hp *horizonPlanning) planOrderByForJoin(ctx *plancontext.PlanningContext, orderExprs []ops.OrderBy, plan *joinGen4) (logicalPlan, error) {
if len(orderExprs) == 1 && isSpecialOrderBy(orderExprs[0]) {
lhs, err := hp.planOrderBy(ctx, orderExprs, plan.Left)
if err != nil {
Expand Down Expand Up @@ -814,7 +815,7 @@ func (hp *horizonPlanning) planOrderByForJoin(ctx *plancontext.PlanningContext,
return sortPlan, nil
}

func createMemorySortPlanOnAggregation(ctx *plancontext.PlanningContext, plan *orderedAggregate, orderExprs []operators.OrderBy) (logicalPlan, error) {
func createMemorySortPlanOnAggregation(ctx *plancontext.PlanningContext, plan *orderedAggregate, orderExprs []ops.OrderBy) (logicalPlan, error) {
primitive := &engine.MemorySort{}
ms := &memorySort{
resultsBuilder: resultsBuilder{
Expand Down Expand Up @@ -843,7 +844,7 @@ func createMemorySortPlanOnAggregation(ctx *plancontext.PlanningContext, plan *o
return ms, nil
}

func findExprInOrderedAggr(ctx *plancontext.PlanningContext, plan *orderedAggregate, order operators.OrderBy) (keyCol int, weightStringCol int, found bool) {
func findExprInOrderedAggr(ctx *plancontext.PlanningContext, plan *orderedAggregate, order ops.OrderBy) (keyCol int, weightStringCol int, found bool) {
for _, key := range plan.groupByKeys {
if ctx.SemTable.EqualsExpr(order.WeightStrExpr, key.Expr) ||
ctx.SemTable.EqualsExpr(order.Inner.Expr, key.Expr) {
Expand All @@ -859,7 +860,7 @@ func findExprInOrderedAggr(ctx *plancontext.PlanningContext, plan *orderedAggreg
return 0, 0, false
}

func (hp *horizonPlanning) createMemorySortPlan(ctx *plancontext.PlanningContext, plan logicalPlan, orderExprs []operators.OrderBy, useWeightStr bool) (logicalPlan, error) {
func (hp *horizonPlanning) createMemorySortPlan(ctx *plancontext.PlanningContext, plan logicalPlan, orderExprs []ops.OrderBy, useWeightStr bool) (logicalPlan, error) {
primitive := &engine.MemorySort{}
ms := &memorySort{
resultsBuilder: resultsBuilder{
Expand Down Expand Up @@ -890,7 +891,7 @@ func (hp *horizonPlanning) createMemorySortPlan(ctx *plancontext.PlanningContext
return ms, nil
}

func orderExprsDependsOnTableSet(orderExprs []operators.OrderBy, semTable *semantics.SemTable, ts semantics.TableSet) bool {
func orderExprsDependsOnTableSet(orderExprs []ops.OrderBy, semTable *semantics.SemTable, ts semantics.TableSet) bool {
for _, expr := range orderExprs {
exprDependencies := semTable.RecursiveDeps(expr.Inner.Expr)
if !exprDependencies.IsSolvedBy(ts) {
Expand Down Expand Up @@ -961,7 +962,7 @@ func (hp *horizonPlanning) planDistinctOA(semTable *semantics.SemTable, currPlan
}

func (hp *horizonPlanning) addDistinct(ctx *plancontext.PlanningContext, plan logicalPlan) (logicalPlan, error) {
var orderExprs []operators.OrderBy
var orderExprs []ops.OrderBy
var groupByKeys []*engine.GroupByParams
for index, sExpr := range hp.qp.SelectExprs {
aliasExpr, err := sExpr.GetAliasedExpr()
Expand Down Expand Up @@ -989,7 +990,7 @@ func (hp *horizonPlanning) addDistinct(ctx *plancontext.PlanningContext, plan lo
grpParam.WeightStringCol = wOffset
groupByKeys = append(groupByKeys, grpParam)

orderExprs = append(orderExprs, operators.OrderBy{
orderExprs = append(orderExprs, ops.OrderBy{
Inner: &sqlparser.Order{Expr: inner},
WeightStrExpr: aliasExpr.Expr},
)
Expand Down
50 changes: 48 additions & 2 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,47 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op ops.Operator, i
return transformProjection(ctx, op)
case *operators.Limit:
return transformLimit(ctx, op)
case *operators.Ordering:
return transformOrdering(ctx, op)
}

return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op))
}

// transformOrdering turns an Ordering operator into a logical plan. It first
// transforms the operator's Source and then wraps the result in a memory sort
// built from the operator's ordering information. An error from transforming
// the source is returned unchanged.
func transformOrdering(ctx *plancontext.PlanningContext, op *operators.Ordering) (logicalPlan, error) {
	source, err := transformToLogicalPlan(ctx, op.Source, false)
	if err != nil {
		return nil, err
	}

	sorted, err := createMemorySort(ctx, source, op)
	return sorted, err
}

// createMemorySort wraps src in a memorySort logical plan backed by a fresh
// engine.MemorySort primitive.
//
// One engine.OrderByParams entry is appended per element of ordering.Order.
// The column and weight-string offsets are read from ordering.Offset and
// ordering.WOffset at the same index, so both slices must be at least as long
// as ordering.Order (assumes offsets were already planned — TODO confirm the
// caller guarantees this). The returned error is always nil.
func createMemorySort(ctx *plancontext.PlanningContext, src logicalPlan, ordering *operators.Ordering) (logicalPlan, error) {
	sortPrimitive := &engine.MemorySort{}
	sorter := &memorySort{
		resultsBuilder: resultsBuilder{
			logicalPlanCommon: newBuilderCommon(src),
			weightStrings:     make(map[*resultColumn]int),
			truncater:         sortPrimitive,
		},
		eMemorySort: sortPrimitive,
	}

	for i := range ordering.Order {
		by := ordering.Order[i]
		// The collation is looked up on the weight-string expression, not on
		// the inner ORDER BY expression.
		collationID := ctx.SemTable.CollationForExpr(by.WeightStrExpr)
		param := engine.OrderByParams{
			Col:             ordering.Offset[i],
			WeightStringCol: ordering.WOffset[i],
			Desc:            by.Inner.Direction == sqlparser.DescOrder,
			// StarColFixedIndex is set to the same offset as Col.
			StarColFixedIndex: ordering.Offset[i],
			CollationID:       collationID,
		}
		// sorter.eMemorySort and sortPrimitive are the same pointer.
		sortPrimitive.OrderBy = append(sortPrimitive.OrderBy, param)
	}

	return sorter, nil
}

func transformProjection(ctx *plancontext.PlanningContext, op *operators.Projection) (logicalPlan, error) {
src, err := transformToLogicalPlan(ctx, op.Source, false)
if err != nil {
Expand Down Expand Up @@ -233,8 +269,9 @@ func routeToEngineRoute(ctx *plancontext.PlanningContext, op *operators.Route) (
}

return &engine.Route{
TableName: strings.Join(tableNames, ", "),
RoutingParameters: rp,
TableName: strings.Join(tableNames, ", "),
RoutingParameters: rp,
TruncateColumnCount: op.ResultColumns,
}, nil
}

Expand Down Expand Up @@ -265,6 +302,15 @@ func transformRoutePlan(ctx *plancontext.PlanningContext, op *operators.Route) (
}
replaceSubQuery(ctx, sel)
eroute, err := routeToEngineRoute(ctx, op)
// Check the error before touching eroute: the loop below writes to
// eroute.OrderBy, which would dereference a nil pointer if
// routeToEngineRoute failed and returned no route.
if err != nil {
	return nil, err
}
// Copy the route operator's planned ordering onto the engine route, one
// OrderByParams per ordering entry.
for _, order := range op.Ordering {
	collation := ctx.SemTable.CollationForExpr(order.AST)
	eroute.OrderBy = append(eroute.OrderBy, engine.OrderByParams{
		Col:             order.Offset,
		WeightStringCol: order.WOffset,
		Desc:            order.Direction == sqlparser.DescOrder,
		CollationID:     collation,
	})
}
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vtgate/planbuilder/operators/SQL_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,26 @@ func buildQuery(op ops.Operator, qb *queryBuilder) error {
return buildHorizon(op, qb)
case *Limit:
return buildLimit(op, qb)
case *Ordering:
return buildOrdering(op, qb)
default:
return vterrors.VT13001(fmt.Sprintf("do not know how to turn %T into SQL", op))
}
return nil
}

// buildOrdering emits SQL for an Ordering operator. It builds the query for
// the operator's Source into qb, then adds every ordering's inner ORDER BY
// clause to the select statement held by qb, in the order they appear in
// op.Order. An error from building the source is returned unchanged.
func buildOrdering(op *Ordering, qb *queryBuilder) error {
	if err := buildQuery(op.Source, qb); err != nil {
		return err
	}

	for i := range op.Order {
		qb.sel.AddOrder(op.Order[i].Inner)
	}
	return nil
}

func buildLimit(op *Limit, qb *queryBuilder) error {
err := buildQuery(op.Source, qb)
if err != nil {
Expand Down
62 changes: 37 additions & 25 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,12 @@ type ApplyJoin struct {
// so they can be used for the result of this expression that is using data from both sides.
// All fields will be used for these
type JoinColumn struct {
Original sqlparser.Expr // this is the original expression being passed through
BvNames []string // the BvNames and LHSCols line up
Original *sqlparser.AliasedExpr // this is the original expression being passed through
BvNames []string // the BvNames and LHSCols line up
LHSExprs []sqlparser.Expr
RHSExpr sqlparser.Expr
}

var _ ops.PhysicalOperator = (*ApplyJoin)(nil)

func NewApplyJoin(lhs, rhs ops.Operator, predicate sqlparser.Expr, leftOuterJoin bool) *ApplyJoin {
return &ApplyJoin{
LHS: lhs,
Expand All @@ -87,9 +85,6 @@ func NewApplyJoin(lhs, rhs ops.Operator, predicate sqlparser.Expr, leftOuterJoin
}
}

// IPhysical implements the PhysicalOperator interface
func (a *ApplyJoin) IPhysical() {}

// Clone implements the Operator interface
func (a *ApplyJoin) Clone(inputs []ops.Operator) ops.Operator {
return &ApplyJoin{
Expand Down Expand Up @@ -180,26 +175,35 @@ func (a *ApplyJoin) pushColRight(ctx *plancontext.PlanningContext, e *sqlparser.
return offset, nil
}

func (a *ApplyJoin) GetColumns() ([]sqlparser.Expr, error) {
func (a *ApplyJoin) GetColumns() ([]*sqlparser.AliasedExpr, error) {
return slices2.Map(a.ColumnsAST, jcToExpr), nil
}

func jcToExpr(c JoinColumn) sqlparser.Expr { return c.Original }
// GetOrdering returns the ordering reported by the left-hand side input of the
// join; the right-hand side is not consulted.
// NOTE(review): presumably an apply join emits rows in LHS order — confirm.
func (a *ApplyJoin) GetOrdering() ([]ops.OrderBy, error) {
	lhs := a.LHS
	return lhs.GetOrdering()
}

// jcToExpr returns the original aliased expression stored on the join column.
// Note that, despite the name, the result is a *sqlparser.AliasedExpr and not
// a bare sqlparser.Expr.
func jcToExpr(c JoinColumn) *sqlparser.AliasedExpr {
	original := c.Original
	return original
}

func (a *ApplyJoin) getJoinColumnFor(ctx *plancontext.PlanningContext, e sqlparser.Expr) (col JoinColumn, err error) {
col.Original = e
func (a *ApplyJoin) getJoinColumnFor(ctx *plancontext.PlanningContext, e *sqlparser.AliasedExpr) (col JoinColumn, err error) {
defer func() {
col.Original = e
}()
lhs := TableID(a.LHS)
rhs := TableID(a.RHS)
both := lhs.Merge(rhs)
deps := ctx.SemTable.RecursiveDeps(e)
expr := e.Expr
deps := ctx.SemTable.RecursiveDeps(expr)

switch {
case deps.IsSolvedBy(lhs):
col.LHSExprs = []sqlparser.Expr{e}
col.LHSExprs = []sqlparser.Expr{expr}
case deps.IsSolvedBy(rhs):
col.RHSExpr = e
col.RHSExpr = expr
case deps.IsSolvedBy(both):
col, err = BreakExpressionInLHSandRHS(ctx, e, TableID(a.LHS))
col, err = BreakExpressionInLHSandRHS(ctx, expr, TableID(a.LHS))
if err != nil {
return JoinColumn{}, err
}
Expand All @@ -210,11 +214,15 @@ func (a *ApplyJoin) getJoinColumnFor(ctx *plancontext.PlanningContext, e sqlpars
return
}

// jcToAliasedExpr returns the expression wrapped by the join column's original
// aliased expression. Note that, despite the name, the result is the bare
// sqlparser.Expr and not the *sqlparser.AliasedExpr itself.
// column.Original is dereferenced without a nil check.
func jcToAliasedExpr(column JoinColumn) sqlparser.Expr {
	aliased := column.Original
	return aliased.Expr
}

func (a *ApplyJoin) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr) (ops.Operator, int, error) {
if offset, found := canReuseColumn(ctx, a.ColumnsAST, expr.Expr, jcToExpr); found {
if offset, found := canReuseColumn(ctx, a.ColumnsAST, expr.Expr, jcToAliasedExpr); found {
return a, offset, nil
}
col, err := a.getJoinColumnFor(ctx, expr.Expr)
col, err := a.getJoinColumnFor(ctx, expr)
if err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -271,24 +279,28 @@ func (a *ApplyJoin) planOffsets(ctx *plancontext.PlanningContext) (err error) {
}

// addOffset appends offset to the join's Columns slice. It panics if the same
// offset is already present in Columns.
func (a *ApplyJoin) addOffset(offset int) {
	// slices.Index yields -1 when the value is absent, so any non-negative
	// result means the offset has already been added.
	if slices.Index(a.Columns, offset) >= 0 {
		panic("should never pass through the same column")
	}
	a.Columns = append(a.Columns, offset)
}

func (a *ApplyJoin) Description() ops.OpDescription {
other := map[string]any{}
if len(a.Columns) > 0 {
other["OutputColumns"] = a.Columns
}
if a.Predicate != nil {
other["Predicate"] = sqlparser.String(a.Predicate)
}
return ops.OpDescription{
OperatorType: "Join",
Variant: "Apply",
Other: map[string]any{
"Predicate": sqlparser.String(a.Predicate),
"OutputColumns": a.Columns,
},
Other: other,
}
}

// ShortDescription returns the join predicate rendered as a string by
// sqlparser.String. The predicate is passed through as-is, with no nil check.
func (a *ApplyJoin) ShortDescription() string {
	predicate := a.Predicate
	return sqlparser.String(predicate)
}

// IsPureLeft reports whether the join column has no right-hand side
// expression, i.e. RHSExpr is nil.
func (jc JoinColumn) IsPureLeft() bool {
	if jc.RHSExpr != nil {
		return false
	}
	return true
}
Expand Down
Loading

0 comments on commit edb702b

Please sign in to comment.