Skip to content

Commit

Permalink
executor: refactor the BaseExecutor and remove session context in `…
Browse files Browse the repository at this point in the history
…ProjectionExec` (#54614)

close #54613
  • Loading branch information
YangKeao authored Jul 17, 2024
1 parent e1626a9 commit b5bbbf4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 24 deletions.
7 changes: 4 additions & 3 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,9 +1803,10 @@ func benchmarkLimitExec(b *testing.B, cas *testutil.LimitCase) {
}
}
proj := &ProjectionExec{
BaseExecutor: exec.NewBaseExecutor(cas.Ctx, expression.NewSchema(usedCols...), 0, limit),
numWorkers: 1,
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
projectionExecutorContext: newProjectionExecutorContext(cas.Ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(cas.Ctx.GetSessionVars(), expression.NewSchema(usedCols...), 0, limit),
numWorkers: 1,
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
}
exe = proj
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1975,10 +1975,11 @@ func (b *executorBuilder) buildProjection(v *plannercore.PhysicalProjection) exe
return nil
}
e := &ProjectionExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec),
numWorkers: int64(b.ctx.GetSessionVars().ProjectionConcurrency()),
evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator),
calculateNoDelay: v.CalculateNoDelay,
projectionExecutorContext: newProjectionExecutorContext(b.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec),
numWorkers: int64(b.ctx.GetSessionVars().ProjectionConcurrency()),
evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator),
calculateNoDelay: v.CalculateNoDelay,
}

// If the calculation row count for this Projection operator is smaller
Expand Down Expand Up @@ -4732,10 +4733,11 @@ func (builder *dataReaderBuilder) buildProjectionForIndexJoin(
}()

e := &ProjectionExec{
BaseExecutor: exec.NewBaseExecutor(builder.ctx, v.Schema(), v.ID(), childExec),
numWorkers: int64(builder.ctx.GetSessionVars().ProjectionConcurrency()),
evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator),
calculateNoDelay: v.CalculateNoDelay,
projectionExecutorContext: newProjectionExecutorContext(builder.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(builder.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec),
numWorkers: int64(builder.ctx.GetSessionVars().ProjectionConcurrency()),
evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator),
calculateNoDelay: v.CalculateNoDelay,
}

// If the calculation row count for this Projection operator is smaller
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,10 @@ func TestProjectionParallelRequiredRows(t *testing.T) {

func buildProjectionExec(ctx sessionctx.Context, exprs []expression.Expression, src exec.Executor, numWorkers int) exec.Executor {
return &ProjectionExec{
BaseExecutor: exec.NewBaseExecutor(ctx, src.Schema(), 0, src),
numWorkers: int64(numWorkers),
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
projectionExecutorContext: newProjectionExecutorContext(ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(ctx.GetSessionVars(), src.Schema(), 0, src),
numWorkers: int64(numWorkers),
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
}
}

Expand Down
38 changes: 28 additions & 10 deletions pkg/executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,28 @@ type projectionOutput struct {
done chan error
}

// projectionExecutorContext is the execution context for the `ProjectionExec`
type projectionExecutorContext struct {
stmtMemTracker *memory.Tracker
stmtRuntimeStatsColl *execdetails.RuntimeStatsColl
evalCtx expression.EvalContext
enableVectorizedExpression bool
}

func newProjectionExecutorContext(sctx sessionctx.Context) projectionExecutorContext {
return projectionExecutorContext{
stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
stmtRuntimeStatsColl: sctx.GetSessionVars().StmtCtx.RuntimeStatsColl,
evalCtx: sctx.GetExprCtx().GetEvalCtx(),
enableVectorizedExpression: sctx.GetSessionVars().EnableVectorizedExpression,
}
}

// ProjectionExec implements the physical Projection Operator:
// https://en.wikipedia.org/wiki/Projection_(relational_algebra)
type ProjectionExec struct {
exec.BaseExecutor
projectionExecutorContext
exec.BaseExecutorV2

evaluatorSuit *expression.EvaluatorSuite

Expand All @@ -85,7 +103,7 @@ type ProjectionExec struct {

// Open implements the Executor Open interface.
func (e *ProjectionExec) Open(ctx context.Context) error {
if err := e.BaseExecutor.Open(ctx); err != nil {
if err := e.BaseExecutorV2.Open(ctx); err != nil {
return err
}
failpoint.Inject("mockProjectionExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
Expand All @@ -105,7 +123,7 @@ func (e *ProjectionExec) open(_ context.Context) error {
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)
e.memTracker.AttachTo(e.stmtMemTracker)

// For now a Projection can not be executed vectorially only because it
// contains "SetVar" or "GetVar" functions, in this scenario this
Expand Down Expand Up @@ -204,7 +222,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk
if e.childResult.NumRows() == 0 {
return nil
}
err = e.evaluatorSuit.Run(e.Ctx().GetExprCtx().GetEvalCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.childResult, chk)
err = e.evaluatorSuit.Run(e.evalCtx, e.enableVectorizedExpression, e.childResult, chk)
return err
}

Expand Down Expand Up @@ -251,7 +269,7 @@ func (e *ProjectionExec) prepare(ctx context.Context) {
for i := int64(0); i < e.numWorkers; i++ {
e.workers = append(e.workers, &projectionWorker{
proj: e,
sctx: e.Ctx(),
ctx: e.projectionExecutorContext,
evaluatorSuit: e.evaluatorSuit,
globalFinishCh: e.finishCh,
inputGiveBackCh: e.fetcher.inputCh,
Expand Down Expand Up @@ -324,16 +342,16 @@ func (e *ProjectionExec) Close() error {
e.drainOutputCh(w.outputCh)
}
}
if e.BaseExecutor.RuntimeStats() != nil {
if e.BaseExecutorV2.RuntimeStats() != nil {
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{}
if e.isUnparallelExec() {
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0))
} else {
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers)))
}
e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), runtimeStats)
e.stmtRuntimeStatsColl.RegisterStats(e.ID(), runtimeStats)
}
return e.BaseExecutor.Close()
return e.BaseExecutorV2.Close()
}

type projectionInputFetcher struct {
Expand Down Expand Up @@ -403,7 +421,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) {

type projectionWorker struct {
proj *ProjectionExec
sctx sessionctx.Context
ctx projectionExecutorContext
evaluatorSuit *expression.EvaluatorSuite
globalFinishCh <-chan struct{}
inputGiveBackCh chan<- *projectionInput
Expand Down Expand Up @@ -448,7 +466,7 @@ func (w *projectionWorker) run(ctx context.Context) {
}

mSize := output.chk.MemoryUsage() + input.chk.MemoryUsage()
err := w.evaluatorSuit.Run(w.sctx.GetExprCtx().GetEvalCtx(), w.sctx.GetSessionVars().EnableVectorizedExpression, input.chk, output.chk)
err := w.evaluatorSuit.Run(w.ctx.evalCtx, w.ctx.enableVectorizedExpression, input.chk, output.chk)
failpoint.Inject("ConsumeRandomPanic", nil)
w.proj.memTracker.Consume(output.chk.MemoryUsage() + input.chk.MemoryUsage() - mSize)
output.done <- err
Expand Down

0 comments on commit b5bbbf4

Please sign in to comment.