Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: support push window function down to tiflash #31601

Merged
merged 54 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
e11202f
just stage
LittleFall Jan 12, 2022
1250746
rename PhysicalProperty.IsEmpty to PhysicalProperty.IsSortItemEmpty
LittleFall Jan 12, 2022
3ed087b
stage
LittleFall Jan 13, 2022
4f566ee
exhaustPhysicalPlans done
LittleFall Jan 17, 2022
5f6b138
explain select *, row_number() over (partition by deptid+1) FROM em…
LittleFall Jan 17, 2022
877d402
add basic test
LittleFall Jan 17, 2022
844e137
stage, need another mpp partition type.
LittleFall Jan 18, 2022
2cc1625
Merge branch 'track/master' into dev/window
LittleFall Jan 22, 2022
91382d9
resolve conflict
LittleFall Jan 24, 2022
e7ea0ee
a bit ready.
LittleFall Jan 24, 2022
fd10473
clone logic
LittleFall Jan 24, 2022
df7553f
2pb done
LittleFall Jan 26, 2022
2176d7c
fix multi same window in plan bug
LittleFall Jan 26, 2022
d50bc89
enhance robustness of plan select (need a independent pr)
LittleFall Jan 26, 2022
6430e8b
use local tipb
LittleFall Jan 26, 2022
5482136
tmp method
LittleFall Jan 26, 2022
b7255dd
stage tests
LittleFall Jan 26, 2022
0853b99
fix empty sort item bug
LittleFall Jan 26, 2022
9232025
add new tests
LittleFall Jan 26, 2022
d16043a
frame to pb
LittleFall Feb 7, 2022
285d740
add push down list
LittleFall Feb 7, 2022
48282f8
make fmt
LittleFall Feb 7, 2022
fef1f12
add warning for frames
LittleFall Feb 8, 2022
1836971
Merge branch 'track/master' into dev/window
LittleFall Feb 8, 2022
eb97e7a
fix a push down issue.
LittleFall Feb 9, 2022
d52b75f
check todo
LittleFall Mar 14, 2022
df5956b
Merge branch 'track/master' into dev/window
LittleFall Mar 14, 2022
1000c22
Merge branch 'master' into dev/window
LittleFall Mar 15, 2022
18bdc74
remove check
LittleFall Mar 15, 2022
45577b1
tidy
LittleFall Mar 15, 2022
7b15842
update tipb
LittleFall Mar 15, 2022
1badcf2
refine tests.
LittleFall Mar 15, 2022
1b2d5ad
Merge branch 'master' into dev/window
LittleFall Mar 15, 2022
3654901
ban aggs
LittleFall Mar 15, 2022
ea5edeb
fix a little bug
LittleFall Mar 15, 2022
88f9dea
Merge branch 'track/master' into dev/window
LittleFall Mar 15, 2022
04d9cd5
Merge remote-tracking branch 'qizhi/dev/window' into dev/window
LittleFall Mar 15, 2022
20d1473
Merge branch 'master' into dev/window
LittleFall Mar 15, 2022
f6b9a96
fmt
LittleFall Mar 15, 2022
c800484
add blacklist
LittleFall Mar 15, 2022
d8fd0d6
fix typo.
LittleFall Mar 15, 2022
b4f435c
Merge branch 'track/master' into dev/window
LittleFall Mar 15, 2022
bd10a4a
add more tests
LittleFall Mar 15, 2022
543b157
add test stash
LittleFall Mar 21, 2022
2280c33
Merge branch 'track/master' into dev/window
LittleFall Apr 11, 2022
f36efc0
go mod tidy
LittleFall Apr 11, 2022
3132f87
Merge branch 'track/master' into dev/window
LittleFall Apr 12, 2022
76546f5
try fix mod
LittleFall Apr 12, 2022
7f32010
Merge branch 'track/master' into dev/window
LittleFall May 13, 2022
dac8a78
fix compile
LittleFall May 13, 2022
b0a571d
fmt
LittleFall May 13, 2022
1d34a5b
Merge branch 'track/master' into dev/window
LittleFall May 13, 2022
09473a6
fix lint
LittleFall May 13, 2022
b7ac3c5
fix test
LittleFall May 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions executor/partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, recursive b
child = exec.Join.Children[1-exec.Join.InnerIdx]
case tipb.ExecType_TypeProjection:
child = exec.Projection.Child
case tipb.ExecType_TypeWindow:
child = exec.Window.Child
case tipb.ExecType_TypeSort:
child = exec.Sort.Child
default:
return errors.Trace(fmt.Errorf("unknown new tipb protocol %d", exec.Tp))
}
Expand Down
44 changes: 39 additions & 5 deletions expression/aggregation/agg_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

// AggFuncToPBExpr converts aggregate function to pb.
func AggFuncToPBExpr(sctx sessionctx.Context, client kv.Client, aggFunc *AggFuncDesc) *tipb.Expr {
pc := expression.NewPBConverter(client, sctx.GetSessionVars().StmtCtx)
var tp tipb.ExprType
switch aggFunc.Name {
// GetTiPBExpr return the TiPB ExprType of desc.
func (desc *baseFuncDesc) GetTiPBExpr(tryWindowDesc bool) (tp tipb.ExprType) {
switch desc.Name {
LittleFall marked this conversation as resolved.
Show resolved Hide resolved
case ast.AggFuncCount:
tp = tipb.ExprType_Count
case ast.AggFuncApproxCountDistinct:
Expand Down Expand Up @@ -68,6 +66,42 @@ func AggFuncToPBExpr(sctx sessionctx.Context, client kv.Client, aggFunc *AggFunc
case ast.AggFuncStddevSamp:
tp = tipb.ExprType_StddevSamp
}

if tp != tipb.ExprType_Null || !tryWindowDesc {
return
}

switch desc.Name {
case ast.WindowFuncRowNumber:
tp = tipb.ExprType_RowNumber
case ast.WindowFuncRank:
tp = tipb.ExprType_Rank
case ast.WindowFuncDenseRank:
tp = tipb.ExprType_DenseRank
case ast.WindowFuncCumeDist:
tp = tipb.ExprType_CumeDist
case ast.WindowFuncPercentRank:
tp = tipb.ExprType_PercentRank
case ast.WindowFuncNtile:
tp = tipb.ExprType_Ntile
case ast.WindowFuncLead:
tp = tipb.ExprType_Lead
case ast.WindowFuncLag:
tp = tipb.ExprType_Lag
case ast.WindowFuncFirstValue:
tp = tipb.ExprType_FirstValue
case ast.WindowFuncLastValue:
tp = tipb.ExprType_LastValue
case ast.WindowFuncNthValue:
tp = tipb.ExprType_NthValue
}
return tp
}

// AggFuncToPBExpr converts aggregate function to pb.
func AggFuncToPBExpr(sctx sessionctx.Context, client kv.Client, aggFunc *AggFuncDesc) *tipb.Expr {
pc := expression.NewPBConverter(client, sctx.GetSessionVars().StmtCtx)
tp := aggFunc.GetTiPBExpr(false)
if !client.IsRequestTypeSupported(kv.ReqTypeSelect, int64(tp)) {
return nil
}
Expand Down
40 changes: 40 additions & 0 deletions expression/aggregation/window_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"strings"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tipb/go-tipb"
)

// WindowFuncDesc describes a window function signature, only used in planner.
Expand Down Expand Up @@ -92,3 +94,41 @@ func NeedFrame(name string) bool {
_, ok := noFrameWindowFuncs[strings.ToLower(name)]
return !ok
}

// Clone makes a copy of SortItem.
func (s *WindowFuncDesc) Clone() *WindowFuncDesc {
return &WindowFuncDesc{*s.baseFuncDesc.clone()}
}

// WindowFuncToPBExpr converts aggregate function to pb.
func WindowFuncToPBExpr(sctx sessionctx.Context, client kv.Client, desc *WindowFuncDesc) *tipb.Expr {
pc := expression.NewPBConverter(client, sctx.GetSessionVars().StmtCtx)
tp := desc.GetTiPBExpr(true)
if !client.IsRequestTypeSupported(kv.ReqTypeSelect, int64(tp)) {
return nil
}

children := make([]*tipb.Expr, 0, len(desc.Args))
for _, arg := range desc.Args {
pbArg := pc.ExprToPB(arg)
if pbArg == nil {
return nil
}
children = append(children, pbArg)
}
return &tipb.Expr{Tp: tp, Children: children, FieldType: expression.ToPBFieldType(desc.RetTp)}
}

// CanPushDownToTiFlash control whether a window function desc can be push down to tiflash.
func (s *WindowFuncDesc) CanPushDownToTiFlash() bool {
// window functions
switch s.Name {
case ast.WindowFuncRowNumber, ast.WindowFuncRank, ast.WindowFuncDenseRank, ast.WindowFuncLead, ast.WindowFuncLag:
return true
// TODO: support aggregate functions
//case ast.AggFuncSum, ast.AggFuncCount, ast.AggFuncAvg, ast.AggFuncMax, ast.AggFuncMin:
// return true
}

return false
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/pingcap/log v1.1.0
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
github.com/pingcap/tipb v0.0.0-20220215045658-d12dec7a7609
github.com/pingcap/tipb v0.0.0-20220314125451-bfb5c2c55188
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.32.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,8 @@ github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops=
github.com/pingcap/tipb v0.0.0-20220215045658-d12dec7a7609 h1:BiCS1ZRnW0szOvTAa3gCqWIhyo+hv83SVaBgrUghXIU=
github.com/pingcap/tipb v0.0.0-20220215045658-d12dec7a7609/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20220314125451-bfb5c2c55188 h1:+46isFI9fR9R+nJVDMI55tCC/TCwp+bvVA4HLGEv1rY=
github.com/pingcap/tipb v0.0.0-20220314125451-bfb5c2c55188/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
12 changes: 11 additions & 1 deletion kv/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package kv

import "github.com/pingcap/tipb/go-tipb"
import (
"github.com/pingcap/tipb/go-tipb"
)

// RequestTypeSupportedChecker is used to check expression can be pushed down.
type RequestTypeSupportedChecker struct{}
Expand All @@ -37,6 +39,10 @@ func (d RequestTypeSupportedChecker) IsRequestTypeSupported(reqType, subType int
return false
}

// TODO: deprecate this function, because:
// 1. we have more than one storage engine available for push-down.
// 2. we'd better do an accurate push-down check at the planner stage, instead of here.
// currently, we do aggregation push-down check in `CheckAggCanPushCop`.
func (d RequestTypeSupportedChecker) supportExpr(exprType tipb.ExprType) bool {
switch exprType {
case tipb.ExprType_Null, tipb.ExprType_Int64, tipb.ExprType_Uint64, tipb.ExprType_String, tipb.ExprType_Bytes,
Expand All @@ -48,6 +54,10 @@ func (d RequestTypeSupportedChecker) supportExpr(exprType tipb.ExprType) bool {
case tipb.ExprType_Count, tipb.ExprType_First, tipb.ExprType_Max, tipb.ExprType_Min, tipb.ExprType_Sum, tipb.ExprType_Avg,
tipb.ExprType_Agg_BitXor, tipb.ExprType_Agg_BitAnd, tipb.ExprType_Agg_BitOr, tipb.ExprType_ApproxCountDistinct, tipb.ExprType_GroupConcat:
return true
// window functions.
case tipb.ExprType_RowNumber, tipb.ExprType_Rank, tipb.ExprType_DenseRank, tipb.ExprType_CumeDist, tipb.ExprType_PercentRank,
tipb.ExprType_Ntile, tipb.ExprType_Lead, tipb.ExprType_Lag, tipb.ExprType_FirstValue, tipb.ExprType_LastValue, tipb.ExprType_NthValue:
return true
case ReqSubTypeDesc:
return true
case ReqSubTypeSignature:
Expand Down
2 changes: 1 addition & 1 deletion planner/cascades/enforcer_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func GetEnforcerRules(g *memo.Group, prop *property.PhysicalProperty) (enforcers
if g.EngineType != memo.EngineTiDB {
return
}
if !prop.IsEmpty() {
if !prop.IsSortItemEmpty() {
enforcers = append(enforcers, orderEnforcer)
}
return
Expand Down
26 changes: 13 additions & 13 deletions planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type ImplTableDual struct {

// Match implements ImplementationRule Match interface.
func (r *ImplTableDual) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
return prop.IsEmpty()
return prop.IsSortItemEmpty()
}

// OnImplement implements ImplementationRule OnImplement interface.
Expand All @@ -114,7 +114,7 @@ type ImplMemTableScan struct {

// Match implements ImplementationRule Match interface.
func (r *ImplMemTableScan) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
return prop.IsEmpty()
return prop.IsSortItemEmpty()
}

// OnImplement implements ImplementationRule OnImplement interface.
Expand Down Expand Up @@ -189,15 +189,15 @@ type ImplTableScan struct {
// Match implements ImplementationRule Match interface.
func (r *ImplTableScan) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
ts := expr.ExprNode.(*plannercore.LogicalTableScan)
return prop.IsEmpty() || (len(prop.SortItems) == 1 && ts.HandleCols != nil && prop.SortItems[0].Col.Equal(nil, ts.HandleCols.GetCol(0)))
return prop.IsSortItemEmpty() || (len(prop.SortItems) == 1 && ts.HandleCols != nil && prop.SortItems[0].Col.Equal(nil, ts.HandleCols.GetCol(0)))
}

// OnImplement implements ImplementationRule OnImplement interface.
func (r *ImplTableScan) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) ([]memo.Implementation, error) {
logicProp := expr.Group.Prop
logicalScan := expr.ExprNode.(*plannercore.LogicalTableScan)
ts := logicalScan.GetPhysicalScan(logicProp.Schema, logicProp.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt))
if !reqProp.IsEmpty() {
if !reqProp.IsSortItemEmpty() {
ts.KeepOrder = true
ts.Desc = reqProp.SortItems[0].Desc
}
Expand All @@ -219,7 +219,7 @@ func (r *ImplIndexScan) Match(expr *memo.GroupExpr, prop *property.PhysicalPrope
func (r *ImplIndexScan) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) ([]memo.Implementation, error) {
logicalScan := expr.ExprNode.(*plannercore.LogicalIndexScan)
is := logicalScan.GetPhysicalIndexScan(expr.Group.Prop.Schema, expr.Group.Prop.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt))
if !reqProp.IsEmpty() {
if !reqProp.IsSortItemEmpty() {
is.KeepOrder = true
if reqProp.SortItems[0].Desc {
is.Desc = true
Expand All @@ -235,7 +235,7 @@ type ImplShow struct {

// Match implements ImplementationRule Match interface.
func (r *ImplShow) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
return prop.IsEmpty()
return prop.IsSortItemEmpty()
}

// OnImplement implements ImplementationRule OnImplement interface.
Expand Down Expand Up @@ -319,7 +319,7 @@ type ImplHashAgg struct {
// Match implements ImplementationRule Match interface.
func (r *ImplHashAgg) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
// TODO: deal with the hints when we have implemented StreamAgg.
return prop.IsEmpty()
return prop.IsSortItemEmpty()
}

// OnImplement implements ImplementationRule OnImplement interface.
Expand Down Expand Up @@ -348,7 +348,7 @@ type ImplLimit struct {

// Match implements ImplementationRule Match interface.
func (r *ImplLimit) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
return prop.IsEmpty()
return prop.IsSortItemEmpty()
}

// OnImplement implements ImplementationRule OnImplement interface.
Expand All @@ -371,7 +371,7 @@ type ImplTopN struct {
func (r *ImplTopN) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
topN := expr.ExprNode.(*plannercore.LogicalTopN)
if expr.Group.EngineType != memo.EngineTiDB {
return prop.IsEmpty()
return prop.IsSortItemEmpty()
}
return plannercore.MatchItems(prop, topN.ByItems)
}
Expand Down Expand Up @@ -446,7 +446,7 @@ type ImplHashJoinBuildLeft struct {
func (r *ImplHashJoinBuildLeft) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
switch expr.ExprNode.(*plannercore.LogicalJoin).JoinType {
case plannercore.InnerJoin, plannercore.LeftOuterJoin, plannercore.RightOuterJoin:
return prop.IsEmpty()
return prop.IsSortItemEmpty()
default:
return false
}
Expand All @@ -473,7 +473,7 @@ type ImplHashJoinBuildRight struct {

// Match implements ImplementationRule Match interface.
func (r *ImplHashJoinBuildRight) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
return prop.IsEmpty()
return prop.IsSortItemEmpty()
}

// OnImplement implements ImplementationRule OnImplement interface.
Expand Down Expand Up @@ -520,7 +520,7 @@ type ImplUnionAll struct {

// Match implements ImplementationRule Match interface.
func (r *ImplUnionAll) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
return prop.IsEmpty()
return prop.IsSortItemEmpty()
}

// OnImplement implements ImplementationRule OnImplement interface.
Expand Down Expand Up @@ -572,7 +572,7 @@ type ImplMaxOneRow struct {

// Match implements ImplementationRule Match interface.
func (r *ImplMaxOneRow) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
return prop.IsEmpty()
return prop.IsSortItemEmpty()
}

// OnImplement implements ImplementationRule OnImplement interface
Expand Down
Loading