Skip to content

Commit

Permalink
planner: update IsCompleteModeAgg and transform function of RuleInjec…
Browse files Browse the repository at this point in the history
…tProjectionBelowAgg to fix distinct agg bug (pingcap#24449)
  • Loading branch information
zoomxi committed May 27, 2021
1 parent 96ab4a3 commit 8a4a07d
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 12 deletions.
3 changes: 2 additions & 1 deletion planner/cascades/testdata/integration_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
"select /*+ STREAM_AGG() */ count(distinct c) from t;", // should push down after stream agg implemented
"select /*+ HASH_AGG() */ count(distinct c) from t;",
"select count(distinct c) from t group by c;",
"select count(distinct c) from t;"
"select count(distinct c) from t;",
"select count(*) from t group by a having avg(distinct a)>1;" // #24449 Projection should be add between HashAgg and TableReader
]
},
{
Expand Down
15 changes: 15 additions & 0 deletions planner/cascades/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,21 @@
"Result": [
"2"
]
},
{
"SQL": "select count(*) from t group by a having avg(distinct a)>1;",
"Plan": [
"Projection_14 6400.00 root Column#5",
"└─Selection_15 6400.00 root gt(Column#6, 1)",
" └─HashAgg_20 8000.00 root group by:test.t.a, funcs:count(Column#8)->Column#5, funcs:avg(distinct Column#10)->Column#6",
" └─Projection_21 8000.00 root Column#8, cast(test.t.a, decimal(15,4) BINARY)->Column#10, test.t.a",
" └─TableReader_22 8000.00 root data:HashAgg_23",
" └─HashAgg_23 8000.00 cop[tikv] group by:test.t.a, funcs:count(1)->Column#8",
" └─TableFullScan_19 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
"Result": [
"1"
]
}
]
},
Expand Down
14 changes: 9 additions & 5 deletions planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1882,7 +1882,7 @@ func (*outerJoinEliminator) prepareForEliminateOuterJoin(joinExpr *memo.GroupExp
return
}

// check whether one of unique keys sets is contained by inner join keys.
// isInnerJoinKeysContainUniqueKey check whether one of unique keys sets is contained by inner join keys.
func (*outerJoinEliminator) isInnerJoinKeysContainUniqueKey(innerGroup *memo.Group, joinKeys *expression.Schema) (bool, error) {
// builds UniqueKey info of innerGroup.
innerGroup.BuildKeyInfo()
Expand Down Expand Up @@ -2129,7 +2129,7 @@ func (r *TransformAggregateCaseToSelection) isOnlyOneNotNull(ctx sessionctx.Cont
return !args[outputIdx].Equal(ctx, expression.NewNull()) && (argsNum == 2 || args[3-outputIdx].Equal(ctx, expression.NewNull()))
}

// TransformAggregateCaseToSelection only support `case when cond then var end` and `case when cond then var1 else var2 end`.
// isTwoOrThreeArgCase only support `case when cond then var end` and `case when cond then var1 else var2 end`.
func (r *TransformAggregateCaseToSelection) isTwoOrThreeArgCase(expr expression.Expression) bool {
scalarFunc, ok := expr.(*expression.ScalarFunction)
if !ok {
Expand Down Expand Up @@ -2326,9 +2326,14 @@ func (r *InjectProjectionBelowAgg) OnTransform(old *memo.ExprIter) (newExprs []*
hasScalarFunc := false
copyFuncs := make([]*aggregation.AggFuncDesc, 0, len(agg.AggFuncs))
for _, aggFunc := range agg.AggFuncs {
copyFunc := aggFunc.Clone()
// WrapCastForAggArgs will modify AggFunc, so we should clone AggFunc.
copyFunc.WrapCastForAggArgs(agg.SCtx())
copyFunc := aggFunc.Clone()

// if aggFunc input is from 'partial data', no need to wrap cast for agg args
copyFunc.WrapCastAsDecimalForAggArgs(agg.SCtx())
if copyFunc.Mode != aggregation.FinalMode && copyFunc.Mode != aggregation.Partial2Mode {
copyFunc.WrapCastForAggArgs(agg.SCtx())
}
copyFuncs = append(copyFuncs, copyFunc)
for _, arg := range copyFunc.Args {
_, isScalarFunc := arg.(*expression.ScalarFunction)
Expand Down Expand Up @@ -2386,7 +2391,6 @@ func (r *InjectProjectionBelowAgg) OnTransform(old *memo.ExprIter) (newExprs []*
newGroupByItems[i] = newArg
}
}

// Construct GroupExpr, Group (Agg -> Proj -> Child).
proj := plannercore.LogicalProjection{
Exprs: projExprs,
Expand Down
19 changes: 13 additions & 6 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,15 @@ func (la *LogicalAggregation) IsPartialModeAgg() bool {

// IsCompleteModeAgg returns if all of the AggFuncs are CompleteMode.
func (la *LogicalAggregation) IsCompleteModeAgg() bool {
// Since all of the AggFunc share the same AggMode, we only need to check the first one.
return la.AggFuncs[0].Mode == aggregation.CompleteMode
// not all of the AggFunctions has the same AggMode
// for example: when cascades planner on, after PushAggDownGather transformed,
// some aggFunctions are CompleteMode, and the others are FinalMode
for _, aggFunc := range la.AggFuncs {
if aggFunc.Mode == aggregation.CompleteMode {
return true
}
}
return false
}

// GetGroupByCols returns the columns that are group-by items.
Expand Down Expand Up @@ -391,7 +398,7 @@ func (la *LogicalAggregation) GetUsedCols() (usedCols []*expression.Column) {
type LogicalSelection struct {
baseLogicalPlan

// Originally the WHERE or ON condition is parsed into a single expression,
// Conditions Originally the WHERE or ON condition is parsed into a single expression,
// but after we converted to CNF(Conjunctive normal form), it can be
// split into a list of AND conditions.
Conditions []expression.Expression
Expand Down Expand Up @@ -494,12 +501,12 @@ type DataSource struct {
// possibleAccessPaths stores all the possible access path for physical plan, including table scan.
possibleAccessPaths []*util.AccessPath

// The data source may be a partition, rather than a real table.
// isPartition The data source may be a partition, rather than a real table.
isPartition bool
physicalTableID int64
partitionNames []model.CIStr

// handleCol represents the handle column for the datasource, either the
// handleCols represents the handle column for the datasource, either the
// int primary key column or extra handle column.
// handleCol *expression.Column
handleCols HandleCols
Expand Down Expand Up @@ -557,7 +564,7 @@ type LogicalTableScan struct {
// LogicalIndexScan is the logical index scan operator for TiKV.
type LogicalIndexScan struct {
logicalSchemaProducer
// DataSource should be read-only here.
// Source should be read-only here.
Source *DataSource
IsDoubleRead bool

Expand Down
26 changes: 26 additions & 0 deletions planner/core/logical_plans_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -207,3 +208,28 @@ func (s *testUnitTestSuit) TestIndexPathSplitCorColCond(c *C) {
}
collate.SetNewCollationEnabledForTest(false)
}

func (s *testUnitTestSuit) TestIsCompleteModeAgg(c *C) {
defer testleak.AfterTest(c)()

aggFuncs := make([]*aggregation.AggFuncDesc, 2)
aggFuncs[0] = &aggregation.AggFuncDesc{
Mode: aggregation.FinalMode,
HasDistinct: true,
}
aggFuncs[1] = &aggregation.AggFuncDesc{
Mode: aggregation.CompleteMode,
HasDistinct: true,
}

newAgg := &LogicalAggregation{
AggFuncs: aggFuncs,
}
c.Assert(newAgg.IsCompleteModeAgg(), Equals, true)

aggFuncs[1] = &aggregation.AggFuncDesc{
Mode: aggregation.FinalMode,
HasDistinct: true,
}
c.Assert(newAgg.IsCompleteModeAgg(), Equals, false)
}

0 comments on commit 8a4a07d

Please sign in to comment.