Skip to content

Commit

Permalink
sql: remove namedFunc struct
Browse files Browse the repository at this point in the history
The `fName` field of the `namedFunc` struct was never read. The entire
struct has been removed and replaced with the `func(context.Context)`
type.

Release note: None
  • Loading branch information
mgartner committed Dec 19, 2024
1 parent 68cd18a commit 27454c2
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 48 deletions.
55 changes: 21 additions & 34 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,8 @@ func (ex *connExecutor) execPortal(
defer func() {
if portal.isPausable() {
if !portal.pauseInfo.exhaustPortal.cleanup.isComplete {
portal.pauseInfo.exhaustPortal.cleanup.appendFunc(namedFunc{
fName: "exhaust portal",
f: func(_ context.Context) {
ex.exhaustPortal(portalName)
},
portal.pauseInfo.exhaustPortal.cleanup.appendFunc(func(_ context.Context) {
ex.exhaustPortal(portalName)
})
portal.pauseInfo.exhaustPortal.cleanup.isComplete = true
}
Expand Down Expand Up @@ -1184,18 +1181,15 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
// For pausable portals, we delay the clean-up until closing the portal by
// adding the function to the execStmtInOpenStateCleanup.
// Otherwise, perform the clean-up step within every execution.
processCleanupFunc := func(fName string, f func()) {
processCleanupFunc := func(f func()) {
if !portal.isPausable() {
f()
} else if !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
portal.pauseInfo.execStmtInOpenState.cleanup.appendFunc(namedFunc{
fName: fName,
f: func(_ context.Context) {
f()
// Some cleanup steps modify the retErr and retPayload. We need to
// ensure that cleanup after them can see the update.
updateRetErrAndPayload(retErr, retPayload)
},
portal.pauseInfo.execStmtInOpenState.cleanup.appendFunc(func(_ context.Context) {
f()
// Some cleanup steps modify the retErr and retPayload. We need to
// ensure that cleanup after them can see the update.
updateRetErrAndPayload(retErr, retPayload)
})
}
}
Expand Down Expand Up @@ -1235,7 +1229,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
portal.pauseInfo.execStmtInOpenState.spCtx = ctx
}
defer func() {
processCleanupFunc("cleanup span", sp.Finish)
processCleanupFunc(sp.Finish)
}()
} else {
ctx = portal.pauseInfo.execStmtInOpenState.spCtx
Expand Down Expand Up @@ -1297,7 +1291,6 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
}
defer func() {
processCleanupFunc(
"increment executed stmt cnt",
func() {
// We need to check the latest errors rather than the ones evaluated
// when this function is created.
Expand All @@ -1318,7 +1311,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(

// Make sure that we always unregister the query.
defer func() {
processCleanupFunc("cancel query", func() {
processCleanupFunc(func() {
ex.removeActiveQuery(queryID, vars.ast)
vars.cancelQuery()
})
Expand Down Expand Up @@ -1502,7 +1495,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
// of finishing the instrumentation helper. This is needed since in order to
// support plan-gist-matching of the statement diagnostics we might not know
// right now whether Finish needs to happen.
defer processCleanupFunc("finish instrumentation helper", func() {
defer processCleanupFunc(func() {
// We need this weird thing because we need to make sure we're
// closing the correct instrumentation helper for the paused portal.
ihToFinish := ih
Expand Down Expand Up @@ -1628,7 +1621,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
// some special plan initialization for logging.
dispatchToExecEngine := false

defer processCleanupFunc("log statement", func() {
defer processCleanupFunc(func() {
// If we did not dispatch to the execution engine, we need to initialize
// the plan here.
if !dispatchToExecEngine {
Expand Down Expand Up @@ -1696,7 +1689,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
}
}

processCleanupFunc("set query error", func() {
processCleanupFunc(func() {
cancelQueryCtx := ctx
if portal.isPausable() {
cancelQueryCtx = portal.pauseInfo.execStmtInOpenState.cancelQueryCtx
Expand Down Expand Up @@ -2718,11 +2711,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
defer planner.curPlan.close(ctx)
} else {
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "close planTop",
f: func(ctx context.Context) {
ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
},
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
})
}
} else {
Expand Down Expand Up @@ -2899,15 +2889,12 @@ func (ex *connExecutor) dispatchToExecutionEngine(
// We need to ensure that we're using the planner bound to the first-time
// execution of a portal.
curPlanner := *planner
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "populate query level stats and regions",
f: func(ctx context.Context) {
populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
ctx, &curPlanner,
int(ex.state.mu.autoRetryCounter), ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats,
)
},
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
ctx, &curPlanner,
int(ex.state.mu.autoRetryCounter), ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats,
)
})
} else {
populateQueryLevelStats(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -1746,10 +1746,8 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
}
}
if !p.resumableFlow.cleanup.isComplete {
p.resumableFlow.cleanup.appendFunc(namedFunc{
fName: "cleanup flow", f: func(ctx context.Context) {
p.resumableFlow.flow.Cleanup(ctx)
},
p.resumableFlow.cleanup.appendFunc(func(ctx context.Context) {
p.resumableFlow.flow.Cleanup(ctx)
})
}
}
Expand Down
13 changes: 3 additions & 10 deletions pkg/sql/prepared_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,28 +334,21 @@ func (p *PreparedPortal) isPausable() bool {
// functions are added during the first-time execution of a portal. When the
// first-time execution is finished, we mark isComplete to true.
type cleanupFuncStack struct {
stack []namedFunc
stack []func(context.Context)
isComplete bool
}

func (n *cleanupFuncStack) appendFunc(f namedFunc) {
func (n *cleanupFuncStack) appendFunc(f func(context.Context)) {
n.stack = append(n.stack, f)
}

func (n *cleanupFuncStack) run(ctx context.Context) {
for i := 0; i < len(n.stack); i++ {
n.stack[i].f(ctx)
n.stack[i](ctx)
}
*n = cleanupFuncStack{}
}

// namedFunc is function with name, which makes the debugging easier. It is
// used just for clean up functions of a pausable portal.
type namedFunc struct {
fName string
f func(context.Context)
}

// instrumentationHelperWrapper wraps the instrumentation helper.
// We need to maintain it for a paused portal.
type instrumentationHelperWrapper struct {
Expand Down

0 comments on commit 27454c2

Please sign in to comment.