diff --git a/pkg/executor/benchmark_test.go b/pkg/executor/benchmark_test.go index f73298b3c4dec..c459c18ebbb1f 100644 --- a/pkg/executor/benchmark_test.go +++ b/pkg/executor/benchmark_test.go @@ -1803,9 +1803,10 @@ func benchmarkLimitExec(b *testing.B, cas *testutil.LimitCase) { } } proj := &ProjectionExec{ - BaseExecutor: exec.NewBaseExecutor(cas.Ctx, expression.NewSchema(usedCols...), 0, limit), - numWorkers: 1, - evaluatorSuit: expression.NewEvaluatorSuite(exprs, false), + projectionExecutorContext: newProjectionExecutorContext(cas.Ctx), + BaseExecutorV2: exec.NewBaseExecutorV2(cas.Ctx.GetSessionVars(), expression.NewSchema(usedCols...), 0, limit), + numWorkers: 1, + evaluatorSuit: expression.NewEvaluatorSuite(exprs, false), } exe = proj } diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index e9351171f7243..8f05eca6298ab 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -1975,10 +1975,11 @@ func (b *executorBuilder) buildProjection(v *plannercore.PhysicalProjection) exe return nil } e := &ProjectionExec{ - BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec), - numWorkers: int64(b.ctx.GetSessionVars().ProjectionConcurrency()), - evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator), - calculateNoDelay: v.CalculateNoDelay, + projectionExecutorContext: newProjectionExecutorContext(b.ctx), + BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec), + numWorkers: int64(b.ctx.GetSessionVars().ProjectionConcurrency()), + evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator), + calculateNoDelay: v.CalculateNoDelay, } // If the calculation row count for this Projection operator is smaller @@ -4732,10 +4733,11 @@ func (builder *dataReaderBuilder) buildProjectionForIndexJoin( }() e := &ProjectionExec{ - BaseExecutor: exec.NewBaseExecutor(builder.ctx, v.Schema(), v.ID(), childExec), - numWorkers: int64(builder.ctx.GetSessionVars().ProjectionConcurrency()), - evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator), - calculateNoDelay: v.CalculateNoDelay, + projectionExecutorContext: newProjectionExecutorContext(builder.ctx), + BaseExecutorV2: exec.NewBaseExecutorV2(builder.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec), + numWorkers: int64(builder.ctx.GetSessionVars().ProjectionConcurrency()), + evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator), + calculateNoDelay: v.CalculateNoDelay, } // If the calculation row count for this Projection operator is smaller diff --git a/pkg/executor/executor_required_rows_test.go b/pkg/executor/executor_required_rows_test.go index b2782935b5cdf..8ae247704890e 100644 --- a/pkg/executor/executor_required_rows_test.go +++ b/pkg/executor/executor_required_rows_test.go @@ -600,9 +600,10 @@ func TestProjectionParallelRequiredRows(t *testing.T) { func buildProjectionExec(ctx sessionctx.Context, exprs []expression.Expression, src exec.Executor, numWorkers int) exec.Executor { return &ProjectionExec{ - BaseExecutor: exec.NewBaseExecutor(ctx, src.Schema(), 0, src), - numWorkers: int64(numWorkers), - evaluatorSuit: expression.NewEvaluatorSuite(exprs, false), + projectionExecutorContext: newProjectionExecutorContext(ctx), + BaseExecutorV2: exec.NewBaseExecutorV2(ctx.GetSessionVars(), src.Schema(), 0, src), + numWorkers: int64(numWorkers), + evaluatorSuit: expression.NewEvaluatorSuite(exprs, false), } } diff --git a/pkg/executor/projection.go b/pkg/executor/projection.go index 66b3f252829cc..ace32ec96f88e 100644 --- a/pkg/executor/projection.go +++ b/pkg/executor/projection.go @@ -55,10 +55,28 @@ type projectionOutput struct { done chan error } +// projectionExecutorContext is the execution context for the `ProjectionExec` +type projectionExecutorContext struct { + stmtMemTracker *memory.Tracker + stmtRuntimeStatsColl *execdetails.RuntimeStatsColl + evalCtx expression.EvalContext + enableVectorizedExpression bool +} + +func newProjectionExecutorContext(sctx sessionctx.Context) projectionExecutorContext { + return projectionExecutorContext{ + stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker, + stmtRuntimeStatsColl: sctx.GetSessionVars().StmtCtx.RuntimeStatsColl, + evalCtx: sctx.GetExprCtx().GetEvalCtx(), + enableVectorizedExpression: sctx.GetSessionVars().EnableVectorizedExpression, + } +} + // ProjectionExec implements the physical Projection Operator: // https://en.wikipedia.org/wiki/Projection_(relational_algebra) type ProjectionExec struct { - exec.BaseExecutor + projectionExecutorContext + exec.BaseExecutorV2 evaluatorSuit *expression.EvaluatorSuite @@ -85,7 +103,7 @@ type ProjectionExec struct { // Open implements the Executor Open interface. func (e *ProjectionExec) Open(ctx context.Context) error { - if err := e.BaseExecutor.Open(ctx); err != nil { + if err := e.BaseExecutorV2.Open(ctx); err != nil { return err } failpoint.Inject("mockProjectionExecBaseExecutorOpenReturnedError", func(val failpoint.Value) { @@ -105,7 +123,7 @@ func (e *ProjectionExec) open(_ context.Context) error { } else { e.memTracker = memory.NewTracker(e.ID(), -1) } - e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) + e.memTracker.AttachTo(e.stmtMemTracker) // For now a Projection can not be executed vectorially only because it // contains "SetVar" or "GetVar" functions, in this scenario this @@ -204,7 +222,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk if e.childResult.NumRows() == 0 { return nil } - err = e.evaluatorSuit.Run(e.Ctx().GetExprCtx().GetEvalCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.childResult, chk) + err = e.evaluatorSuit.Run(e.evalCtx, e.enableVectorizedExpression, e.childResult, chk) return err } @@ -251,7 +269,7 @@ func (e *ProjectionExec) prepare(ctx context.Context) { for i := int64(0); i < e.numWorkers; i++ { e.workers = append(e.workers, &projectionWorker{ proj: e, - sctx: e.Ctx(), + ctx: e.projectionExecutorContext, evaluatorSuit: e.evaluatorSuit, globalFinishCh: e.finishCh, inputGiveBackCh: e.fetcher.inputCh, @@ -324,16 +342,16 @@ func (e *ProjectionExec) Close() error { e.drainOutputCh(w.outputCh) } } - if e.BaseExecutor.RuntimeStats() != nil { + if e.BaseExecutorV2.RuntimeStats() != nil { runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{} if e.isUnparallelExec() { runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0)) } else { runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers))) } - e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), runtimeStats) + e.stmtRuntimeStatsColl.RegisterStats(e.ID(), runtimeStats) } - return e.BaseExecutor.Close() + return e.BaseExecutorV2.Close() } type projectionInputFetcher struct { @@ -403,7 +421,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) { type projectionWorker struct { proj *ProjectionExec - sctx sessionctx.Context + ctx projectionExecutorContext evaluatorSuit *expression.EvaluatorSuite globalFinishCh <-chan struct{} inputGiveBackCh chan<- *projectionInput @@ -448,7 +466,7 @@ func (w *projectionWorker) run(ctx context.Context) { } mSize := output.chk.MemoryUsage() + input.chk.MemoryUsage() - err := w.evaluatorSuit.Run(w.sctx.GetExprCtx().GetEvalCtx(), w.sctx.GetSessionVars().EnableVectorizedExpression, input.chk, output.chk) + err := w.evaluatorSuit.Run(w.ctx.evalCtx, w.ctx.enableVectorizedExpression, input.chk, output.chk) failpoint.Inject("ConsumeRandomPanic", nil) w.proj.memTracker.Consume(output.chk.MemoryUsage() + input.chk.MemoryUsage() - mSize) output.done <- err