diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index 21e946b269b3..97ee9c6538a2 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -407,7 +407,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
 	if details.URI == "" {
 		initialDetails := details
 		backupDetails, m, err := getBackupDetailAndManifest(
-			ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, details, p.User(),
+			ctx, p.ExecCfg(), p.Txn(), details, p.User(),
 		)
 		if err != nil {
 			return err
diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go
index 1d34bcb8ed06..26dc714d3a8b 100644
--- a/pkg/ccl/backupccl/backup_planning.go
+++ b/pkg/ccl/backupccl/backup_planning.go
@@ -746,7 +746,7 @@ func backupPlanHook(
 				return sqlDescIDs
 			}(),
 		}
-		plannerTxn := p.ExtendedEvalContext().Txn
+		plannerTxn := p.Txn()
 
 		if backupStmt.Options.Detached {
 			// When running inside an explicit transaction, we simply create the job
@@ -791,7 +791,7 @@
 
 		// TODO(dt): delete this in 22.2.
 		backupDetails, backupManifest, err := getBackupDetailAndManifest(
-			ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, initialDetails, p.User(),
+			ctx, p.ExecCfg(), p.Txn(), initialDetails, p.User(),
 		)
 		if err != nil {
 			return err
@@ -804,7 +804,7 @@
 
 		// We create the job record in the planner's transaction to ensure that
 		// the job record creation happens transactionally.
-		plannerTxn := p.ExtendedEvalContext().Txn
+		plannerTxn := p.Txn()
 
 		// Write backup manifest into a temporary checkpoint file.
 		// This accomplishes 2 purposes:
diff --git a/pkg/ccl/backupccl/create_scheduled_backup.go b/pkg/ccl/backupccl/create_scheduled_backup.go
index 4085e1ae99c7..85fbea9803ea 100644
--- a/pkg/ccl/backupccl/create_scheduled_backup.go
+++ b/pkg/ccl/backupccl/create_scheduled_backup.go
@@ -458,7 +458,7 @@ func doCreateBackupSchedules(
 		inc.Pause()
 		inc.SetScheduleStatus("Waiting for initial backup to complete")
 
-		if err := inc.Create(ctx, ex, p.ExtendedEvalContext().Txn); err != nil {
+		if err := inc.Create(ctx, ex, p.Txn()); err != nil {
 			return err
 		}
 		if err := emitSchedule(inc, backupNode, destinations, nil, /* incrementalFrom */
@@ -491,7 +491,7 @@
 	}
 
 	// Create the schedule (we need its ID to link dependent schedules below).
-	if err := full.Create(ctx, ex, p.ExtendedEvalContext().Txn); err != nil {
+	if err := full.Create(ctx, ex, p.Txn()); err != nil {
 		return err
 	}
 
@@ -499,12 +499,12 @@
 	// we update both the schedules with the ID of the other "dependent" schedule.
 	if incRecurrence != nil {
 		if err := setDependentSchedule(ctx, ex, fullScheduledBackupArgs, full, inc.ScheduleID(),
-			p.ExtendedEvalContext().Txn); err != nil {
+			p.Txn()); err != nil {
 			return errors.Wrap(err,
 				"failed to update full schedule with dependent incremental schedule id")
 		}
 		if err := setDependentSchedule(ctx, ex, incScheduledBackupArgs, inc, full.ScheduleID(),
-			p.ExtendedEvalContext().Txn); err != nil {
+			p.Txn()); err != nil {
 			return errors.Wrap(err,
 				"failed to update incremental schedule with dependent full schedule id")
 		}
@@ -664,7 +664,7 @@ func checkScheduleAlreadyExists(
 ) (bool, error) {
 	row, err := p.ExecCfg().InternalExecutor.QueryRowEx(ctx, "check-sched",
-		p.ExtendedEvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
+		p.Txn(), sessiondata.InternalExecutorOverride{User: security.RootUserName()},
 		fmt.Sprintf("SELECT count(schedule_name) FROM %s WHERE schedule_name = '%s'",
 			scheduledjobs.ProdJobSchedulerEnv.ScheduledJobsTableName(), scheduleLabel))
 
@@ -677,12 +677,12 @@
 // dryRunBackup executes backup in dry-run mode: we simply execute backup
 // under transaction savepoint, and then rollback to that save point.
 func dryRunBackup(ctx context.Context, p sql.PlanHookState, backupNode *tree.Backup) error {
-	sp, err := p.ExtendedEvalContext().Txn.CreateSavepoint(ctx)
+	sp, err := p.Txn().CreateSavepoint(ctx)
 	if err != nil {
 		return err
 	}
 	err = dryRunInvokeBackup(ctx, p, backupNode)
-	if rollbackErr := p.ExtendedEvalContext().Txn.RollbackToSavepoint(ctx, sp); rollbackErr != nil {
+	if rollbackErr := p.Txn().RollbackToSavepoint(ctx, sp); rollbackErr != nil {
 		return rollbackErr
 	}
 	return err
diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go
index 9566b8709753..cbfb1f521019 100644
--- a/pkg/ccl/backupccl/restore_planning.go
+++ b/pkg/ccl/backupccl/restore_planning.go
@@ -1625,7 +1625,7 @@ func doRestorePlan(
 			return errors.Errorf("%q option can only be used when restoring a single tenant", restoreOptAsTenant)
 		}
 		res, err := p.ExecCfg().InternalExecutor.QueryRow(
-			ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn,
+			ctx, "restore-lookup-tenant", p.Txn(),
 			`SELECT active FROM system.tenants WHERE id = $1`, newTenantID.ToUint64(),
 		)
 		if err != nil {
@@ -1640,7 +1640,7 @@
 	} else {
 		for _, i := range tenants {
 			res, err := p.ExecCfg().InternalExecutor.QueryRow(
-				ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn,
+				ctx, "restore-lookup-tenant", p.Txn(),
 				`SELECT active FROM system.tenants WHERE id = $1`, i.ID,
 			)
 			if err != nil {
@@ -1831,7 +1831,7 @@
 		// We do not wait for the job to finish.
 		jobID := p.ExecCfg().JobRegistry.MakeJobID()
 		_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
-			ctx, jr, jobID, p.ExtendedEvalContext().Txn)
+			ctx, jr, jobID, p.Txn())
 		if err != nil {
 			return err
 		}
@@ -1842,7 +1842,7 @@
 
 	// We create the job record in the planner's transaction to ensure that
 	// the job record creation happens transactionally.
-	plannerTxn := p.ExtendedEvalContext().Txn
+	plannerTxn := p.Txn()
 
 	// Construct the job and commit the transaction. Perform this work in a
 	// closure to ensure that the job is cleaned up if an error occurs.
diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
index d506756205e5..fcf5551a02e2 100644
--- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go
@@ -62,7 +62,7 @@ func alterChangefeedPlanHook(
 		}
 		jobID := jobspb.JobID(tree.MustBeDInt(typedExpr))
 
-		job, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.ExtendedEvalContext().Txn)
+		job, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.Txn())
 		if err != nil {
 			err = errors.Wrapf(err, `could not load job with job id %d`, jobID)
 			return err
@@ -139,7 +139,7 @@
 		newPayload.Description = jobRecord.Description
 		newPayload.DescriptorIDs = jobRecord.DescriptorIDs
 
-		err = p.ExecCfg().JobRegistry.UpdateJobWithTxn(ctx, jobID, p.ExtendedEvalContext().Txn, lockForUpdate, func(
+		err = p.ExecCfg().JobRegistry.UpdateJobWithTxn(ctx, jobID, p.Txn(), lockForUpdate, func(
 			txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
 		) error {
 			ju.UpdatePayload(&newPayload)
@@ -337,7 +337,7 @@ func generateNewTargets(
 			k := targetKey{TableID: targetSpec.TableID, FamilyName: tree.Name(targetSpec.FamilyName)}
 			desc := descResolver.DescByID[targetSpec.TableID].(catalog.TableDescriptor)
 
-			tbName, err := getQualifiedTableNameObj(ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, desc)
+			tbName, err := getQualifiedTableNameObj(ctx, p.ExecCfg(), p.Txn(), desc)
 			if err != nil {
 				return nil, nil, hlc.Timestamp{}, nil, err
 			}
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index fa7cbbdfd94f..ba26261183b7 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -625,7 +625,7 @@ func getTargetsAndTables(
 			}
 		} else {
 			_, qualified := opts[changefeedbase.OptFullTableName]
-			name, err := getChangefeedTargetName(ctx, td, p.ExecCfg(), p.ExtendedEvalContext().Txn, qualified)
+			name, err := getChangefeedTargetName(ctx, td, p.ExecCfg(), p.Txn(), qualified)
 			if err != nil {
 				return nil, nil, err
 			}
@@ -935,7 +935,7 @@ func (b *changefeedResumer) handleChangefeedError(
 		const errorFmt = "job failed (%v) but is being paused because of %s=%s"
 		errorMessage := fmt.Sprintf(errorFmt, changefeedErr,
 			changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
-		return b.job.PauseRequested(ctx, jobExec.ExtendedEvalContext().Txn, func(ctx context.Context,
+		return b.job.PauseRequested(ctx, jobExec.Txn(), func(ctx context.Context,
 			planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error {
 			err := b.OnPauseRequest(ctx, jobExec, txn, progress)
 			if err != nil {
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
index 30a7d68858cf..36423775fd99 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
@@ -197,7 +197,7 @@ func ingestionPlanHook(
 
 		jobID := p.ExecCfg().JobRegistry.MakeJobID()
 		sj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr,
-			jobID, p.ExtendedEvalContext().Txn)
+			jobID, p.Txn())
 		if err != nil {
 			return err
 		}
diff --git a/pkg/sql/colflow/explain_vec.go b/pkg/sql/colflow/explain_vec.go
index 39c565b93989..b18962b83fc1 100644
--- a/pkg/sql/colflow/explain_vec.go
+++ b/pkg/sql/colflow/explain_vec.go
@@ -57,7 +57,7 @@ func convertToVecTree(
 	creator := newVectorizedFlowCreator(
 		newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, false, nil,
 		&execinfra.RowChannel{}, &fakeBatchReceiver{}, flowCtx.Cfg.PodNodeDialer, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
-		flowCtx.Cfg.VecFDSemaphore, flowCtx.NewTypeResolver(flowCtx.EvalCtx.Txn),
+		flowCtx.Cfg.VecFDSemaphore, flowCtx.NewTypeResolver(flowCtx.Txn),
 		admission.WorkInfo{},
 	)
 	// We create an unlimited memory account because we're interested whether the
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index 942b17811bc8..1009d297398a 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -208,7 +208,7 @@ func (f *vectorizedFlow) Setup(
 		f.GetID(),
 		diskQueueCfg,
 		f.countingSemaphore,
-		flowCtx.NewTypeResolver(flowCtx.EvalCtx.Txn),
+		flowCtx.NewTypeResolver(flowCtx.Txn),
 		f.FlowBase.GetAdmissionInfo(),
 	)
 	if f.testingKnobs.onSetupFlow != nil {
@@ -1114,7 +1114,7 @@
 			}
 			numOldMonitors := len(s.monitorRegistry.GetMonitors())
 			if args.ExprHelper.SemaCtx == nil {
-				args.ExprHelper.SemaCtx = flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
+				args.ExprHelper.SemaCtx = flowCtx.NewSemaContext(flowCtx.Txn)
 			}
 			var result *colexecargs.NewColOperatorResult
 			result, err = colbuilder.NewColOperator(ctx, flowCtx, args)
diff --git a/pkg/sql/control_schedules.go b/pkg/sql/control_schedules.go
index c64896c2a74b..76cee9badf70 100644
--- a/pkg/sql/control_schedules.go
+++ b/pkg/sql/control_schedules.go
@@ -163,7 +163,7 @@ func (n *controlSchedulesNode) startExec(params runParams) error {
 					scheduleControllerEnv,
 					scheduledjobs.ProdJobSchedulerEnv,
 					schedule,
-					params.extendedEvalCtx.Txn,
+					params.p.Txn(),
 					params.p.Descriptors(),
 				); err != nil {
 					return errors.Wrap(err, "failed to run OnDrop")
diff --git a/pkg/sql/importer/import_planning.go b/pkg/sql/importer/import_planning.go
index a7b8aa01085f..9270d8c01fe9 100644
--- a/pkg/sql/importer/import_planning.go
+++ b/pkg/sql/importer/import_planning.go
@@ -495,7 +495,7 @@ func importPlanHook(
 		} else {
 			// No target table means we're importing whatever we find into the session
 			// database, so it must exist.
-			txn := p.ExtendedEvalContext().Txn
+			txn := p.Txn()
 			db, err = p.Accessor().GetDatabaseDesc(ctx, txn, p.SessionData().Database, tree.DatabaseLookupFlags{
 				AvoidLeased: true,
 				Required:    true,
@@ -944,7 +944,7 @@
 			// record. We do not wait for the job to finish.
 			jobID := p.ExecCfg().JobRegistry.MakeJobID()
 			_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
-				ctx, jr, jobID, p.ExtendedEvalContext().Txn)
+				ctx, jr, jobID, p.Txn())
 			if err != nil {
 				return err
 			}
@@ -956,7 +956,7 @@
 
 		// We create the job record in the planner's transaction to ensure that
 		// the job record creation happens transactionally.
-		plannerTxn := p.ExtendedEvalContext().Txn
+		plannerTxn := p.Txn()
 
 		// Construct the job and commit the transaction. Perform this work in a
 		// closure to ensure that the job is cleaned up if an error occurs.
diff --git a/pkg/sql/job_exec_context.go b/pkg/sql/job_exec_context.go
index c6df2280ddf3..5b15fb3f37b5 100644
--- a/pkg/sql/job_exec_context.go
+++ b/pkg/sql/job_exec_context.go
@@ -11,6 +11,7 @@
 package sql
 
 import (
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/migration"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/spanconfig"
@@ -64,6 +65,7 @@ func (e *plannerJobExecContext) MigrationJobDeps() migration.JobDeps {
 func (e *plannerJobExecContext) SpanConfigReconciler() spanconfig.Reconciler {
 	return e.p.SpanConfigReconciler()
 }
+func (e *plannerJobExecContext) Txn() *kv.Txn { return e.p.Txn() }
 
 // JobExecContext provides the execution environment for a job. It is what is
 // passed to the Resume/OnFailOrCancel/OnPauseRequested methods of a jobs's
@@ -85,4 +87,5 @@ type JobExecContext interface {
 	User() security.SQLUsername
 	MigrationJobDeps() migration.JobDeps
 	SpanConfigReconciler() spanconfig.Reconciler
+	Txn() *kv.Txn
 }
diff --git a/pkg/sql/planhook.go b/pkg/sql/planhook.go
index 70255dae647e..e74ce3a45908 100644
--- a/pkg/sql/planhook.go
+++ b/pkg/sql/planhook.go
@@ -13,6 +13,7 @@ package sql
 import (
 	"context"
 
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/migration"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security"
@@ -108,6 +109,7 @@ type PlanHookState interface {
 	MigrationJobDeps() migration.JobDeps
 	SpanConfigReconciler() spanconfig.Reconciler
 	BufferClientNotice(ctx context.Context, notice pgnotice.Notice)
+	Txn() *kv.Txn
 }
 
 // AddPlanHook adds a hook used to short-circuit creating a planNode from a
diff --git a/pkg/sql/region_util.go b/pkg/sql/region_util.go
index 3ff17db3bea3..d45dd22a804a 100644
--- a/pkg/sql/region_util.go
+++ b/pkg/sql/region_util.go
@@ -74,7 +74,7 @@ func GetLiveClusterRegions(ctx context.Context, p PlanHookState) (LiveClusterReg
 	it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIteratorEx(
 		ctx,
 		"get_live_cluster_regions",
-		p.ExtendedEvalContext().Txn,
+		p.Txn(),
 		override,
 		"SELECT region FROM [SHOW REGIONS FROM CLUSTER]",
 	)
diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go
index c0ed8fc3aeca..aa39527522e5 100644
--- a/pkg/sql/rowexec/aggregator.go
+++ b/pkg/sql/rowexec/aggregator.go
@@ -116,7 +116,7 @@ func (ag *aggregatorBase) init(
 	// grouped-by values for each bucket. ag.funcs is updated to contain all
	// the functions which need to be fed values.
 	ag.inputTypes = input.OutputTypes()
-	semaCtx := flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
+	semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
 	for i, aggInfo := range spec.Aggregations {
 		if aggInfo.FilterColIdx != nil {
 			col := *aggInfo.FilterColIdx
diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go
index 6ac6ee709f78..c8dec993a2f1 100644
--- a/pkg/sql/rowexec/inverted_filterer.go
+++ b/pkg/sql/rowexec/inverted_filterer.go
@@ -125,7 +125,7 @@ func newInvertedFilterer(
 	}
 
 	if spec.PreFiltererSpec != nil {
-		semaCtx := flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
+		semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
 		var exprHelper execinfrapb.ExprHelper
 		colTypes := []*types.T{spec.PreFiltererSpec.Type}
 		if err := exprHelper.Init(spec.PreFiltererSpec.Expression, colTypes, semaCtx, ifr.EvalCtx); err != nil {
diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go
index 370b7c17bbd6..ad5fa109efeb 100644
--- a/pkg/sql/rowexec/inverted_joiner.go
+++ b/pkg/sql/rowexec/inverted_joiner.go
@@ -235,7 +235,7 @@ func newInvertedJoiner(
 		return nil, err
 	}
 
-	semaCtx := flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
+	semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
 	onExprColTypes := make([]*types.T, 0, len(ij.inputTypes)+len(rightColTypes))
 	onExprColTypes = append(onExprColTypes, ij.inputTypes...)
 	onExprColTypes = append(onExprColTypes, rightColTypes...)
diff --git a/pkg/sql/rowexec/joinerbase.go b/pkg/sql/rowexec/joinerbase.go
index 2cd54484f677..6c59bceeaaa6 100644
--- a/pkg/sql/rowexec/joinerbase.go
+++ b/pkg/sql/rowexec/joinerbase.go
@@ -86,7 +86,7 @@ func (jb *joinerBase) init(
 	); err != nil {
 		return err
 	}
-	semaCtx := flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
+	semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
 	return jb.onCond.Init(onExpr, onCondTypes, semaCtx, jb.EvalCtx)
 }
 
diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go
index 5aa34de7cd08..8d92da7db2b4 100644
--- a/pkg/sql/rowexec/joinreader.go
+++ b/pkg/sql/rowexec/joinreader.go
@@ -407,7 +407,7 @@ func newJoinReader(
 		lookupExprTypes = append(lookupExprTypes, leftTypes...)
 		lookupExprTypes = append(lookupExprTypes, rightTypes...)
 
-		semaCtx := flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
+		semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
 		if err := jr.lookupExpr.Init(spec.LookupExpr, lookupExprTypes, semaCtx, jr.EvalCtx); err != nil {
 			return nil, err
 		}
diff --git a/pkg/sql/show_create_clauses.go b/pkg/sql/show_create_clauses.go
index d61d86f59212..a161aa066702 100644
--- a/pkg/sql/show_create_clauses.go
+++ b/pkg/sql/show_create_clauses.go
@@ -50,7 +50,7 @@ type comment struct {
 func selectComment(ctx context.Context, p PlanHookState, tableID descpb.ID) (tc *tableComments) {
 	query := fmt.Sprintf("SELECT type, object_id, sub_id, comment FROM system.comments WHERE object_id = %d", tableID)
 
-	txn := p.ExtendedEvalContext().Txn
+	txn := p.Txn()
 	it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIterator(
 		ctx, "show-tables-with-comment", txn, query)
 	if err != nil {