Skip to content

Commit

Permalink
fix issue with aggregation inside of derived tables (#16366)
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay authored Jul 15, 2024
1 parent 8a59865 commit 1f881fd
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,9 @@ select COUNT(*)
from (select 1 as one
FROM `t3`
WHERE `t3`.`name` = 'B'
ORDER BY id DESC LIMIT 25
OFFSET 0) subquery_for_count;
ORDER BY id DESC
LIMIT 25 OFFSET 0) subquery_for_count;

select u.id, u.t1_id, t.num_segments
from (select id, count(*) as num_segments from t1 group by 1 order by 2 desc limit 20) t
join t2 u on u.id = t.id;
8 changes: 8 additions & 0 deletions go/vt/vtgate/planbuilder/operators/SQL_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,14 @@ func buildAggregation(op *Aggregator, qb *queryBuilder) {
if op.WithRollup {
qb.setWithRollup()
}

if op.DT != nil {
sel := qb.asSelectStatement()
qb.stmt = nil
qb.addTableExpr(op.DT.Alias, op.DT.Alias, TableID(op), &sqlparser.DerivedTable{
Select: sel,
}, nil, op.DT.Columns)
}
}

func buildOrdering(op *Ordering, qb *queryBuilder) {
Expand Down
28 changes: 25 additions & 3 deletions go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel
// if we are dealing with a derived table, we need to make sure that the ordering columns
// are available outside the derived table
for _, order := range horizon.Query.GetOrderBy() {
qp.addColumn(ctx, order.Expr)
qp.addDerivedColumn(ctx, order.Expr)
}
}

Expand All @@ -108,7 +108,7 @@ func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel
}

if len(qp.OrderExprs) > 0 {
op = expandOrderBy(ctx, op, qp)
op = expandOrderBy(ctx, op, qp, horizon.Alias)
extracted = append(extracted, "Ordering")
}

Expand All @@ -124,7 +124,7 @@ func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel
return op, Rewrote(fmt.Sprintf("expand SELECT horizon into (%s)", strings.Join(extracted, ", ")))
}

func expandOrderBy(ctx *plancontext.PlanningContext, op Operator, qp *QueryProjection) Operator {
func expandOrderBy(ctx *plancontext.PlanningContext, op Operator, qp *QueryProjection, derived string) Operator {
var newOrder []OrderBy
sqc := &SubQueryBuilder{}
proj, ok := op.(*Projection)
Expand All @@ -134,6 +134,9 @@ func expandOrderBy(ctx *plancontext.PlanningContext, op Operator, qp *QueryProje
newExpr, subqs := sqc.pullOutValueSubqueries(ctx, expr.SimplifiedExpr, TableID(op), false)
if newExpr == nil {
// If no subqueries are found, retain the original order expression
if derived != "" {
expr = exposeOrderingColumn(ctx, qp, expr, derived)
}
newOrder = append(newOrder, expr)
continue
}
Expand Down Expand Up @@ -167,6 +170,25 @@ func expandOrderBy(ctx *plancontext.PlanningContext, op Operator, qp *QueryProje
}
}

// exposeOrderingColumn will expose the ordering column to the outer query
func exposeOrderingColumn(ctx *plancontext.PlanningContext, qp *QueryProjection, orderBy OrderBy, derived string) OrderBy {
for _, se := range qp.SelectExprs {
aliasedExpr, err := se.GetAliasedExpr()
if err != nil {
panic(vterrors.VT13001("unexpected expression in select"))
}
if ctx.SemTable.EqualsExprWithDeps(aliasedExpr.Expr, orderBy.SimplifiedExpr) {
newExpr := sqlparser.NewColNameWithQualifier(aliasedExpr.ColumnName(), sqlparser.NewTableName(derived))
ctx.SemTable.CopySemanticInfo(orderBy.SimplifiedExpr, newExpr)
orderBy.SimplifiedExpr = newExpr
orderBy.Inner = &sqlparser.Order{Expr: newExpr, Direction: orderBy.Inner.Direction}
break
}
}

return orderBy
}

func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horizon) Operator {
qp := horizon.getQP(ctx)

Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtgate/planbuilder/operators/queryprojection.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,9 @@ func (qp *QueryProjection) useGroupingOverDistinct(ctx *plancontext.PlanningCont
return true
}

// addColumn adds a column to the QueryProjection if it is not already present
func (qp *QueryProjection) addColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
// addColumn adds a column to the QueryProjection if it is not already present.
// It will use a column name that is available on the outside of the derived table
func (qp *QueryProjection) addDerivedColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
for _, selectExpr := range qp.SelectExprs {
getExpr, err := selectExpr.GetExpr()
if err != nil {
Expand Down
56 changes: 54 additions & 2 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,58 @@
]
}
},
{
"comment": "Aggregation with derived table",
"query": "select u.id, u.name, t.num_segments from (select id, count(*) as num_segments from user group by 1 order by 2 desc limit 20) t join unsharded u on u.id = t.id",
"plan": {
"QueryType": "SELECT",
"Original": "select u.id, u.name, t.num_segments from (select id, count(*) as num_segments from user group by 1 order by 2 desc limit 20) t join unsharded u on u.id = t.id",
"Instructions": {
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "R:0,R:1,L:0",
"JoinVars": {
"t_id": 1
},
"TableName": "`user`_unsharded",
"Inputs": [
{
"OperatorType": "Limit",
"Count": "20",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select t.num_segments, t.id from (select id, count(*) as num_segments from `user` where 1 != 1 group by id) as t where 1 != 1",
"OrderBy": "0 DESC",
"Query": "select t.num_segments, t.id from (select id, count(*) as num_segments from `user` group by id) as t order by t.num_segments desc limit 20",
"Table": "`user`"
}
]
},
{
"OperatorType": "Route",
"Variant": "Unsharded",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select u.id, u.`name` from unsharded as u where 1 != 1",
"Query": "select u.id, u.`name` from unsharded as u where u.id = :t_id",
"Table": "unsharded"
}
]
},
"TablesUsed": [
"main.unsharded",
"user.user"
]
}
},
{
"comment": "scatter aggregate multiple group by (numbers)",
"query": "select a, b, count(*) from user group by 2, 1",
Expand Down Expand Up @@ -3650,7 +3702,7 @@
},
"FieldQuery": "select x.id, x.val1, 1, weight_string(x.val1) from (select id, val1 from `user` where 1 != 1) as x where 1 != 1",
"OrderBy": "(1|3) ASC",
"Query": "select x.id, x.val1, 1, weight_string(x.val1) from (select id, val1 from `user` where val2 < 4) as x order by `user`.val1 asc limit 2",
"Query": "select x.id, x.val1, 1, weight_string(x.val1) from (select id, val1 from `user` where val2 < 4) as x order by x.val1 asc limit 2",
"Table": "`user`"
}
]
Expand Down Expand Up @@ -7173,7 +7225,7 @@
},
"FieldQuery": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where 1 != 1) as subquery_for_count where 1 != 1",
"OrderBy": "(1|3) DESC",
"Query": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as subquery_for_count order by id desc limit 25",
"Query": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as subquery_for_count order by subquery_for_count.id desc limit 25",
"Table": "`user`"
}
]
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/testdata/cte_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@
},
"FieldQuery": "select x.id, x.val1, 1, weight_string(x.val1) from (select id, val1 from `user` where 1 != 1) as x where 1 != 1",
"OrderBy": "(1|3) ASC",
"Query": "select x.id, x.val1, 1, weight_string(x.val1) from (select id, val1 from `user` where val2 < 4) as x order by `user`.val1 asc limit 2",
"Query": "select x.id, x.val1, 1, weight_string(x.val1) from (select id, val1 from `user` where val2 < 4) as x order by x.val1 asc limit 2",
"Table": "`user`"
}
]
Expand Down

0 comments on commit 1f881fd

Please sign in to comment.