Skip to content

Commit

Permalink
new planner handles non-grouping aggregation
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Nov 26, 2020
1 parent 98012e8 commit b8ac5f5
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 22 deletions.
55 changes: 39 additions & 16 deletions go/vt/vtgate/planbuilder/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func (pb *primitiveBuilder) analyseSelectExpr(sel *sqlparser.Select) (*Horizon,

result := &Horizon{hasStar: stillHasStars}
if stillHasStars {
// We'll allow select * for simple routes.
// There is no point to continue to plan here.
// we might still allow this query, if it is a single sharded route
return result, nil
}
for _, node := range selectExprs {
Expand Down Expand Up @@ -183,63 +184,85 @@ func isAggregateExpression(expr sqlparser.Expr) bool {

func (pb *primitiveBuilder) planHorizon(sel *sqlparser.Select, horizon *Horizon) error {
rb, isRoute := pb.plan.(*route)
if isRoute {
if isRoute && len(horizon.aggregateFuncs) == 0 {
// since we can push down all of the aggregation to the route,
// we don't need to do anything else here
rb.Select = sel
return nil
}

resultColumns := make([]*resultColumn, 0, len(horizon.projections)+len(horizon.aggregateFuncs))
for _, projection := range horizon.projections {
newBuilder, rc, _, err := pb.pushProjection(pb.plan, projection.expr, projection.origin)
rc, _, err := pb.pushProjection(pb.plan, projection.expr, projection.origin)
if err != nil {
return err
}
pb.plan = newBuilder
resultColumns = append(resultColumns, rc)
}
if len(horizon.aggregateFuncs) > 0 {
newColumns, err := pb.planAggregation(rb, horizon)
if err != nil {
return err
}
resultColumns = append(resultColumns, newColumns...)
}
pb.st.SetResultColumns(resultColumns)
return nil
}

func (pb *primitiveBuilder) pushProjection(in logicalPlan, expr *sqlparser.AliasedExpr, origin logicalPlan) (logicalPlan, *resultColumn, int, error) {
func (pb *primitiveBuilder) planAggregation(rb *route, horizon *Horizon) ([]*resultColumn, error) {
var resultColumns []*resultColumn
eaggr := &engine.OrderedAggregate{}
newPlan := &orderedAggregate{
resultsBuilder: newResultsBuilder(rb, eaggr),
eaggr: eaggr,
}
pb.plan = newPlan
for _, aggrFunc := range horizon.aggregateFuncs {
rc, _, err := newPlan.pushAggr2(pb, aggrFunc.expr, aggrFunc.origin)
if err != nil {
return nil, err
}
resultColumns = append(resultColumns, rc)
}
return resultColumns, nil
}

func (pb *primitiveBuilder) pushProjection(in logicalPlan, expr *sqlparser.AliasedExpr, origin logicalPlan) (*resultColumn, int, error) {
switch node := in.(type) {
case *join:
var rc *resultColumn
if node.isOnLeft(origin.Order()) {
newLeft, col, colNumber, err := pb.pushProjection(node.Left, expr, origin)
col, colNumber, err := pb.pushProjection(node.Left, expr, origin)
if err != nil {
return nil, nil, 0, err
return nil, 0, err
}
node.ejoin.Cols = append(node.ejoin.Cols, -colNumber-1)
rc = col
node.Left = newLeft
} else {
// Pushing of non-trivial expressions not allowed for RHS of left joins.
if _, ok := expr.Expr.(*sqlparser.ColName); !ok && node.ejoin.Opcode == engine.LeftJoin {
return nil, nil, 0, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cross-shard left join and column expressions")
return nil, 0, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cross-shard left join and column expressions")
}

newRight, col, colNumber, err := pb.pushProjection(node.Right, expr, origin)
col, colNumber, err := pb.pushProjection(node.Right, expr, origin)
if err != nil {
return nil, nil, 0, err
return nil, 0, err
}
node.ejoin.Cols = append(node.ejoin.Cols, colNumber+1)
rc = col
node.Right = newRight
}
node.resultColumns = append(node.resultColumns, rc)
return in, rc, len(node.resultColumns) - 1, nil
return rc, len(node.resultColumns) - 1, nil

case *route:
sel := node.Select.(*sqlparser.Select)
sel.SelectExprs = append(sel.SelectExprs, expr)

rc := newResultColumn(expr, node)
node.resultColumns = append(node.resultColumns, rc)
return node, rc, len(node.resultColumns) - 1, nil
return rc, len(node.resultColumns) - 1, nil

default:
return nil, nil, 0, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "%T.pushProjection: unreachable", in)
return nil, 0, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "%T.pushProjection: unreachable", in)
}
}
30 changes: 30 additions & 0 deletions go/vt/vtgate/planbuilder/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,36 @@ func (oa *orderedAggregate) Primitive() engine.Primitive {
return oa.eaggr
}

func (oa *orderedAggregate) pushAggr2(pb *primitiveBuilder, expr *sqlparser.AliasedExpr, origin logicalPlan) (rc *resultColumn, colNumber int, err error) {
funcExpr := expr.Expr.(*sqlparser.FuncExpr)
opcode := engine.SupportedAggregates[funcExpr.Name.Lowered()]
if len(funcExpr.Exprs) != 1 {
return nil, 0, fmt.Errorf("unsupported: only one expression allowed inside aggregates: %s", sqlparser.String(funcExpr))
}
handleDistinct, _, err := oa.needDistinctHandling(pb, funcExpr, opcode)
if err != nil {
return nil, 0, err
}
if handleDistinct {
// TODO
} else {
_, innerCol, err := pb.pushProjection(oa.input, expr, origin)
if err != nil {
return nil, 0, err
}
oa.eaggr.Aggregates = append(oa.eaggr.Aggregates, engine.AggregateParams{
Opcode: opcode,
Col: innerCol,
})
}

// Build a new rc with oa as origin because it's semantically different
// from the expression we pushed down.
rc = newResultColumn(expr, oa)
oa.resultColumns = append(oa.resultColumns, rc)
return rc, len(oa.resultColumns) - 1, nil
}

func (oa *orderedAggregate) pushAggr(pb *primitiveBuilder, expr *sqlparser.AliasedExpr, origin logicalPlan) (rc *resultColumn, colNumber int, err error) {
funcExpr := expr.Expr.(*sqlparser.FuncExpr)
opcode := engine.SupportedAggregates[funcExpr.Name.Lowered()]
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgate/planbuilder/pullout_subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ type pulloutSubquery struct {
order int
subquery logicalPlan
underlying logicalPlan
eSubquery *engine.PulloutSubquery
primitive *engine.PulloutSubquery
}

// newPulloutSubquery builds a new pulloutSubquery.
func newPulloutSubquery(opcode engine.PulloutOpcode, sqName, hasValues string, subquery logicalPlan) *pulloutSubquery {
return &pulloutSubquery{
subquery: subquery,
eSubquery: &engine.PulloutSubquery{
primitive: &engine.PulloutSubquery{
Opcode: opcode,
SubqueryResult: sqName,
HasValues: hasValues,
Expand Down Expand Up @@ -68,9 +68,9 @@ func (ps *pulloutSubquery) Reorder(order int) {

// Primitive implements the logicalPlan interface
func (ps *pulloutSubquery) Primitive() engine.Primitive {
ps.eSubquery.Subquery = ps.subquery.Primitive()
ps.eSubquery.Underlying = ps.underlying.Primitive()
return ps.eSubquery
ps.primitive.Subquery = ps.subquery.Primitive()
ps.primitive.Underlying = ps.underlying.Primitive()
return ps.primitive
}

// ResultColumns implements the logicalPlan interface
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/testdata/onecase.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
# Add your test case here for debugging and run go test -run=One.
# Add your test case here for debugging and run go test -run=One.

0 comments on commit b8ac5f5

Please sign in to comment.