sql: thread SessionDataStack to EvalContext
Replace SessionData with sessiondata.Stack in EvalContext, and replace
every caller of SessionData with a new SessionData() method, which
fetches the top of the stack. By default, this ensures sessions use the
"local" SessionData to do their work.

Note that as of this commit, there is only ever one element in the stack.

Release justification: low risk, high pri change

Release note: None
otan committed Aug 25, 2021
1 parent d71dd32 commit 7e83c05
Showing 83 changed files with 307 additions and 292 deletions.
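
Before the per-file diffs, here is a minimal, self-contained Go sketch of the pattern this commit introduces. It is illustrative only: the struct layouts, the ApplicationName field, and the nil handling are assumptions made for the sketch, not CockroachDB's actual definitions. What it mirrors from the commit is the shape of the API — EvalContext holds a stack built with sessiondata.NewStack, and callers reach session data through a SessionData() method that returns the top of the stack.

package main

import "fmt"

// SessionData stands in for the per-session settings struct; the single
// field here is an illustrative assumption.
type SessionData struct {
	ApplicationName string
}

// Stack mirrors the shape of sessiondata.Stack: a stack of SessionData
// whose top element is the "local" data a session works with.
type Stack struct {
	elems []*SessionData
}

// NewStack seeds the stack with one element, matching the
// sessiondata.NewStack(&sessiondata.SessionData{}) calls in the diffs.
func NewStack(first *SessionData) *Stack {
	return &Stack{elems: []*SessionData{first}}
}

// Top returns the most recently pushed SessionData.
func (s *Stack) Top() *SessionData {
	return s.elems[len(s.elems)-1]
}

// EvalContext carries the stack instead of a bare SessionData field.
type EvalContext struct {
	SessionDataStack *Stack
}

// SessionData fetches the top of the stack, so callers transparently see
// the "local" session data. Returning nil for a nil stack is an assumption
// here, made to match the nil check kept in read_import_base.go below.
func (evalCtx *EvalContext) SessionData() *SessionData {
	if evalCtx.SessionDataStack == nil {
		return nil
	}
	return evalCtx.SessionDataStack.Top()
}

func main() {
	evalCtx := &EvalContext{
		SessionDataStack: NewStack(&SessionData{ApplicationName: "demo"}),
	}
	fmt.Println(evalCtx.SessionData().ApplicationName) // prints "demo"
}

Because the stack only ever holds one element as of this commit (per the note above), SessionData() is behaviorally identical to the old field access; the indirection is what later allows pushing and popping temporary session data without touching every caller again.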
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/avro_test.go
@@ -370,7 +370,9 @@ func TestAvroSchema(t *testing.T) {
 	require.NoError(t, err)

 	for _, row := range rows {
-		evalCtx := &tree.EvalContext{SessionData: &sessiondata.SessionData{}}
+		evalCtx := &tree.EvalContext{
+			SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
+		}
 		serialized, err := origSchema.textualFromRow(row)
 		require.NoError(t, err)
 		roundtripped, err := roundtrippedSchema.rowFromTextual(serialized)
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/bench_test.go
@@ -176,8 +176,8 @@ func benchmarkConvertToKVs(b *testing.B, g workload.Generator) {
 	wc := importccl.NewWorkloadKVConverter(
 		0, tableDesc, t.InitialRows, 0, t.InitialRows.NumBatches, kvCh)
 	evalCtx := &tree.EvalContext{
-		SessionData: &sessiondata.SessionData{},
-		Codec:       keys.SystemSQLCodec,
+		SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
+		Codec:            keys.SystemSQLCodec,
 	}
 	return wc.Worker(ctx, evalCtx)
 })
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/import_table_creation.go
@@ -205,7 +205,7 @@ func MakeSimpleTableDescriptor(
 	evalCtx := tree.EvalContext{
 		Context:            ctx,
 		Sequence:           &importSequenceOperators{},
-		SessionData:        &sessiondata.SessionData{},
+		SessionDataStack:   sessiondata.NewStack(&sessiondata.SessionData{}),
 		ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{},
 		Settings:           st,
 	}
@@ -226,7 +226,7 @@
 		affected,
 		semaCtx,
 		&evalCtx,
-		evalCtx.SessionData, /* sessionData */
+		evalCtx.SessionData(), /* sessionData */
 		tree.PersistencePermanent,
 		// We need to bypass the LOCALITY on non multi-region check here because
 		// we cannot access the database region config at import level.
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_base.go
@@ -657,7 +657,7 @@ func (p *parallelImporter) importWorker(
 	if err != nil {
 		return err
 	}
-	if conv.EvalCtx.SessionData == nil {
+	if conv.EvalCtx.SessionData() == nil {
 		panic("uninitialized session data")
 	}

8 changes: 5 additions & 3 deletions pkg/ccl/importccl/testutils_test.go
@@ -86,9 +86,11 @@ func descForTable(
 }

 var testEvalCtx = &tree.EvalContext{
-	SessionData: &sessiondata.SessionData{
-		Location: time.UTC,
-	},
+	SessionDataStack: sessiondata.NewStack(
+		&sessiondata.SessionData{
+			Location: time.UTC,
+		},
+	),
 	StmtTimestamp: timeutil.Unix(100000000, 0),
 	Settings:      cluster.MakeTestingClusterSettings(),
 	Codec:         keys.SystemSQLCodec,
4 changes: 2 additions & 2 deletions pkg/ccl/workloadccl/format/sstable.go
@@ -82,8 +82,8 @@ func ToSSTable(t workload.Table, tableID descpb.ID, ts time.Time) ([]byte, error
 	g.GoCtx(func(ctx context.Context) error {
 		defer close(kvCh)
 		evalCtx := &tree.EvalContext{
-			SessionData: &sessiondata.SessionData{},
-			Codec:       keys.SystemSQLCodec,
+			SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
+			Codec:            keys.SystemSQLCodec,
 		}
 		return wc.Worker(ctx, evalCtx)
 	})
2 changes: 1 addition & 1 deletion pkg/sql/alter_database.go
@@ -1112,7 +1112,7 @@ func (p *planner) AlterDatabasePlacement(
 		return nil, err
 	}

-	if !p.EvalContext().SessionData.PlacementEnabled {
+	if !p.EvalContext().SessionData().PlacementEnabled {
 		return nil, errors.WithHint(pgerror.New(
 			pgcode.FeatureNotSupported,
 			"ALTER DATABASE PLACEMENT requires that the session setting "+
2 changes: 1 addition & 1 deletion pkg/sql/alter_index.go
@@ -85,7 +85,7 @@ func (n *alterIndexNode) startExec(params runParams) error {
 			"cannot ALTER INDEX PARTITION BY on an index which already has implicit column partitioning",
 		)
 	}
-	allowImplicitPartitioning := params.p.EvalContext().SessionData.ImplicitColumnPartitioningEnabled ||
+	allowImplicitPartitioning := params.p.EvalContext().SessionData().ImplicitColumnPartitioningEnabled ||
 		n.tableDesc.IsLocalityRegionalByRow()
 	alteredIndexDesc := n.index.IndexDescDeepCopy()
 	newImplicitCols, newPartitioning, err := CreatePartitioning(
2 changes: 1 addition & 1 deletion pkg/sql/alter_primary_key.go
@@ -78,7 +78,7 @@ func (p *planner) AlterPrimaryKey(
 	}

 	if alterPKNode.Sharded != nil {
-		if !p.EvalContext().SessionData.HashShardedIndexesEnabled {
+		if !p.EvalContext().SessionData().HashShardedIndexesEnabled {
 			return hashShardedIndexesDisabledError
 		}
 		if alterPKNode.Interleave != nil {
2 changes: 1 addition & 1 deletion pkg/sql/alter_table.go
@@ -899,7 +899,7 @@ func (n *alterTableNode) startExec(params runParams) error {
 			newPrimaryIndexDesc,
 			t.PartitionBy,
 			nil, /* allowedNewColumnNames */
-			params.p.EvalContext().SessionData.ImplicitColumnPartitioningEnabled ||
+			params.p.EvalContext().SessionData().ImplicitColumnPartitioningEnabled ||
 				n.tableDesc.IsLocalityRegionalByRow(),
 		)
 		if err != nil {
2 changes: 1 addition & 1 deletion pkg/sql/backfill.go
@@ -2486,7 +2486,7 @@ func indexTruncateInTxn(
 	for done := false; !done; done = sp.Key == nil {
 		rd := row.MakeDeleter(
 			execCfg.Codec, tableDesc, nil /* requestedCols */, &execCfg.Settings.SV,
-			evalCtx.SessionData.Internal,
+			evalCtx.SessionData().Internal,
 		)
 		td := tableDeleter{rd: rd, alloc: alloc}
 		if err := td.init(ctx, txn, evalCtx); err != nil {
2 changes: 1 addition & 1 deletion pkg/sql/backfill/backfill.go
@@ -267,7 +267,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
 		row.UpdaterOnlyColumns,
 		&cb.alloc,
 		&cb.evalCtx.Settings.SV,
-		cb.evalCtx.SessionData.Internal,
+		cb.evalCtx.SessionData().Internal,
 	)
 	if err != nil {
 		return roachpb.Key{}, err
2 changes: 1 addition & 1 deletion pkg/sql/buffer_util.go
@@ -38,7 +38,7 @@ func (c *rowContainerHelper) init(
 ) {
 	distSQLCfg := &evalContext.DistSQLPlanner.distSQLSrv.ServerConfig
 	c.memMonitor = execinfra.NewLimitedMonitorNoFlowCtx(
-		evalContext.Context, evalContext.Mon, distSQLCfg, evalContext.SessionData,
+		evalContext.Context, evalContext.Mon, distSQLCfg, evalContext.SessionData(),
 		fmt.Sprintf("%s-limited", opName),
 	)
 	c.diskMonitor = execinfra.NewMonitor(evalContext.Context, distSQLCfg.ParentDiskMonitor, fmt.Sprintf("%s-disk", opName))
2 changes: 1 addition & 1 deletion pkg/sql/catalog/schemaexpr/expr.go
@@ -303,7 +303,7 @@ func newNameResolver(
 // unresolved names replaced with IndexedVars.
 func (nr *nameResolver) resolveNames(expr tree.Expr) (tree.Expr, error) {
 	var v NameResolutionVisitor
-	return ResolveNamesUsingVisitor(&v, expr, nr.source, *nr.ivarHelper, nr.evalCtx.SessionData.SearchPath)
+	return ResolveNamesUsingVisitor(&v, expr, nr.source, *nr.ivarHelper, nr.evalCtx.SessionData().SearchPath)
 }

 // addColumn adds a new column to the nameResolver so that it can be resolved in
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
@@ -556,7 +556,7 @@ func (r opResult) createAndWrapRowSource(
 		return errors.New("processorConstructor is nil")
 	}
 	log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow: %v", causeToWrap)
-	if err := canWrap(flowCtx.EvalCtx.SessionData.VectorizeMode, spec); err != nil {
+	if err := canWrap(flowCtx.EvalCtx.SessionData().VectorizeMode, spec); err != nil {
 		log.VEventf(ctx, 1, "planning a wrapped processor failed: %v", err)
 		// Return the original error for why we don't support this spec
 		// natively since it is more interesting.
2 changes: 1 addition & 1 deletion pkg/sql/colfetcher/cfetcher.go
@@ -1654,7 +1654,7 @@ func initCFetcher(

 	if err := fetcher.Init(
 		flowCtx.Codec(), allocator, args.memoryLimit, args.reverse, args.lockingStrength,
-		args.lockingWaitPolicy, flowCtx.EvalCtx.SessionData.LockTimeout, tableArgs,
+		args.lockingWaitPolicy, flowCtx.EvalCtx.SessionData().LockTimeout, tableArgs,
 	); err != nil {
 		return nil, err
 	}
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
@@ -1177,7 +1177,7 @@ func (s *vectorizedFlowCreator) setupFlow(
 		err = errors.Wrapf(err, "unable to vectorize execution plan")
 		return
 	}
-	if flowCtx.EvalCtx.SessionData.TestingVectorizeInjectPanics {
+	if flowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics {
 		result.Root = newPanicInjector(result.Root)
 	}
 	if util.CrdbTestBuild {
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor.go
@@ -2365,7 +2365,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
 		Tenant:                 p,
 		JoinTokenCreator:       p,
 		PreparedStatementState: &ex.extraTxnState.prepStmtsNamespace,
-		SessionData:            ex.sessionDataStack.Top(),
+		SessionDataStack:       ex.sessionDataStack,
 		Settings:               ex.server.cfg.Settings,
 		TestingKnobs:           ex.server.cfg.EvalContextTestingKnobs,
 		ClusterID:              ex.server.cfg.ClusterID(),
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
@@ -1045,7 +1045,7 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro
 	flags := planner.curPlan.flags

 	if flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan) {
-		if ex.executorType == executorTypeExec && planner.EvalContext().SessionData.DisallowFullTableScans {
+		if ex.executorType == executorTypeExec && planner.EvalContext().SessionData().DisallowFullTableScans {
 			// We don't execute the statement if:
 			// - plan contains a full table or full index scan.
 			// - the session setting disallows full table/index scans.
2 changes: 1 addition & 1 deletion pkg/sql/create_database.go
@@ -97,7 +97,7 @@ func (p *planner) CreateDatabase(ctx context.Context, n *tree.CreateDatabase) (p
 	}

 	if n.Placement != tree.DataPlacementUnspecified {
-		if !p.EvalContext().SessionData.PlacementEnabled {
+		if !p.EvalContext().SessionData().PlacementEnabled {
 			return nil, errors.WithHint(pgerror.New(
 				pgcode.FeatureNotSupported,
 				"PLACEMENT requires that the session setting enable_multiregion_placement_policy "+
2 changes: 1 addition & 1 deletion pkg/sql/create_index.go
@@ -788,7 +788,7 @@ func (p *planner) configureIndexDescForNewIndexPartitioning(
 			return indexDesc, err
 		}
 	}
-	allowImplicitPartitioning := p.EvalContext().SessionData.ImplicitColumnPartitioningEnabled ||
+	allowImplicitPartitioning := p.EvalContext().SessionData().ImplicitColumnPartitioningEnabled ||
 		tableDesc.IsLocalityRegionalByRow()
 	if partitionBy != nil {
 		newImplicitCols, newPartitioning, err := CreatePartitioning(
4 changes: 2 additions & 2 deletions pkg/sql/create_stats.go
@@ -611,8 +611,8 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er
 		0, /* depth: use event_log=2 for vmodule filtering */
 		eventLogOptions{dst: LogEverywhere},
 		sqlEventCommonExecPayload{
-			user:         evalCtx.SessionData.User(),
-			appName:      evalCtx.SessionData.ApplicationName,
+			user:         evalCtx.SessionData().User(),
+			appName:      evalCtx.SessionData().ApplicationName,
 			stmt:         redact.Sprint(details.Statement),
 			stmtTag:      "CREATE STATISTICS",
 			placeholders: nil, /* no placeholders known at this point */
6 changes: 3 additions & 3 deletions pkg/sql/create_table.go
@@ -1511,7 +1511,7 @@ func NewTableDesc(
 		}
 	}
 	if n.PartitionByTable.All {
-		if !evalCtx.SessionData.ImplicitColumnPartitioningEnabled {
+		if !evalCtx.SessionData().ImplicitColumnPartitioningEnabled {
 			return nil, errors.WithHint(
 				pgerror.New(
 					pgcode.FeatureNotSupported,
@@ -1541,7 +1541,7 @@
 	for i, def := range n.Defs {
 		if d, ok := def.(*tree.ColumnTableDef); ok {
 			if d.IsComputed() {
-				d.Computed.Expr = schemaexpr.MaybeRewriteComputedColumn(d.Computed.Expr, evalCtx.SessionData)
+				d.Computed.Expr = schemaexpr.MaybeRewriteComputedColumn(d.Computed.Expr, evalCtx.SessionData())
 			}
 			// NewTableDesc is called sometimes with a nil SemaCtx (for example
 			// during bootstrapping). In order to not panic, pass a nil TypeResolver
@@ -1998,7 +1998,7 @@

 	// If explicit primary keys are required, error out since a primary key was not supplied.
 	if desc.GetPrimaryIndex().NumKeyColumns() == 0 && desc.IsPhysicalTable() && evalCtx != nil &&
-		evalCtx.SessionData != nil && evalCtx.SessionData.RequireExplicitPrimaryKeys {
+		evalCtx.SessionData() != nil && evalCtx.SessionData().RequireExplicitPrimaryKeys {
 		return nil, errors.Errorf(
 			"no primary key specified for table %s (require_explicit_primary_keys = true)", desc.Name)
 	}
2 changes: 1 addition & 1 deletion pkg/sql/delegate/show_default_privileges.go
@@ -50,7 +50,7 @@ func (d *delegator) delegateShowDefaultPrivileges(
 		query += ")"
 	} else {
 		query = fmt.Sprintf("%s AND for_all_roles=false AND role = '%s'",
-			query, d.evalCtx.SessionData.User())
+			query, d.evalCtx.SessionData().User())
 	}
 	query += " ORDER BY 1,2,3,4"
 	return parse(query)
2 changes: 1 addition & 1 deletion pkg/sql/delegate/show_enums.go
@@ -30,7 +30,7 @@ func (d *delegator) delegateShowEnums(n *tree.ShowEnums) (tree.Statement, error)
 	if n.ExplicitSchema {
 		schema := lexbase.EscapeSQLString(name.Schema())
 		if name.Schema() == catconstants.PgTempSchemaName {
-			schema = lexbase.EscapeSQLString(d.evalCtx.SessionData.SearchPath.GetTemporarySchemaName())
+			schema = lexbase.EscapeSQLString(d.evalCtx.SessionData().SearchPath.GetTemporarySchemaName())
 		}
 		schemaClause = fmt.Sprintf("AND nsp.nspname = %s", schema)
 	}
6 changes: 3 additions & 3 deletions pkg/sql/delegate/show_grants.go
@@ -87,7 +87,7 @@ FROM "".information_schema.type_privileges`
 			fmt.Fprintf(&cond, `WHERE database_name IN (%s)`, strings.Join(params, ","))
 		}
 	} else if n.Targets != nil && len(n.Targets.Schemas) > 0 {
-		currDB := d.evalCtx.SessionData.Database
+		currDB := d.evalCtx.SessionData().Database

 		for _, schema := range n.Targets.Schemas {
 			_, _, err := d.catalog.ResolveSchema(d.ctx, cat.Flags{AvoidDescriptorCaches: true}, &schema)
@@ -144,7 +144,7 @@
 	if len(params) == 0 {
 		dbNameClause := "true"
 		// If the current database is set, restrict the command to it.
-		if currDB := d.evalCtx.SessionData.Database; currDB != "" {
+		if currDB := d.evalCtx.SessionData().Database; currDB != "" {
 			dbNameClause = fmt.Sprintf("database_name = %s", lexbase.EscapeSQLString(currDB))
 		}
 		cond.WriteString(fmt.Sprintf(`WHERE %s`, dbNameClause))
@@ -215,7 +215,7 @@
 	source.WriteString(typePrivQuery)
 	source.WriteByte(')')
 	// If the current database is set, restrict the command to it.
-	if currDB := d.evalCtx.SessionData.Database; currDB != "" {
+	if currDB := d.evalCtx.SessionData().Database; currDB != "" {
 		fmt.Fprintf(&cond, ` WHERE database_name = %s`, lexbase.EscapeSQLString(currDB))
 	} else {
 		cond.WriteString(`WHERE true`)
2 changes: 1 addition & 1 deletion pkg/sql/delegate/show_regions.go
@@ -44,7 +44,7 @@ ORDER BY database_name
 	sqltelemetry.IncrementShowCounter(sqltelemetry.RegionsFromDatabase)
 	dbName := string(n.DatabaseName)
 	if dbName == "" {
-		dbName = d.evalCtx.SessionData.Database
+		dbName = d.evalCtx.SessionData().Database
 	}
 	// Note the LEFT JOIN here -- in the case where regions no longer exist on the cluster
 	// but still exist on the database config, we want to still see this database region
2 changes: 1 addition & 1 deletion pkg/sql/delegate/show_survival_goal.go
@@ -23,7 +23,7 @@ func (d *delegator) delegateShowSurvivalGoal(n *tree.ShowSurvivalGoal) (tree.Sta
 	sqltelemetry.IncrementShowCounter(sqltelemetry.SurvivalGoal)
 	dbName := string(n.DatabaseName)
 	if dbName == "" {
-		dbName = d.evalCtx.SessionData.Database
+		dbName = d.evalCtx.SessionData().Database
 	}
 	query := fmt.Sprintf(
 		`SELECT
2 changes: 1 addition & 1 deletion pkg/sql/delegate/show_table.go
@@ -226,7 +226,7 @@ func (d *delegator) delegateShowCreateAllTables() (tree.Statement, error) {
 	const showCreateAllTablesQuery = `
	SELECT crdb_internal.show_create_all_tables(%[1]s) AS create_statement;
`
-	databaseLiteral := d.evalCtx.SessionData.Database
+	databaseLiteral := d.evalCtx.SessionData().Database

 	query := fmt.Sprintf(showCreateAllTablesQuery,
 		lexbase.EscapeSQLString(databaseLiteral),
2 changes: 1 addition & 1 deletion pkg/sql/delegate/show_tables.go
@@ -50,7 +50,7 @@ func (d *delegator) delegateShowTables(n *tree.ShowTables) (tree.Statement, erro
 	if name.ExplicitSchema {
 		schema := lexbase.EscapeSQLString(name.Schema())
 		if name.Schema() == catconstants.PgTempSchemaName {
-			schema = lexbase.EscapeSQLString(d.evalCtx.SessionData.SearchPath.GetTemporarySchemaName())
+			schema = lexbase.EscapeSQLString(d.evalCtx.SessionData().SearchPath.GetTemporarySchemaName())
 		}
 		schemaClause = fmt.Sprintf("AND ns.nspname = %s", schema)
 	} else {
20 changes: 10 additions & 10 deletions pkg/sql/distsql/server.go
@@ -303,15 +303,15 @@ func (ds *ServerImpl) setupFlow(
 		return nil, nil, nil, err
 	}
 	evalCtx = &tree.EvalContext{
-		Settings:    ds.ServerConfig.Settings,
-		SessionData: sd,
-		ClusterID:   ds.ServerConfig.ClusterID.Get(),
-		ClusterName: ds.ServerConfig.ClusterName,
-		NodeID:      ds.ServerConfig.NodeID,
-		Codec:       ds.ServerConfig.Codec,
-		ReCache:     ds.regexpCache,
-		Mon:         monitor,
-		Locality:    ds.ServerConfig.Locality,
+		Settings:         ds.ServerConfig.Settings,
+		SessionDataStack: sessiondata.NewStack(sd),
+		ClusterID:        ds.ServerConfig.ClusterID.Get(),
+		ClusterName:      ds.ServerConfig.ClusterName,
+		NodeID:           ds.ServerConfig.NodeID,
+		Codec:            ds.ServerConfig.Codec,
+		ReCache:          ds.regexpCache,
+		Mon:              monitor,
+		Locality:         ds.ServerConfig.Locality,
 		// Most processors will override this Context with their own context in
 		// ProcessorBase. StartInternal().
 		Context: ctx,
@@ -445,7 +445,7 @@ func (ds *ServerImpl) newFlowContext(
 	// If we weren't passed a descs.Collection, then make a new one. We are
 	// responsible for cleaning it up and releasing any accessed descriptors
 	// on flow cleanup.
-	collection := ds.CollectionFactory.NewCollection(evalCtx.SessionData)
+	collection := ds.CollectionFactory.NewCollection(evalCtx.SessionData())
 	flowCtx.TypeResolverFactory = &descs.DistSQLTypeResolverFactory{
 		Descriptors: collection,
 		CleanupFunc: func(ctx context.Context) {
4 changes: 2 additions & 2 deletions pkg/sql/distsql_running.go
@@ -294,7 +294,7 @@ func (dsp *DistSQLPlanner) setupFlows(
 		resultChan = make(chan runnerResult, len(flows)-1)
 	}

-	if vectorizeMode := evalCtx.SessionData.VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff {
+	if vectorizeMode := evalCtx.SessionData().VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff {
 		// Now we determine whether the vectorized engine supports the flow
 		// specs.
 		for _, spec := range flows {
@@ -1351,7 +1351,7 @@ func (dsp *DistSQLPlanner) PlanAndRunCascadesAndChecks(
 	// In cyclical reference situations, the number of cascading operations can
 	// be arbitrarily large. To avoid OOM, we enforce a limit. This is also a
 	// safeguard in case we have a bug that results in an infinite cascade loop.
-	if limit := int(evalCtx.SessionData.OptimizerFKCascadesLimit); len(plan.cascades) > limit {
+	if limit := int(evalCtx.SessionData().OptimizerFKCascadesLimit); len(plan.cascades) > limit {
 		telemetry.Inc(sqltelemetry.CascadesLimitReached)
 		err := pgerror.Newf(pgcode.TriggeredActionException, "cascades limit (%d) reached", limit)
 		recv.SetError(err)
2 changes: 1 addition & 1 deletion pkg/sql/distsql_spec_exec_factory.go
@@ -76,7 +76,7 @@ func (e *distSQLSpecExecFactory) getPlanCtx(recommendation distRecommendation) *
 	distribute := false
 	if e.singleTenant && e.planningMode != distSQLLocalOnlyPlanning {
 		distribute = shouldDistributeGivenRecAndMode(
-			recommendation, e.planner.extendedEvalCtx.SessionData.DistSQLMode,
+			recommendation, e.planner.extendedEvalCtx.SessionData().DistSQLMode,
 		)
 	}
 	e.planCtx.isLocal = !distribute
2 changes: 1 addition & 1 deletion pkg/sql/exec_log.go
@@ -175,7 +175,7 @@ func (p *planner) maybeLogStatementInternal(
 	// Compute the pieces of data that are going to be included in logged events.

 	// The session's application_name.
-	appName := p.EvalContext().SessionData.ApplicationName
+	appName := p.EvalContext().SessionData().ApplicationName
 	// The duration of the query so far. Age is the duration expressed in milliseconds.
 	queryDuration := timeutil.Now().Sub(startTime)
 	age := float32(queryDuration.Nanoseconds()) / 1e6
(The remaining 49 of the 83 changed files are not shown here.)
