Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gen4: Distinct plan refactor and support grouping and aggregation on join query #8506

Merged
merged 9 commits into from
Jul 22, 2021
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/abstract/queryprojection.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func CreateQPFromSelect(sel *sqlparser.Select) (*QueryProjection, error) {
qp.GroupByExprs = append(qp.GroupByExprs, GroupBy{Inner: expr, WeightStrExpr: weightStrExpr})
}

if qp.HasAggr {
if qp.HasAggr || len(qp.GroupByExprs) > 0 {
expr := qp.getNonAggrExprNotMatchingGroupByExprs()
// if we have aggregation functions, non aggregating columns and GROUP BY,
// the non-aggregating expressions must all be listed in the GROUP BY list
Expand Down
103 changes: 100 additions & 3 deletions go/vt/vtgate/planbuilder/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ func (hp *horizonPlanning) haveToTruncate(v bool) {
func (hp *horizonPlanning) planAggregations() error {
newPlan := hp.plan
var oa *orderedAggregate
if !hasUniqueVindex(hp.vschema, hp.semTable, hp.qp.GroupByExprs) {
uniqVindex := hasUniqueVindex(hp.vschema, hp.semTable, hp.qp.GroupByExprs)
_, joinPlan := hp.plan.(*joinGen4)
if !uniqVindex || joinPlan {
eaggr := &engine.OrderedAggregate{}
oa = &orderedAggregate{
resultsBuilder: resultsBuilder{
Expand Down Expand Up @@ -219,6 +221,9 @@ func planGroupByGen4(groupExpr abstract.GroupBy, plan logicalPlan, semTable *sem
sel := node.Select.(*sqlparser.Select)
sel.GroupBy = append(sel.GroupBy, groupExpr.Inner)
return false, nil
case *joinGen4:
_, _, added, err := wrapAndPushExpr(groupExpr.Inner, groupExpr.WeightStrExpr, node, semTable)
return added, err
case *orderedAggregate:
keyCol, weightStringOffset, colAdded, err := wrapAndPushExpr(groupExpr.Inner, groupExpr.WeightStrExpr, node.input, semTable)
if err != nil {
Expand Down Expand Up @@ -315,6 +320,8 @@ func planOrderByForRoute(orderExprs []abstract.OrderBy, plan *route, semTable *s
return plan, origColCount != plan.Select.GetColumnCount(), nil
}

// wrapAndPushExpr pushes the expression and weighted_string function to the plan using semantics.SemTable
// It returns (expr offset, weight_string offset, new_column added, error)
func wrapAndPushExpr(expr sqlparser.Expr, weightStrExpr sqlparser.Expr, plan logicalPlan, semTable *semantics.SemTable) (int, int, bool, error) {
offset, added, err := pushProjection(&sqlparser.AliasedExpr{Expr: expr}, plan, semTable, true, true)
if err != nil {
Expand Down Expand Up @@ -370,8 +377,6 @@ func (hp *horizonPlanning) planOrderByForJoin(orderExprs []abstract.OrderBy, pla
return nil, err
}
plan.Left = newLeft
hp.needsTruncation = false // since this is a join, we can safely
// add extra columns and not need to truncate them
return plan, nil
}
sortPlan, err := hp.createMemorySortPlan(plan, orderExprs)
Expand Down Expand Up @@ -457,3 +462,95 @@ func allLeft(orderExprs []abstract.OrderBy, semTable *semantics.SemTable, lhsTab
}
return true
}

func (hp *horizonPlanning) planDistinct() error {
if !hp.qp.NeedsDistinct() {
return nil
}
switch p := hp.plan.(type) {
case *route:
// we always make the underlying query distinct,
// and then we might also add a distinct operator on top if it is needed
p.Select.MakeDistinct()
if !p.isSingleShard() && !selectHasUniqueVindex(hp.vschema, hp.semTable, hp.qp.SelectExprs) {
return hp.pushDistinct()
}
return nil
case *joinGen4:
return hp.pushDistinct()
case *orderedAggregate:
return hp.planDistinctOA(p)
default:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unknown plan type for DISTINCT %T", hp.plan)
}
}

func (hp *horizonPlanning) planDistinctOA(currPlan *orderedAggregate) error {
eaggr := &engine.OrderedAggregate{}
oa := &orderedAggregate{
resultsBuilder: resultsBuilder{
logicalPlanCommon: newBuilderCommon(hp.plan),
weightStrings: make(map[*resultColumn]int),
truncater: eaggr,
},
eaggr: eaggr,
}
for _, sExpr := range hp.qp.SelectExprs {
found := false
for _, grpParam := range currPlan.eaggr.GroupByKeys {
if sqlparser.EqualsExpr(sExpr.Col.Expr, grpParam.Expr) {
found = true
eaggr.GroupByKeys = append(eaggr.GroupByKeys, grpParam)
break
}
}
if found {
continue
}
for _, aggrParam := range currPlan.eaggr.Aggregates {
if sqlparser.EqualsExpr(sExpr.Col.Expr, aggrParam.Expr) {
found = true
eaggr.GroupByKeys = append(eaggr.GroupByKeys, engine.GroupByParams{KeyCol: aggrParam.Col, WeightStringCol: -1})
break
}
}
if !found {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unable to plan distinct query as the column is not projected: %s", sqlparser.String(sExpr.Col))
}
}
hp.plan = oa
return nil
}

func (hp *horizonPlanning) pushDistinct() error {
eaggr := &engine.OrderedAggregate{}
oa := &orderedAggregate{
resultsBuilder: resultsBuilder{
logicalPlanCommon: newBuilderCommon(hp.plan),
weightStrings: make(map[*resultColumn]int),
truncater: eaggr,
},
eaggr: eaggr,
}
for index, sExpr := range hp.qp.SelectExprs {
grpParam := engine.GroupByParams{KeyCol: index, WeightStringCol: -1}
_, wOffset, added, err := wrapAndPushExpr(sExpr.Col.Expr, sExpr.Col.Expr, hp.plan, hp.semTable)
if err != nil {
return err
}
hp.needsTruncation = hp.needsTruncation || added
grpParam.WeightStringCol = wOffset
eaggr.GroupByKeys = append(eaggr.GroupByKeys, grpParam)
}
hp.plan = oa
return nil
}

func selectHasUniqueVindex(vschema ContextVSchema, semTable *semantics.SemTable, sel []abstract.SelectExpr) bool {
for _, expr := range sel {
if exprHasUniqueVindex(vschema, semTable, expr.Col.Expr) {
return true
}
}
return false
}
26 changes: 11 additions & 15 deletions go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,15 +464,19 @@ func (vw *vschemaWrapper) ErrorIfShardedF(keyspace *vindexes.Keyspace, _, errFmt
return nil
}

func (vw *vschemaWrapper) currentDb() string {
ksName := ""
if vw.keyspace != nil {
ksName = vw.keyspace.Name
}
return ksName
}

func escapeNewLines(in string) string {
return strings.ReplaceAll(in, "\n", "\\n")
}

func testFile(t *testing.T, filename, tempDir string, vschema *vschemaWrapper, checkGen4equalPlan bool) {
ksName := ""
if vschema.keyspace != nil {
ksName = vschema.keyspace.Name
}
var checkAllTests = false
t.Run(filename, func(t *testing.T) {
expected := &strings.Builder{}
Expand All @@ -481,7 +485,7 @@ func testFile(t *testing.T, filename, tempDir string, vschema *vschemaWrapper, c
for tcase := range iterateExecFile(filename) {
t.Run(fmt.Sprintf("%d V3: %s", tcase.lineno, tcase.comments), func(t *testing.T) {
vschema.version = V3
plan, err := TestBuilder(tcase.input, vschema, ksName)
plan, err := TestBuilder(tcase.input, vschema, vschema.currentDb())
out := getPlanOrErrorOutput(err, plan)

if out != tcase.output {
Expand Down Expand Up @@ -558,11 +562,7 @@ func getPlanOutput(tcase testCase, vschema *vschemaWrapper) (out string, err err
out = fmt.Sprintf("panicked: %v\n%s", r, string(debug.Stack()))
}
}()
ksName := ""
if vschema.keyspace != nil {
ksName = vschema.keyspace.Name
}
plan, err := TestBuilder(tcase.input, vschema, ksName)
plan, err := TestBuilder(tcase.input, vschema, vschema.currentDb())
out = getPlanOrErrorOutput(err, plan)
return out, err
}
Expand Down Expand Up @@ -747,16 +747,12 @@ func BenchmarkSelectVsDML(b *testing.B) {
}

func benchmarkPlanner(b *testing.B, version PlannerVersion, testCases []testCase, vschema *vschemaWrapper) {
ksName := ""
if vschema.keyspace != nil {
ksName = vschema.keyspace.Name
}
b.ReportAllocs()
for n := 0; n < b.N; n++ {
for _, tcase := range testCases {
if tcase.output2ndPlanner != "" {
vschema.version = version
_, _ = TestBuilder(tcase.input, vschema, ksName)
_, _ = TestBuilder(tcase.input, vschema, vschema.currentDb())
}
}
}
Expand Down
43 changes: 8 additions & 35 deletions go/vt/vtgate/planbuilder/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ type horizonPlanning struct {
vtgateGrouping bool
}

func (hp horizonPlanning) planHorizon() (logicalPlan, error) {
func (hp *horizonPlanning) planHorizon() (logicalPlan, error) {
rb, ok := hp.plan.(*route)
if !ok && hp.semTable.ProjectionErr != nil {
return nil, hp.semTable.ProjectionErr
Expand Down Expand Up @@ -238,29 +238,29 @@ func (hp horizonPlanning) planHorizon() (logicalPlan, error) {
}
}

err = hp.truncateColumnsIfNeeded()
err = hp.planDistinct()
if err != nil {
return nil, err
}

if hp.qp.NeedsDistinct() {
hp.plan, err = pushDistinct(hp.plan, hp.semTable, hp.vschema, hp.qp)
if err != nil {
return nil, err
}
err = hp.truncateColumnsIfNeeded()
if err != nil {
return nil, err
}

return hp.plan, nil
}

func (hp horizonPlanning) truncateColumnsIfNeeded() error {
func (hp *horizonPlanning) truncateColumnsIfNeeded() error {
if !hp.needsTruncation {
return nil
}

switch p := hp.plan.(type) {
case *route:
p.eroute.SetTruncateColumnCount(hp.sel.GetColumnCount())
case *joinGen4:
// since this is a join, we can safely add extra columns and not need to truncate them
case *orderedAggregate:
p.eaggr.SetTruncateColumnCount(hp.sel.GetColumnCount())
case *memorySort:
Expand All @@ -272,33 +272,6 @@ func (hp horizonPlanning) truncateColumnsIfNeeded() error {
return nil
}

func pushDistinct(plan logicalPlan, semTable *semantics.SemTable, vschema ContextVSchema, qp *abstract.QueryProjection) (logicalPlan, error) {
switch p := plan.(type) {
case *route:
// we always make the underlying query distinct,
// and then we might also add a distinct operator on top if it is needed
p.Select.MakeDistinct()
if !p.isSingleShard() && !selectHasUniqueVindex(vschema, semTable, qp.SelectExprs) {
plan = newDistinct(plan)
}
return plan, nil
case *orderedAggregate, *joinGen4:
return newDistinct(plan), nil

default:
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unknown plan type for DISTINCT %T", plan)
}
}

func selectHasUniqueVindex(vschema ContextVSchema, semTable *semantics.SemTable, sel []abstract.SelectExpr) bool {
for _, expr := range sel {
if exprHasUniqueVindex(vschema, semTable, expr.Col.Expr) {
return true
}
}
return false
}

func exprHasUniqueVindex(vschema ContextVSchema, semTable *semantics.SemTable, expr sqlparser.Expr) bool {
col, isCol := expr.(*sqlparser.ColName)
if !isCol {
Expand Down
Loading