Skip to content

Commit

Permalink
sql: remove eval.Context.Ctx()
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
yuzefovich committed Oct 6, 2022
1 parent fc1fa24 commit 3f87f2f
Show file tree
Hide file tree
Showing 59 changed files with 479 additions and 399 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (e *exprEval) evalProjection(
if err != nil {
return cdcevent.Row{}, err
}
if err := e.projection.SetValueDatumAt(e.evalCtx, i, d); err != nil {
if err := e.projection.SetValueDatumAt(ctx, e.evalCtx, i, d); err != nil {
return cdcevent.Row{}, err
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func TestingGetFamilyIDFromKey(
// MakeRowFromTuple converts a SQL datum produced by, for example, SELECT ROW(foo.*),
// into the same kind of cdcevent.Row you'd get as a result of an insert, but without
// the primary key.
func MakeRowFromTuple(evalCtx *eval.Context, t *tree.DTuple) Row {
func MakeRowFromTuple(ctx context.Context, evalCtx *eval.Context, t *tree.DTuple) Row {
r := Projection{EventDescriptor: &EventDescriptor{}}
names := t.ResolvedType().TupleLabels()
for i, d := range t.D {
Expand All @@ -568,10 +568,10 @@ func MakeRowFromTuple(evalCtx *eval.Context, t *tree.DTuple) Row {
name = names[i]
}
r.AddValueColumn(name, d.ResolvedType())
if err := r.SetValueDatumAt(evalCtx, i, d); err != nil {
if err := r.SetValueDatumAt(ctx, evalCtx, i, d); err != nil {
if build.IsRelease() {
log.Warningf(context.Background(), "failed to set row value from tuple due to error %v", err)
_ = r.SetValueDatumAt(evalCtx, i, tree.DNull)
log.Warningf(ctx, "failed to set row value from tuple due to error %v", err)
_ = r.SetValueDatumAt(ctx, evalCtx, i, tree.DNull)
} else {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdcevent/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func TestMakeRowFromTuple(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
evalCtx := eval.MakeTestingEvalContext(st)

rowFromUnlabeledTuple := MakeRowFromTuple(&evalCtx, unlabeledTuple)
rowFromUnlabeledTuple := MakeRowFromTuple(context.Background(), &evalCtx, unlabeledTuple)
expectedCols := []struct {
name string
typ *types.T
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestMakeRowFromTuple(t *testing.T) {

remainingCols = expectedCols

rowFromLabeledTuple := MakeRowFromTuple(&evalCtx, labeledTuple)
rowFromLabeledTuple := MakeRowFromTuple(context.Background(), &evalCtx, labeledTuple)

require.NoError(t, rowFromLabeledTuple.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error {
current := remainingCols[0]
Expand Down
8 changes: 6 additions & 2 deletions pkg/ccl/changefeedccl/cdcevent/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package cdcevent

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -66,7 +68,9 @@ func (p *Projection) AddValueColumn(name string, typ *types.T) {
}

// SetValueDatumAt sets value datum at specified position.
func (p *Projection) SetValueDatumAt(evalCtx *eval.Context, pos int, d tree.Datum) error {
func (p *Projection) SetValueDatumAt(
ctx context.Context, evalCtx *eval.Context, pos int, d tree.Datum,
) error {
pos += len(p.keyCols)
if pos >= len(p.datums) {
return errors.AssertionFailedf("%d out of bounds", pos)
Expand All @@ -78,7 +82,7 @@ func (p *Projection) SetValueDatumAt(evalCtx *eval.Context, pos int, d tree.Datu
return pgerror.Newf(pgcode.DatatypeMismatch,
"expected type %s for column %s@%d, found %s", col.Typ, col.Name, pos, d.ResolvedType())
}
cd, err := eval.PerformCast(evalCtx, d, col.Typ)
cd, err := eval.PerformCast(ctx, evalCtx, d, col.Typ)
if err != nil {
return errors.Wrapf(err, "expected type %s for column %s@%d, found %s",
col.Typ, col.Name, pos, d.ResolvedType())
Expand Down
13 changes: 7 additions & 6 deletions pkg/ccl/changefeedccl/cdcevent/projection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func TestProjection(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())
defer s.Stopper().Stop(ctx)

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`)
Expand Down Expand Up @@ -61,7 +62,7 @@ CREATE TABLE foo (
idx := 0
require.NoError(t, input.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error {
p.AddValueColumn(col.Name, col.Typ)
err := p.SetValueDatumAt(&evalCtx, idx, d)
err := p.SetValueDatumAt(ctx, &evalCtx, idx, d)
idx++
return err
}))
Expand All @@ -76,9 +77,9 @@ CREATE TABLE foo (
input := TestingMakeEventRow(desc, 0, encDatums, false)
p := MakeProjection(input.EventDescriptor)
p.AddValueColumn("wrong_type", types.Int)
require.Regexp(t, "expected type int", p.SetValueDatumAt(&evalCtx, 0, tree.NewDString("fail")))
require.Regexp(t, "expected type int", p.SetValueDatumAt(ctx, &evalCtx, 0, tree.NewDString("fail")))
// But we allow NULL.
require.NoError(t, p.SetValueDatumAt(&evalCtx, 0, tree.DNull))
require.NoError(t, p.SetValueDatumAt(ctx, &evalCtx, 0, tree.DNull))
})

t.Run("project_extra_column", func(t *testing.T) {
Expand All @@ -87,12 +88,12 @@ CREATE TABLE foo (
idx := 0
require.NoError(t, input.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error {
p.AddValueColumn(col.Name, col.Typ)
err := p.SetValueDatumAt(&evalCtx, idx, d)
err := p.SetValueDatumAt(ctx, &evalCtx, idx, d)
idx++
return err
}))
p.AddValueColumn("test", types.Int)
require.NoError(t, p.SetValueDatumAt(&evalCtx, idx, tree.NewDInt(5)))
require.NoError(t, p.SetValueDatumAt(ctx, &evalCtx, idx, tree.NewDInt(5)))

pr, err := p.Project(input)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/encoder_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func init() {
Types: tree.VariadicType{FixedTypes: []*types.T{types.AnyTuple}, VarType: types.String},
ReturnType: tree.FixedReturnType(types.Bytes),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
row := cdcevent.MakeRowFromTuple(evalCtx, tree.MustBeDTuple(args[0]))
row := cdcevent.MakeRowFromTuple(ctx, evalCtx, tree.MustBeDTuple(args[0]))
flags := make([]string, len(args)-1)
for i, d := range args[1:] {
flags[i] = string(tree.MustBeDString(d))
Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@ type streamIngestManagerImpl struct{}

// CompleteStreamIngestion implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) CompleteStreamIngestion(
evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp,
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
ingestionJobID jobspb.JobID,
cutoverTimestamp hlc.Timestamp,
) error {
return completeStreamIngestion(evalCtx, txn, ingestionJobID, cutoverTimestamp)
return completeStreamIngestion(ctx, evalCtx, txn, ingestionJobID, cutoverTimestamp)
}

// GetStreamIngestionStats implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) GetStreamIngestionStats(
evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID,
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID,
) (*streampb.StreamIngestionStats, error) {
return getStreamIngestionStats(evalCtx, txn, ingestionJobID)
return getStreamIngestionStats(ctx, evalCtx, txn, ingestionJobID)
}

func newStreamIngestManagerWithPrivilegesCheck(
Expand Down
18 changes: 11 additions & 7 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ import (

// completeStreamIngestion terminates the stream as of specified time.
func completeStreamIngestion(
evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp,
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
ingestionJobID jobspb.JobID,
cutoverTimestamp hlc.Timestamp,
) error {
jobRegistry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry
return jobRegistry.UpdateJobWithTxn(evalCtx.Ctx(), ingestionJobID, txn, false, /* useReadLock */
return jobRegistry.UpdateJobWithTxn(ctx, ingestionJobID, txn, false, /* useReadLock */
func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
// TODO(adityamaru): This should change in the future, a user should be
// allowed to correct their cutover time if the process of reverting the job
Expand All @@ -56,10 +60,10 @@ func completeStreamIngestion(
}

func getStreamIngestionStats(
evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID,
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID,
) (*streampb.StreamIngestionStats, error) {
registry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry
j, err := registry.LoadJobWithTxn(evalCtx.Ctx(), ingestionJobID, txn)
j, err := registry.LoadJobWithTxn(ctx, ingestionJobID, txn)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -95,17 +99,17 @@ func getStreamIngestionStats(
stats.ReplicationLagInfo = lagInfo
}

client, err := streamclient.GetFirstActiveClient(evalCtx.Ctx(), progress.GetStreamIngest().StreamAddresses)
client, err := streamclient.GetFirstActiveClient(ctx, progress.GetStreamIngest().StreamAddresses)
if err != nil {
return nil, err
}
streamStatus, err := client.Heartbeat(evalCtx.Ctx(), streaming.StreamID(details.StreamID), hlc.MaxTimestamp)
streamStatus, err := client.Heartbeat(ctx, streaming.StreamID(details.StreamID), hlc.MaxTimestamp)
if err != nil {
stats.ProducerError = err.Error()
} else {
stats.ProducerStatus = &streamStatus
}
return stats, client.Close(evalCtx.Ctx())
return stats, client.Close(ctx)
}

type streamIngestionResumer struct {
Expand Down
26 changes: 17 additions & 9 deletions pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ type replicationStreamManagerImpl struct{}

// StartReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) StartReplicationStream(
evalCtx *eval.Context, txn *kv.Txn, tenantID uint64,
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64,
) (streaming.StreamID, error) {
return startReplicationStreamJob(evalCtx, txn, tenantID)
return startReplicationStreamJob(ctx, evalCtx, txn, tenantID)
}

// HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) HeartbeatReplicationStream(
evalCtx *eval.Context, streamID streaming.StreamID, frontier hlc.Timestamp, txn *kv.Txn,
ctx context.Context,
evalCtx *eval.Context,
streamID streaming.StreamID,
frontier hlc.Timestamp,
txn *kv.Txn,
) (streampb.StreamReplicationStatus, error) {
return heartbeatReplicationStream(evalCtx, streamID, frontier, txn)
return heartbeatReplicationStream(ctx, evalCtx, streamID, frontier, txn)
}

// StreamPartition implements streaming.ReplicationStreamManager interface.
Expand All @@ -45,18 +49,22 @@ func (r *replicationStreamManagerImpl) StreamPartition(
return streamPartition(evalCtx, streamID, opaqueSpec)
}

// GetReplicationStreamSpec implements ReplicationStreamManager interface.
// GetReplicationStreamSpec implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) GetReplicationStreamSpec(
evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
return getReplicationStreamSpec(evalCtx, txn, streamID)
return getReplicationStreamSpec(ctx, evalCtx, txn, streamID)
}

// CompleteReplicationStream implements ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) CompleteReplicationStream(
evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, successfulIngestion bool,
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
streamID streaming.StreamID,
successfulIngestion bool,
) error {
return completeReplicationStream(evalCtx, txn, streamID, successfulIngestion)
return completeReplicationStream(ctx, evalCtx, txn, streamID, successfulIngestion)
}

func newReplicationStreamManagerWithPrivilegesCheck(
Expand Down
36 changes: 22 additions & 14 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import (
// 1. Tracks the liveness of the replication stream consumption
// 2. TODO(casper): Updates the protected timestamp for spans being replicated
func startReplicationStreamJob(
evalCtx *eval.Context, txn *kv.Txn, tenantID uint64,
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64,
) (streaming.StreamID, error) {
execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
hasAdminRole, err := evalCtx.SessionAccessor.HasAdminRole(evalCtx.Ctx())
hasAdminRole, err := evalCtx.SessionAccessor.HasAdminRole(ctx)

if err != nil {
return streaming.InvalidStreamID, err
Expand All @@ -52,7 +52,7 @@ func startReplicationStreamJob(
timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV)
ptsID := uuid.MakeV4()
jr := makeProducerJobRecord(registry, tenantID, timeout, evalCtx.SessionData().User(), ptsID)
if _, err := registry.CreateAdoptableJobWithTxn(evalCtx.Ctx(), jr, jr.JobID, txn); err != nil {
if _, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
return streaming.InvalidStreamID, err
}

Expand All @@ -67,7 +67,7 @@ func startReplicationStreamJob(
pts := jobsprotectedts.MakeRecord(ptsID, int64(jr.JobID), statementTime,
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)

if err := ptp.Protect(evalCtx.Ctx(), txn, pts); err != nil {
if err := ptp.Protect(ctx, txn, pts); err != nil {
return streaming.InvalidStreamID, err
}
return streaming.StreamID(jr.JobID), nil
Expand Down Expand Up @@ -155,7 +155,11 @@ func updateReplicationStreamProgress(
// record to the specified frontier. If 'frontier' is hlc.MaxTimestamp, returns the producer job
// progress without updating it.
func heartbeatReplicationStream(
evalCtx *eval.Context, streamID streaming.StreamID, frontier hlc.Timestamp, txn *kv.Txn,
ctx context.Context,
evalCtx *eval.Context,
streamID streaming.StreamID,
frontier hlc.Timestamp,
txn *kv.Txn,
) (streampb.StreamReplicationStatus, error) {
execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV)
Expand All @@ -164,7 +168,7 @@ func heartbeatReplicationStream(
// job progress.
if frontier == hlc.MaxTimestamp {
var status streampb.StreamReplicationStatus
pj, err := execConfig.JobRegistry.LoadJob(evalCtx.Ctx(), jobspb.JobID(streamID))
pj, err := execConfig.JobRegistry.LoadJob(ctx, jobspb.JobID(streamID))
if jobs.HasJobNotFoundError(err) || testutils.IsError(err, "not found in system.jobs table") {
status.StreamStatus = streampb.StreamReplicationStatus_STREAM_INACTIVE
return status, nil
Expand All @@ -174,7 +178,7 @@ func heartbeatReplicationStream(
}
status.StreamStatus = convertProducerJobStatusToStreamStatus(pj.Status())
payload := pj.Payload()
ptsRecord, err := execConfig.ProtectedTimestampProvider.GetRecord(evalCtx.Ctx(), txn,
ptsRecord, err := execConfig.ProtectedTimestampProvider.GetRecord(ctx, txn,
payload.GetStreamReplication().ProtectedTimestampRecordID)
// Nil protected timestamp indicates it was not created or has been released.
if errors.Is(err, protectedts.ErrNotExists) {
Expand All @@ -187,18 +191,18 @@ func heartbeatReplicationStream(
return status, nil
}

return updateReplicationStreamProgress(evalCtx.Ctx(),
return updateReplicationStreamProgress(ctx,
expirationTime, execConfig.ProtectedTimestampProvider, execConfig.JobRegistry,
streamID, frontier, txn)
}

// getReplicationStreamSpec gets a replication stream specification for the specified stream.
func getReplicationStreamSpec(
evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)
// Returns error if the replication stream is not active
j, err := jobExecCtx.ExecCfg().JobRegistry.LoadJob(evalCtx.Ctx(), jobspb.JobID(streamID))
j, err := jobExecCtx.ExecCfg().JobRegistry.LoadJob(ctx, jobspb.JobID(streamID))
if err != nil {
return nil, errors.Wrapf(err, "replication stream %d has error", streamID)
}
Expand All @@ -209,7 +213,7 @@ func getReplicationStreamSpec(
// Partition the spans with SQLPlanner
var noTxn *kv.Txn
dsp := jobExecCtx.DistSQLPlanner()
planCtx := dsp.NewPlanningCtx(evalCtx.Ctx(), jobExecCtx.ExtendedEvalContext(),
planCtx := dsp.NewPlanningCtx(ctx, jobExecCtx.ExtendedEvalContext(),
nil /* planner */, noTxn, sql.DistributionTypeSystemTenantOnly)

details, ok := j.Details().(jobspb.StreamReplicationDetails)
Expand All @@ -221,7 +225,7 @@ func getReplicationStreamSpec(
for _, span := range replicatedSpans {
spans = append(spans, *span)
}
spanPartitions, err := dsp.PartitionSpans(evalCtx.Ctx(), planCtx, spans)
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -250,11 +254,15 @@ func getReplicationStreamSpec(
}

func completeReplicationStream(
evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, successfulIngestion bool,
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
streamID streaming.StreamID,
successfulIngestion bool,
) error {
registry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry
const useReadLock = false
return registry.UpdateJobWithTxn(evalCtx.Ctx(), jobspb.JobID(streamID), txn, useReadLock,
return registry.UpdateJobWithTxn(ctx, jobspb.JobID(streamID), txn, useReadLock,
func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
// Updates the stream ingestion status, make the job resumer exit running
// when picking up the new status.
Expand Down
4 changes: 1 addition & 3 deletions pkg/security/password.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ func GetConfiguredPasswordCost(

// GetConfiguredPasswordHashMethod returns the configured hash method
// to use before storing passwords provided in cleartext from clients.
func GetConfiguredPasswordHashMethod(
ctx context.Context, sv *settings.Values,
) (method password.HashMethod) {
func GetConfiguredPasswordHashMethod(sv *settings.Values) (method password.HashMethod) {
return password.HashMethod(PasswordHashMethod.Get(sv))
}

Expand Down
Loading

0 comments on commit 3f87f2f

Please sign in to comment.