Skip to content

Commit

Permalink
gen4: plan group by and order by together
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Jul 13, 2021
1 parent 819a83d commit 72d6a00
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 28 deletions.
3 changes: 3 additions & 0 deletions go/test/endtoend/vtgate/gen4/gen4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func TestGroupBy(t *testing.T) {

assertMatches(t, conn, `select tcol1, tcol1 from t1 join t2 on t1.id = t2.id order by tcol1`,
`[[VARCHAR("A") VARCHAR("A")] [VARCHAR("A") VARCHAR("A")] [VARCHAR("B") VARCHAR("B")] [VARCHAR("C") VARCHAR("C")]]`)

assertMatches(t, conn, `select count(*) k, tcol1, tcol2, "abc" b from t2 group by tcol1, tcol2, b order by k, tcol2`,
`[[INT64(1) VARCHAR("B") VARCHAR("A") VARCHAR("abc")] [INT64(1) VARCHAR("C") VARCHAR("A") VARCHAR("abc")] [INT64(1) VARCHAR("C") VARCHAR("B") VARCHAR("abc")] [INT64(1) VARCHAR("A") VARCHAR("C") VARCHAR("abc")] [INT64(2) VARCHAR("A") VARCHAR("A") VARCHAR("abc")] [INT64(2) VARCHAR("B") VARCHAR("C") VARCHAR("abc")]]`)
}

func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
Expand Down
13 changes: 8 additions & 5 deletions go/vt/vtgate/planbuilder/abstract/queryprojection.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ type (

// QueryProjection contains the information about the projections, group by and order by expressions used to do horizon planning.
QueryProjection struct {
SelectExprs []SelectExpr
HasAggr bool
GroupByExprs []GroupBy
OrderExprs []OrderBy
SelectExprs []SelectExpr
HasAggr bool
GroupByExprs []GroupBy
OrderExprs []OrderBy
CanPushDownSorting bool
}

// OrderBy contains the expression to used in order by and also if ordering is needed at VTGate level then what the weight_string function expression to be sent down for evaluation.
Expand Down Expand Up @@ -92,14 +93,14 @@ func CreateQPFromSelect(sel *sqlparser.Select) (*QueryProjection, error) {
}

for _, group := range sel.GroupBy {
// todo dont ignore weightstringexpr
expr, weightStrExpr, err := qp.getSimplifiedExpr(group, "group statement")
if err != nil {
return nil, err
}
qp.GroupByExprs = append(qp.GroupByExprs, GroupBy{Inner: expr, WeightStrExpr: weightStrExpr})
}

canPushDownSorting := true
for _, order := range sel.OrderBy {
expr, weightStrExpr, err := qp.getSimplifiedExpr(order.Expr, "order clause")
if err != nil {
Expand All @@ -112,7 +113,9 @@ func CreateQPFromSelect(sel *sqlparser.Select) (*QueryProjection, error) {
},
WeightStrExpr: weightStrExpr,
})
canPushDownSorting = canPushDownSorting && !sqlparser.ContainsAggregation(weightStrExpr)
}
qp.CanPushDownSorting = canPushDownSorting

return qp, nil
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/planbuilder/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type orderedAggregate struct {
resultsBuilder
extraDistinct *sqlparser.ColName
eaggr *engine.OrderedAggregate
columnOffset map[sqlparser.Expr]int
}

// checkAggregates analyzes the select expression for aggregates. If it determines
Expand Down
17 changes: 0 additions & 17 deletions go/vt/vtgate/planbuilder/queryprojection_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func planHorizon(sel *sqlparser.Select, plan logicalPlan, semTable *semantics.Se
needsTruncation = needsTruncation || colAdded
}

if qp.HasAggr {
if qp.HasAggr && qp.CanPushDownSorting {
var colAdded bool
plan, colAdded, err = planOrderByUsingGroupBy(qp, plan, semTable)
if err != nil {
Expand Down
89 changes: 84 additions & 5 deletions go/vt/vtgate/planbuilder/selectGen4.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ func pushProjection(expr *sqlparser.AliasedExpr, plan logicalPlan, semTable *sem
}
node.Cols = append(node.Cols, column)
return len(node.Cols) - 1, true, nil
case *orderedAggregate:
for k, v := range node.columnOffset {
if sqlparser.EqualsExpr(expr.Expr, k) {
return v, false, nil
}
}
return 0, false, semantics.Gen4NotSupportedF("column not found in already added list: %s", sqlparser.String(expr))
default:
return 0, false, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "%T not yet supported", node)
}
Expand Down Expand Up @@ -129,7 +136,8 @@ func planAggregations(qp *abstract.QueryProjection, plan logicalPlan, semTable *
weightStrings: make(map[*resultColumn]int),
truncater: eaggr,
},
eaggr: eaggr,
eaggr: eaggr,
columnOffset: map[sqlparser.Expr]int{},
}
for _, e := range qp.SelectExprs {
offset, _, err := pushProjection(e.Col, plan, semTable, true, false)
Expand All @@ -143,6 +151,7 @@ func planAggregations(qp *abstract.QueryProjection, plan logicalPlan, semTable *
Opcode: opcode,
Col: offset,
})
oa.columnOffset[e.Col.Expr] = offset
}
}

Expand All @@ -154,6 +163,25 @@ func planAggregations(qp *abstract.QueryProjection, plan logicalPlan, semTable *
}
colAdded = colAdded || added
}

if !qp.CanPushDownSorting {
var orderExprs []abstract.OrderBy
// if we can't at a later stage push down the sorting to our inputs, we have to do ordering here
for _, groupExpr := range qp.GroupByExprs {
orderExprs = append(orderExprs, abstract.OrderBy{
Inner: &sqlparser.Order{Expr: groupExpr.Inner},
WeightStrExpr: groupExpr.WeightStrExpr},
)
}
if len(orderExprs) > 0 {
newInput, added, err := planOrderBy(qp, orderExprs, plan, semTable)
if err != nil {
return nil, false, err
}
oa.input = newInput
return oa, colAdded || added, nil
}
}
return oa, colAdded, nil
}

Expand All @@ -176,6 +204,7 @@ func planGroupByGen4(groupExpr abstract.GroupBy, plan logicalPlan, semTable *sem
if err != nil {
return false, err
}
node.columnOffset[groupExpr.WeightStrExpr] = keyCol
return colAdded || colAddedRecursively, nil
default:
return false, semantics.Gen4NotSupportedF("group by on: %T", plan)
Expand Down Expand Up @@ -212,15 +241,25 @@ func planOrderBy(qp *abstract.QueryProjection, orderExprs []abstract.OrderBy, pl
case *joinGen4:
return planOrderByForJoin(qp, orderExprs, plan, semTable)
case *orderedAggregate:
for _, order := range orderExprs {
if sqlparser.ContainsAggregation(order.WeightStrExpr) {
ms, err := createMemorySortPlanOnAggregation(plan, orderExprs)
if err != nil {
return nil, false, err
}
return ms, false, nil
}
}
newInput, colAdded, err := planOrderBy(qp, orderExprs, plan.input, semTable)
if err != nil {
return nil, false, err
}
plan.input = newInput

return plan, colAdded, nil
case *memorySort:
return plan, false, nil
default:
return nil, false, semantics.Gen4NotSupportedF("ordering on complex query")
return nil, false, semantics.Gen4NotSupportedF("ordering on complex query %T", plan)
}
}

Expand Down Expand Up @@ -310,7 +349,49 @@ func planOrderByForJoin(qp *abstract.QueryProjection, orderExprs []abstract.Orde
plan.Left = newLeft
return plan, false, nil
}
ms, colAdded, err := createMemorySortPlan(plan, orderExprs, semTable)
if err != nil {
return nil, false, err
}
return ms, colAdded, nil
}

func createMemorySortPlanOnAggregation(plan *orderedAggregate, orderExprs []abstract.OrderBy) (logicalPlan, error) {
primitive := &engine.MemorySort{}
ms := &memorySort{
resultsBuilder: resultsBuilder{
logicalPlanCommon: newBuilderCommon(plan),
weightStrings: make(map[*resultColumn]int),
truncater: primitive,
},
eMemorySort: primitive,
}

for _, order := range orderExprs {
offset, found := findExprInOrderedAggr(plan, order)
if !found {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "expected to find this expression")
}
ms.eMemorySort.OrderBy = append(ms.eMemorySort.OrderBy, engine.OrderbyParams{
Col: offset,
WeightStringCol: -1,
Desc: order.Inner.Direction == sqlparser.DescOrder,
StarColFixedIndex: offset,
})
}
return ms, nil
}

func findExprInOrderedAggr(plan *orderedAggregate, order abstract.OrderBy) (int, bool) {
for expr, i := range plan.columnOffset {
if sqlparser.EqualsExpr(order.WeightStrExpr, expr) {
return i, true
}
}
return 0, false
}

func createMemorySortPlan(plan logicalPlan, orderExprs []abstract.OrderBy, semTable *semantics.SemTable) (logicalPlan, bool, error) {
primitive := &engine.MemorySort{}
ms := &memorySort{
resultsBuilder: resultsBuilder{
Expand All @@ -335,9 +416,7 @@ func planOrderByForJoin(qp *abstract.QueryProjection, orderExprs []abstract.Orde
StarColFixedIndex: offset,
})
}

return ms, colAdded, nil

}

func allLeft(orderExprs []abstract.OrderBy, semTable *semantics.SemTable, lhsTables semantics.TableSet) bool {
Expand Down
Loading

0 comments on commit 72d6a00

Please sign in to comment.