Skip to content

Commit

Permalink
executor: add CTEExec and CTETableReaderExec (#24809)
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Jun 1, 2021
1 parent 8dc2119 commit ccaefa2
Show file tree
Hide file tree
Showing 12 changed files with 984 additions and 19 deletions.
100 changes: 100 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -83,6 +84,14 @@ type executorBuilder struct {
hasLock bool
}

// CTEStorages stores resTbl and iterInTbl for CTEExec.
// There will be a map[CTEStorageID]*CTEStorages in StmtCtx,
// which will store all CTEStorages to make all shared CTEs use same the CTEStorages.
type CTEStorages struct {
ResTbl cteutil.Storage
IterInTbl cteutil.Storage
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *executorBuilder {
return &executorBuilder{
ctx: ctx,
Expand Down Expand Up @@ -235,6 +244,10 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildAdminShowTelemetry(v)
case *plannercore.AdminResetTelemetryID:
return b.buildAdminResetTelemetryID(v)
case *plannercore.PhysicalCTE:
return b.buildCTE(v)
case *plannercore.PhysicalCTETable:
return b.buildCTETableReader(v)
default:
if mp, ok := p.(MockPhysicalPlan); ok {
return mp.GetExecutor()
Expand Down Expand Up @@ -4072,3 +4085,90 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *
}
return e
}

func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
// 1. Build seedPlan.
seedExec := b.build(v.SeedPlan)
if b.err != nil {
return nil
}

// 2. Build iterInTbl.
chkSize := b.ctx.GetSessionVars().MaxChunkSize
tps := seedExec.base().retFieldTypes
iterOutTbl := cteutil.NewStorageRowContainer(tps, chkSize)
if err := iterOutTbl.OpenAndRef(); err != nil {
b.err = err
return nil
}

var resTbl cteutil.Storage
var iterInTbl cteutil.Storage
storageMap, ok := b.ctx.GetSessionVars().StmtCtx.CTEStorageMap.(map[int]*CTEStorages)
if !ok {
b.err = errors.New("type assertion for CTEStorageMap failed")
return nil
}
storages, ok := storageMap[v.CTE.IDForStorage]
if ok {
// Storage already setup.
resTbl = storages.ResTbl
iterInTbl = storages.IterInTbl
} else {
resTbl = cteutil.NewStorageRowContainer(tps, chkSize)
if err := resTbl.OpenAndRef(); err != nil {
b.err = err
return nil
}
iterInTbl = cteutil.NewStorageRowContainer(tps, chkSize)
if err := iterInTbl.OpenAndRef(); err != nil {
b.err = err
return nil
}
storageMap[v.CTE.IDForStorage] = &CTEStorages{ResTbl: resTbl, IterInTbl: iterInTbl}
}

// 3. Build recursive part.
recursiveExec := b.build(v.RecurPlan)
if b.err != nil {
return nil
}

var sel []int
if v.CTE.IsDistinct {
sel = make([]int, chkSize)
for i := 0; i < chkSize; i++ {
sel[i] = i
}
}

return &CTEExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
seedExec: seedExec,
recursiveExec: recursiveExec,
resTbl: resTbl,
iterInTbl: iterInTbl,
iterOutTbl: iterOutTbl,
chkIdx: 0,
isDistinct: v.CTE.IsDistinct,
sel: sel,
}
}

func (b *executorBuilder) buildCTETableReader(v *plannercore.PhysicalCTETable) Executor {
storageMap, ok := b.ctx.GetSessionVars().StmtCtx.CTEStorageMap.(map[int]*CTEStorages)
if !ok {
b.err = errors.New("type assertion for CTEStorageMap failed")
return nil
}
storages, ok := storageMap[v.IDForStorage]
if !ok {
b.err = errors.Errorf("iterInTbl should already be set up by CTEExec(id: %d)", v.IDForStorage)
return nil
}
return &CTETableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
iterInTbl: storages.IterInTbl,
chkIdx: 0,
}
}
Loading

0 comments on commit ccaefa2

Please sign in to comment.