Skip to content

Commit

Permalink
evalctx: remove some redundant uses of Txn on ExtendedEvalContext
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
RichardJCai committed Apr 22, 2022
1 parent e4a9f66 commit 090d163
Show file tree
Hide file tree
Showing 20 changed files with 40 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
if details.URI == "" {
initialDetails := details
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, details, p.User(),
ctx, p.ExecCfg(), p.Txn(), details, p.User(),
)
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func backupPlanHook(
return sqlDescIDs
}(),
}
plannerTxn := p.ExtendedEvalContext().Txn
plannerTxn := p.Txn()

if backupStmt.Options.Detached {
// When running inside an explicit transaction, we simply create the job
Expand Down Expand Up @@ -791,7 +791,7 @@ func backupPlanHook(

// TODO(dt): delete this in 22.2.
backupDetails, backupManifest, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, initialDetails, p.User(),
ctx, p.ExecCfg(), p.Txn(), initialDetails, p.User(),
)
if err != nil {
return err
Expand All @@ -804,7 +804,7 @@ func backupPlanHook(

// We create the job record in the planner's transaction to ensure that
// the job record creation happens transactionally.
plannerTxn := p.ExtendedEvalContext().Txn
plannerTxn := p.Txn()

// Write backup manifest into a temporary checkpoint file.
// This accomplishes 2 purposes:
Expand Down
14 changes: 7 additions & 7 deletions pkg/ccl/backupccl/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func doCreateBackupSchedules(
inc.Pause()
inc.SetScheduleStatus("Waiting for initial backup to complete")

if err := inc.Create(ctx, ex, p.ExtendedEvalContext().Txn); err != nil {
if err := inc.Create(ctx, ex, p.Txn()); err != nil {
return err
}
if err := emitSchedule(inc, backupNode, destinations, nil, /* incrementalFrom */
Expand Down Expand Up @@ -491,20 +491,20 @@ func doCreateBackupSchedules(
}

// Create the schedule (we need its ID to link dependent schedules below).
if err := full.Create(ctx, ex, p.ExtendedEvalContext().Txn); err != nil {
if err := full.Create(ctx, ex, p.Txn()); err != nil {
return err
}

// If schedule creation has resulted in a full and incremental schedule then
// we update both the schedules with the ID of the other "dependent" schedule.
if incRecurrence != nil {
if err := setDependentSchedule(ctx, ex, fullScheduledBackupArgs, full, inc.ScheduleID(),
p.ExtendedEvalContext().Txn); err != nil {
p.Txn()); err != nil {
return errors.Wrap(err,
"failed to update full schedule with dependent incremental schedule id")
}
if err := setDependentSchedule(ctx, ex, incScheduledBackupArgs, inc, full.ScheduleID(),
p.ExtendedEvalContext().Txn); err != nil {
p.Txn()); err != nil {
return errors.Wrap(err,
"failed to update incremental schedule with dependent full schedule id")
}
Expand Down Expand Up @@ -664,7 +664,7 @@ func checkScheduleAlreadyExists(
) (bool, error) {

row, err := p.ExecCfg().InternalExecutor.QueryRowEx(ctx, "check-sched",
p.ExtendedEvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
p.Txn(), sessiondata.InternalExecutorOverride{User: security.RootUserName()},
fmt.Sprintf("SELECT count(schedule_name) FROM %s WHERE schedule_name = '%s'",
scheduledjobs.ProdJobSchedulerEnv.ScheduledJobsTableName(), scheduleLabel))

Expand All @@ -677,12 +677,12 @@ func checkScheduleAlreadyExists(
// dryRunBackup executes backup in dry-run mode: we simply execute backup
// under transaction savepoint, and then rollback to that save point.
func dryRunBackup(ctx context.Context, p sql.PlanHookState, backupNode *tree.Backup) error {
sp, err := p.ExtendedEvalContext().Txn.CreateSavepoint(ctx)
sp, err := p.Txn().CreateSavepoint(ctx)
if err != nil {
return err
}
err = dryRunInvokeBackup(ctx, p, backupNode)
if rollbackErr := p.ExtendedEvalContext().Txn.RollbackToSavepoint(ctx, sp); rollbackErr != nil {
if rollbackErr := p.Txn().RollbackToSavepoint(ctx, sp); rollbackErr != nil {
return rollbackErr
}
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,7 @@ func doRestorePlan(
return errors.Errorf("%q option can only be used when restoring a single tenant", restoreOptAsTenant)
}
res, err := p.ExecCfg().InternalExecutor.QueryRow(
ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn,
ctx, "restore-lookup-tenant", p.Txn(),
`SELECT active FROM system.tenants WHERE id = $1`, newTenantID.ToUint64(),
)
if err != nil {
Expand All @@ -1640,7 +1640,7 @@ func doRestorePlan(
} else {
for _, i := range tenants {
res, err := p.ExecCfg().InternalExecutor.QueryRow(
ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn,
ctx, "restore-lookup-tenant", p.Txn(),
`SELECT active FROM system.tenants WHERE id = $1`, i.ID,
)
if err != nil {
Expand Down Expand Up @@ -1831,7 +1831,7 @@ func doRestorePlan(
// We do not wait for the job to finish.
jobID := p.ExecCfg().JobRegistry.MakeJobID()
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
ctx, jr, jobID, p.Txn())
if err != nil {
return err
}
Expand All @@ -1842,7 +1842,7 @@ func doRestorePlan(

// We create the job record in the planner's transaction to ensure that
// the job record creation happens transactionally.
plannerTxn := p.ExtendedEvalContext().Txn
plannerTxn := p.Txn()

// Construct the job and commit the transaction. Perform this work in a
// closure to ensure that the job is cleaned up if an error occurs.
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func alterChangefeedPlanHook(
}
jobID := jobspb.JobID(tree.MustBeDInt(typedExpr))

job, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.ExtendedEvalContext().Txn)
job, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.Txn())
if err != nil {
err = errors.Wrapf(err, `could not load job with job id %d`, jobID)
return err
Expand Down Expand Up @@ -139,7 +139,7 @@ func alterChangefeedPlanHook(
newPayload.Description = jobRecord.Description
newPayload.DescriptorIDs = jobRecord.DescriptorIDs

err = p.ExecCfg().JobRegistry.UpdateJobWithTxn(ctx, jobID, p.ExtendedEvalContext().Txn, lockForUpdate, func(
err = p.ExecCfg().JobRegistry.UpdateJobWithTxn(ctx, jobID, p.Txn(), lockForUpdate, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
ju.UpdatePayload(&newPayload)
Expand Down Expand Up @@ -337,7 +337,7 @@ func generateNewTargets(
k := targetKey{TableID: targetSpec.TableID, FamilyName: tree.Name(targetSpec.FamilyName)}
desc := descResolver.DescByID[targetSpec.TableID].(catalog.TableDescriptor)

tbName, err := getQualifiedTableNameObj(ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, desc)
tbName, err := getQualifiedTableNameObj(ctx, p.ExecCfg(), p.Txn(), desc)
if err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func getTargetsAndTables(
}
} else {
_, qualified := opts[changefeedbase.OptFullTableName]
name, err := getChangefeedTargetName(ctx, td, p.ExecCfg(), p.ExtendedEvalContext().Txn, qualified)
name, err := getChangefeedTargetName(ctx, td, p.ExecCfg(), p.Txn(), qualified)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -935,7 +935,7 @@ func (b *changefeedResumer) handleChangefeedError(
const errorFmt = "job failed (%v) but is being paused because of %s=%s"
errorMessage := fmt.Sprintf(errorFmt, changefeedErr,
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
return b.job.PauseRequested(ctx, jobExec.ExtendedEvalContext().Txn, func(ctx context.Context,
return b.job.PauseRequested(ctx, jobExec.Txn(), func(ctx context.Context,
planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error {
err := b.OnPauseRequest(ctx, jobExec, txn, progress)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func ingestionPlanHook(

jobID := p.ExecCfg().JobRegistry.MakeJobID()
sj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr,
jobID, p.ExtendedEvalContext().Txn)
jobID, p.Txn())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/explain_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func convertToVecTree(
creator := newVectorizedFlowCreator(
newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, false,
nil, &execinfra.RowChannel{}, &fakeBatchReceiver{}, flowCtx.Cfg.PodNodeDialer, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
flowCtx.Cfg.VecFDSemaphore, flowCtx.NewTypeResolver(flowCtx.EvalCtx.Txn),
flowCtx.Cfg.VecFDSemaphore, flowCtx.NewTypeResolver(flowCtx.Txn),
admission.WorkInfo{},
)
// We create an unlimited memory account because we're interested whether the
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (f *vectorizedFlow) Setup(
f.GetID(),
diskQueueCfg,
f.countingSemaphore,
flowCtx.NewTypeResolver(flowCtx.EvalCtx.Txn),
flowCtx.NewTypeResolver(flowCtx.Txn),
f.FlowBase.GetAdmissionInfo(),
)
if f.testingKnobs.onSetupFlow != nil {
Expand Down Expand Up @@ -1114,7 +1114,7 @@ func (s *vectorizedFlowCreator) setupFlow(
}
numOldMonitors := len(s.monitorRegistry.GetMonitors())
if args.ExprHelper.SemaCtx == nil {
args.ExprHelper.SemaCtx = flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
args.ExprHelper.SemaCtx = flowCtx.NewSemaContext(flowCtx.Txn)
}
var result *colexecargs.NewColOperatorResult
result, err = colbuilder.NewColOperator(ctx, flowCtx, args)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/control_schedules.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (n *controlSchedulesNode) startExec(params runParams) error {
scheduleControllerEnv,
scheduledjobs.ProdJobSchedulerEnv,
schedule,
params.extendedEvalCtx.Txn,
params.p.Txn(),
params.p.Descriptors(),
); err != nil {
return errors.Wrap(err, "failed to run OnDrop")
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/importer/import_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func importPlanHook(
} else {
// No target table means we're importing whatever we find into the session
// database, so it must exist.
txn := p.ExtendedEvalContext().Txn
txn := p.Txn()
db, err = p.Accessor().GetDatabaseDesc(ctx, txn, p.SessionData().Database, tree.DatabaseLookupFlags{
AvoidLeased: true,
Required: true,
Expand Down Expand Up @@ -944,7 +944,7 @@ func importPlanHook(
// record. We do not wait for the job to finish.
jobID := p.ExecCfg().JobRegistry.MakeJobID()
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
ctx, jr, jobID, p.Txn())
if err != nil {
return err
}
Expand All @@ -956,7 +956,7 @@ func importPlanHook(

// We create the job record in the planner's transaction to ensure that
// the job record creation happens transactionally.
plannerTxn := p.ExtendedEvalContext().Txn
plannerTxn := p.Txn()

// Construct the job and commit the transaction. Perform this work in a
// closure to ensure that the job is cleaned up if an error occurs.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/job_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package sql

import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
Expand Down Expand Up @@ -64,6 +65,7 @@ func (e *plannerJobExecContext) MigrationJobDeps() migration.JobDeps {
func (e *plannerJobExecContext) SpanConfigReconciler() spanconfig.Reconciler {
return e.p.SpanConfigReconciler()
}
func (e *plannerJobExecContext) Txn() *kv.Txn { return e.p.Txn() }

// JobExecContext provides the execution environment for a job. It is what is
// passed to the Resume/OnFailOrCancel/OnPauseRequested methods of a jobs's
Expand All @@ -85,4 +87,5 @@ type JobExecContext interface {
User() security.SQLUsername
MigrationJobDeps() migration.JobDeps
SpanConfigReconciler() spanconfig.Reconciler
Txn() *kv.Txn
}
2 changes: 2 additions & 0 deletions pkg/sql/planhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
Expand Down Expand Up @@ -108,6 +109,7 @@ type PlanHookState interface {
MigrationJobDeps() migration.JobDeps
SpanConfigReconciler() spanconfig.Reconciler
BufferClientNotice(ctx context.Context, notice pgnotice.Notice)
Txn() *kv.Txn
}

// AddPlanHook adds a hook used to short-circuit creating a planNode from a
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/region_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func GetLiveClusterRegions(ctx context.Context, p PlanHookState) (LiveClusterReg
it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIteratorEx(
ctx,
"get_live_cluster_regions",
p.ExtendedEvalContext().Txn,
p.Txn(),
override,
"SELECT region FROM [SHOW REGIONS FROM CLUSTER]",
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (ag *aggregatorBase) init(
// grouped-by values for each bucket. ag.funcs is updated to contain all
// the functions which need to be fed values.
ag.inputTypes = input.OutputTypes()
semaCtx := flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
for i, aggInfo := range spec.Aggregations {
if aggInfo.FilterColIdx != nil {
col := *aggInfo.FilterColIdx
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/inverted_filterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func newInvertedFilterer(
}

if spec.PreFiltererSpec != nil {
semaCtx := flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
var exprHelper execinfrapb.ExprHelper
colTypes := []*types.T{spec.PreFiltererSpec.Type}
if err := exprHelper.Init(spec.PreFiltererSpec.Expression, colTypes, semaCtx, ifr.EvalCtx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/inverted_joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func newInvertedJoiner(
return nil, err
}

semaCtx := flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
onExprColTypes := make([]*types.T, 0, len(ij.inputTypes)+len(rightColTypes))
onExprColTypes = append(onExprColTypes, ij.inputTypes...)
onExprColTypes = append(onExprColTypes, rightColTypes...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/joinerbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (jb *joinerBase) init(
); err != nil {
return err
}
semaCtx := flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
return jb.onCond.Init(onExpr, onCondTypes, semaCtx, jb.EvalCtx)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func newJoinReader(
lookupExprTypes = append(lookupExprTypes, leftTypes...)
lookupExprTypes = append(lookupExprTypes, rightTypes...)

semaCtx := flowCtx.NewSemaContext(flowCtx.EvalCtx.Txn)
semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
if err := jr.lookupExpr.Init(spec.LookupExpr, lookupExprTypes, semaCtx, jr.EvalCtx); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/show_create_clauses.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type comment struct {
func selectComment(ctx context.Context, p PlanHookState, tableID descpb.ID) (tc *tableComments) {
query := fmt.Sprintf("SELECT type, object_id, sub_id, comment FROM system.comments WHERE object_id = %d", tableID)

txn := p.ExtendedEvalContext().Txn
txn := p.Txn()
it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIterator(
ctx, "show-tables-with-comment", txn, query)
if err != nil {
Expand Down

0 comments on commit 090d163

Please sign in to comment.