Skip to content

Commit

Permalink
sql: replace sessionData in connExecutor with sessiondata.Stack
Browse files Browse the repository at this point in the history
A mechanical change, the SessionData on the connExecutor now is replaced
with the sessiondata.Stack object.

Release justification: low risk, high pri change

Release note: None
  • Loading branch information
otan committed Aug 25, 2021
1 parent 38eeb53 commit d71dd32
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 46 deletions.
76 changes: 42 additions & 34 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func (h ConnectionHandler) GetUnqualifiedIntSize() *types.T {
if h.ex != nil {
// The executor will be nil in certain testing situations where
// no server is actually present.
size = h.ex.sessionData.DefaultIntSize
size = h.ex.sessionData().DefaultIntSize
}
return parser.NakedIntTypeFromDefaultIntSize(size)
}
Expand Down Expand Up @@ -742,14 +742,14 @@ func (s *Server) newConnExecutor(
sdMutator := new(sessionDataMutator)
*sdMutator = s.makeSessionDataMutator(sd, sdDefaults)
ex := &connExecutor{
server: s,
metrics: srvMetrics,
stmtBuf: stmtBuf,
clientComm: clientComm,
mon: sessionRootMon,
sessionMon: sessionMon,
sessionData: sd,
dataMutator: sdMutator,
server: s,
metrics: srvMetrics,
stmtBuf: stmtBuf,
clientComm: clientComm,
mon: sessionRootMon,
sessionMon: sessionMon,
sessionDataStack: sessiondata.NewStack(sd),
dataMutator: sdMutator,
state: txnState{
mon: txnMon,
connCtx: ctx,
Expand Down Expand Up @@ -791,7 +791,7 @@ func (s *Server) newConnExecutor(
ex.hasCreatedTemporarySchema = true
}

ex.applicationName.Store(ex.sessionData.ApplicationName)
ex.applicationName.Store(ex.sessionData().ApplicationName)
ex.statsWriter = statsWriter
ex.statsCollector = sslocal.NewStatsCollector(statsWriter, ex.phaseTimes)
sdMutator.RegisterOnSessionDataChange("application_name", func(newName string) {
Expand Down Expand Up @@ -1224,8 +1224,8 @@ type connExecutor struct {
hasAdminRoleCache HasAdminRoleCache
}

// sessionData contains the user-configurable connection variables.
sessionData *sessiondata.SessionData
// sessionDataStack contains the user-configurable connection variables.
sessionDataStack *sessiondata.Stack
// dataMutator is nil for session-bound internal executors; we shouldn't issue
// statements that manipulate session state to an internal executor.
dataMutator *sessionDataMutator
Expand Down Expand Up @@ -1425,7 +1425,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err
ex.extraTxnState.jobs = nil
ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}
ex.extraTxnState.schemaChangerState = SchemaChangerState{
mode: ex.sessionData.NewSchemaChangerMode,
mode: ex.sessionData().NewSchemaChangerMode,
}

for k := range ex.extraTxnState.schemaChangeJobRecords {
Expand Down Expand Up @@ -1477,6 +1477,14 @@ func (ex *connExecutor) Ctx() context.Context {
return ctx
}

// sessionData returns the top SessionData on the executor.
func (ex *connExecutor) sessionData() *sessiondata.SessionData {
if ex.sessionDataStack == nil {
return nil
}
return ex.sessionDataStack.Top()
}

// activate engages the use of resources that must be cleaned up
// afterwards. after activate() completes, the close() method must be
// called.
Expand All @@ -1499,11 +1507,11 @@ func (ex *connExecutor) activate(
// Enable the trace if configured.
if traceSessionEventLogEnabled.Get(&ex.server.cfg.Settings.SV) {
remoteStr := "<admin>"
if ex.sessionData.RemoteAddr != nil {
remoteStr = ex.sessionData.RemoteAddr.String()
if ex.sessionData().RemoteAddr != nil {
remoteStr = ex.sessionData().RemoteAddr.String()
}
ex.eventLog = trace.NewEventLog(
fmt.Sprintf("sql session [%s]", ex.sessionData.User()), remoteStr)
fmt.Sprintf("sql session [%s]", ex.sessionData().User()), remoteStr)
}

ex.activated = true
Expand Down Expand Up @@ -1637,8 +1645,8 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
NeedRowDesc,
pos,
nil, /* formatCodes */
ex.sessionData.DataConversionConfig,
ex.sessionData.GetLocation(),
ex.sessionData().DataConversionConfig,
ex.sessionData().GetLocation(),
0, /* limit */
"", /* portalName */
ex.implicitTxn(),
Expand Down Expand Up @@ -1706,8 +1714,8 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
// needed.
DontNeedRowDesc,
pos, portal.OutFormats,
ex.sessionData.DataConversionConfig,
ex.sessionData.GetLocation(),
ex.sessionData().DataConversionConfig,
ex.sessionData().GetLocation(),
tcmd.Limit,
portalName,
ex.implicitTxn(),
Expand Down Expand Up @@ -2299,7 +2307,7 @@ func txnPriorityToProto(mode tree.UserPriority) roachpb.UserPriority {

func (ex *connExecutor) txnPriorityWithSessionDefault(mode tree.UserPriority) roachpb.UserPriority {
if mode == tree.UnspecifiedUserPriority {
mode = tree.UserPriority(ex.sessionData.DefaultTxnPriority)
mode = tree.UserPriority(ex.sessionData().DefaultTxnPriority)
}
return txnPriorityToProto(mode)
}
Expand All @@ -2308,7 +2316,7 @@ func (ex *connExecutor) readWriteModeWithSessionDefault(
mode tree.ReadWriteMode,
) tree.ReadWriteMode {
if mode == tree.UnspecifiedReadWriteMode {
if ex.sessionData.DefaultTxnReadOnly {
if ex.sessionData().DefaultTxnReadOnly {
return tree.ReadOnly
}
return tree.ReadWrite
Expand All @@ -2327,7 +2335,7 @@ var followerReadTimestampExpr = &tree.FuncExpr{

func (ex *connExecutor) asOfClauseWithSessionDefault(expr tree.AsOfClause) tree.AsOfClause {
if expr.Expr == nil {
if ex.sessionData.DefaultTxnUseFollowerReads {
if ex.sessionData().DefaultTxnUseFollowerReads {
return tree.AsOfClause{Expr: followerReadTimestampExpr}
}
return tree.AsOfClause{}
Expand All @@ -2345,7 +2353,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
ex.memMetrics,
ex.server.cfg.Settings,
)
ie.SetSessionData(ex.sessionData)
ie.SetSessionData(ex.sessionData())

*evalCtx = extendedEvalContext{
EvalContext: tree.EvalContext{
Expand All @@ -2357,7 +2365,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
Tenant: p,
JoinTokenCreator: p,
PreparedStatementState: &ex.extraTxnState.prepStmtsNamespace,
SessionData: ex.sessionData,
SessionData: ex.sessionDataStack.Top(),
Settings: ex.server.cfg.Settings,
TestingKnobs: ex.server.cfg.EvalContextTestingKnobs,
ClusterID: ex.server.cfg.ClusterID(),
Expand Down Expand Up @@ -2472,14 +2480,14 @@ func (ex *connExecutor) resetPlanner(
p.cancelChecker.Reset(ctx)

p.semaCtx = tree.MakeSemaContext()
p.semaCtx.SearchPath = ex.sessionData.SearchPath
p.semaCtx.IntervalStyleEnabled = ex.sessionData.IntervalStyleEnabled
p.semaCtx.DateStyleEnabled = ex.sessionData.DateStyleEnabled
p.semaCtx.SearchPath = ex.sessionData().SearchPath
p.semaCtx.IntervalStyleEnabled = ex.sessionData().IntervalStyleEnabled
p.semaCtx.DateStyleEnabled = ex.sessionData().DateStyleEnabled
p.semaCtx.Annotations = nil
p.semaCtx.TypeResolver = p
p.semaCtx.TableNameResolver = p
p.semaCtx.DateStyle = ex.sessionData.GetDateStyle()
p.semaCtx.IntervalStyle = ex.sessionData.GetIntervalStyle()
p.semaCtx.DateStyle = ex.sessionData().GetDateStyle()
p.semaCtx.IntervalStyle = ex.sessionData().GetIntervalStyle()

ex.resetEvalCtx(&p.extendedEvalCtx, txn, stmtTS)

Expand Down Expand Up @@ -2692,7 +2700,7 @@ func (ex *connExecutor) cancelSession() {

// user is part of the registrySession interface.
func (ex *connExecutor) user() security.SQLUsername {
return ex.sessionData.User()
return ex.sessionData().User()
}

// serialize is part of the registrySession interface.
Expand Down Expand Up @@ -2768,12 +2776,12 @@ func (ex *connExecutor) serialize() serverpb.Session {
}

remoteStr := "<admin>"
if ex.sessionData.RemoteAddr != nil {
remoteStr = ex.sessionData.RemoteAddr.String()
if ex.sessionData().RemoteAddr != nil {
remoteStr = ex.sessionData().RemoteAddr.String()
}

return serverpb.Session{
Username: ex.sessionData.User().Normalized(),
Username: ex.sessionData().User().Normalized(),
ClientAddress: remoteStr,
ApplicationName: ex.applicationName.Load().(string),
Start: ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionInit).UTC(),
Expand Down
22 changes: 11 additions & 11 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ func (ex *connExecutor) execStmt(
case stateOpen:
if ex.server.cfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels {
remoteAddr := "internal"
if rAddr := ex.sessionData.RemoteAddr; rAddr != nil {
if rAddr := ex.sessionData().RemoteAddr; rAddr != nil {
remoteAddr = rAddr.String()
}
labels := pprof.Labels(
"appname", ex.sessionData.ApplicationName,
"appname", ex.sessionData().ApplicationName,
"addr", remoteAddr,
"stmt.tag", ast.StatementTag(),
"stmt.anonymized", anonymizeStmt(ast),
Expand All @@ -141,23 +141,23 @@ func (ex *connExecutor) execStmt(
panic(errors.AssertionFailedf("unexpected txn state: %#v", ex.machine.CurState()))
}

if ex.sessionData.IdleInSessionTimeout > 0 {
if ex.sessionData().IdleInSessionTimeout > 0 {
// Cancel the session if the idle time exceeds the idle in session timeout.
ex.mu.IdleInSessionTimeout = timeout{time.AfterFunc(
ex.sessionData.IdleInSessionTimeout,
ex.sessionData().IdleInSessionTimeout,
ex.cancelSession,
)}
}

if ex.sessionData.IdleInTransactionSessionTimeout > 0 {
if ex.sessionData().IdleInTransactionSessionTimeout > 0 {
startIdleInTransactionSessionTimeout := func() {
switch ast.(type) {
case *tree.CommitTransaction, *tree.RollbackTransaction:
// Do nothing, the transaction is completed, we do not want to start
// an idle timer.
default:
ex.mu.IdleInTransactionSessionTimeout = timeout{time.AfterFunc(
ex.sessionData.IdleInTransactionSessionTimeout,
ex.sessionData().IdleInTransactionSessionTimeout,
ex.cancelSession,
)}
}
Expand Down Expand Up @@ -421,7 +421,7 @@ func (ex *connExecutor) execStmtInOpenState(
return makeErrEvent(err)
}
var err error
pinfo, err = fillInPlaceholders(ctx, ps, name, e.Params, ex.sessionData.SearchPath)
pinfo, err = fillInPlaceholders(ctx, ps, name, e.Params, ex.sessionData().SearchPath)
if err != nil {
return makeErrEvent(err)
}
Expand Down Expand Up @@ -465,9 +465,9 @@ func (ex *connExecutor) execStmtInOpenState(

// We exempt `SET` statements from the statement timeout, particularly so as
// not to block the `SET statement_timeout` command itself.
if ex.sessionData.StmtTimeout > 0 && ast.StatementTag() != "SET" {
if ex.sessionData().StmtTimeout > 0 && ast.StatementTag() != "SET" {
timerDuration :=
ex.sessionData.StmtTimeout - timeutil.Since(ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived))
ex.sessionData().StmtTimeout - timeutil.Since(ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived))
// There's no need to proceed with execution if the timer has already expired.
if timerDuration < 0 {
queryTimedOut = true
Expand All @@ -488,7 +488,7 @@ func (ex *connExecutor) execStmtInOpenState(
if perr, ok := retPayload.(payloadWithError); ok {
execErr = perr.errorCause()
}
filter(ctx, ex.sessionData, ast.String(), execErr)
filter(ctx, ex.sessionData(), ast.String(), execErr)
}

// Do the auto-commit, if necessary.
Expand Down Expand Up @@ -961,7 +961,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(

ex.sessionTracing.TracePlanCheckStart(ctx)
distributePlan := getPlanDistribution(
ctx, planner, planner.execCfg.NodeID, ex.sessionData.DistSQLMode, planner.curPlan.main,
ctx, planner, planner.execCfg.NodeID, ex.sessionData().DistSQLMode, planner.curPlan.main,
)
ex.sessionTracing.TracePlanCheckEnd(ctx, nil, distributePlan.WillDistribute())

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_savepoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (ex *connExecutor) execRollbackToSavepointInAbortedState(
// isCommitOnReleaseSavepoint returns true if the savepoint name implies special
// release semantics: releasing it commits the underlying KV txn.
func (ex *connExecutor) isCommitOnReleaseSavepoint(savepoint tree.Name) bool {
if ex.sessionData.ForceSavepointRestart {
if ex.sessionData().ForceSavepointRestart {
// The session setting force_savepoint_restart implies that all
// uses of the SAVEPOINT statement are targeting restarts.
return true
Expand Down

0 comments on commit d71dd32

Please sign in to comment.