Skip to content

Commit

Permalink
sql: remove processCleanupFunc in execStmtInOpenState
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
mgartner committed Dec 11, 2024
1 parent 11f5bac commit 03492a6
Showing 1 changed file with 34 additions and 51 deletions.
85 changes: 34 additions & 51 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,19 +314,13 @@ func (ex *connExecutor) execStmtInOpenState(
// requires only a single heap allocation.
var vars localVars

processCleanupFunc := func(fName string, f func()) {
f()
}

vars.ast = parserStmt.AST
var sp *tracing.Span
ctx, sp = tracing.ChildSpan(ctx, "sql query")
// TODO(andrei): Consider adding the placeholders as tags too.
sp.SetTag("statement", attribute.StringValue(parserStmt.SQL))
ctx = withStatement(ctx, vars.ast)
defer func() {
processCleanupFunc("cleanup span", sp.Finish)
}()
defer sp.Finish()

makeErrEvent := func(err error) (fsm.Event, fsm.EventPayload, error) {
ev, payload := ex.makeErrEvent(err, vars.ast)
Expand Down Expand Up @@ -367,24 +361,15 @@ func (ex *connExecutor) execStmtInOpenState(
ex.state.mu.Unlock()
ex.addActiveQuery(parserStmt, pinfo, queryID, vars.cancelQuery)
defer func() {
processCleanupFunc(
"increment executed stmt cnt",
func() {
// We need to check the latest errors rather than the ones evaluated
// when this function is created.
if retErr == nil && !payloadHasError(retPayload) {
ex.incrementExecutedStmtCounter(vars.ast)
}
},
)
if retErr == nil && !payloadHasError(retPayload) {
ex.incrementExecutedStmtCounter(vars.ast)
}
}()

// Make sure that we always unregister the query.
defer func() {
processCleanupFunc("cancel query", func() {
ex.removeActiveQuery(queryID, vars.ast)
vars.cancelQuery()
})
ex.removeActiveQuery(queryID, vars.ast)
vars.cancelQuery()

// Note ex.metrics is Server.Metrics for the connExecutor that serves the
// client connection, and is Server.InternalMetrics for internal executors.
Expand Down Expand Up @@ -537,7 +522,7 @@ func (ex *connExecutor) execStmtInOpenState(
// of finishing the instrumentation helper. This is needed since in order to
// support plan-gist-matching of the statement diagnostics we might not know
// right now whether Finish needs to happen.
defer processCleanupFunc("finish instrumentation helper", func() {
defer func() {
if ih.needFinish {
retErr = ih.Finish(
ex.server.cfg,
Expand All @@ -552,7 +537,7 @@ func (ex *connExecutor) execStmtInOpenState(
retErr,
)
}
})
}()

if ex.executorType != executorTypeInternal && ex.sessionData().TransactionTimeout > 0 && !ex.implicitTxn() {
timerDuration :=
Expand Down Expand Up @@ -653,7 +638,7 @@ func (ex *connExecutor) execStmtInOpenState(
// some special plan initialization for logging.
dispatchToExecEngine := false

defer processCleanupFunc("log statement", func() {
defer func() {
// If we did not dispatch to the execution engine, we need to initialize
// the plan here.
if !dispatchToExecEngine {
Expand Down Expand Up @@ -689,7 +674,7 @@ func (ex *connExecutor) execStmtInOpenState(
ex.implicitTxn(),
ex.statsCollector,
ex.extraTxnState.shouldLogToTelemetry)
})
}()

// Overwrite res.Error to a more user-friendly message in case of query
// cancellation.
Expand All @@ -709,33 +694,31 @@ func (ex *connExecutor) execStmtInOpenState(
}
}

processCleanupFunc("set query error", func() {
cancelQueryCtx := ctx
resToPushErr := res
vars.logErr = resToPushErr.ErrAllowReleased()
// Detect context cancelation and overwrite whatever error might have been
// set on the result before. The idea is that once the query's context is
// canceled, all sorts of actors can detect the cancelation and set all
// sorts of errors on the result. Rather than trying to impose discipline
// in that jungle, we just overwrite them all here with an error that's
// nicer to look at for the client.
if resToPushErr != nil && cancelQueryCtx.Err() != nil && resToPushErr.ErrAllowReleased() != nil {
// Even in the cases where the error is a retryable error, we want to
// intercept the event and payload returned here to ensure that the query
// is not retried.
retEv = eventNonRetriableErr{
IsCommit: fsm.FromBool(isCommit(vars.ast)),
}
errToPush := cancelchecker.QueryCanceledError
resToPushErr.SetError(errToPush)
retPayload = eventNonRetriableErrPayload{err: errToPush}
vars.logErr = errToPush
// Cancel the txn if we are inside an implicit txn too.
if ex.implicitTxn() && ex.state.txnCancelFn != nil {
ex.state.txnCancelFn()
}
cancelQueryCtx := ctx
resToPushErr := res
vars.logErr = resToPushErr.ErrAllowReleased()
// Detect context cancelation and overwrite whatever error might have been
// set on the result before. The idea is that once the query's context is
// canceled, all sorts of actors can detect the cancelation and set all
// sorts of errors on the result. Rather than trying to impose discipline
// in that jungle, we just overwrite them all here with an error that's
// nicer to look at for the client.
if resToPushErr != nil && cancelQueryCtx.Err() != nil && resToPushErr.ErrAllowReleased() != nil {
// Even in the cases where the error is a retryable error, we want to
// intercept the event and payload returned here to ensure that the query
// is not retried.
retEv = eventNonRetriableErr{
IsCommit: fsm.FromBool(isCommit(vars.ast)),
}
})
errToPush := cancelchecker.QueryCanceledError
resToPushErr.SetError(errToPush)
retPayload = eventNonRetriableErrPayload{err: errToPush}
vars.logErr = errToPush
// Cancel the txn if we are inside an implicit txn too.
if ex.implicitTxn() && ex.state.txnCancelFn != nil {
ex.state.txnCancelFn()
}
}

// If the query timed out, we intercept the error, payload, and event here
// for the same reasons we intercept them for canceled queries above.
Expand Down

0 comments on commit 03492a6

Please sign in to comment.