Skip to content

Commit

Permalink
distsql: eliminate ctx allocation in setupFlow
Browse files Browse the repository at this point in the history
The `ctx` parameter of `setupFlow` is no longer captured by closures,
preventing it from being heap allocated.

Release note: None
  • Loading branch information
mgartner committed Dec 16, 2024
1 parent 2494fcc commit 2f0f8cb
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func NewColIndexJoin(
cFetcherMemoryLimit := totalMemoryLimit

var kvFetcher *row.KVFetcher
useStreamer, txn, err := flowCtx.UseStreamer()
useStreamer, txn, err := flowCtx.UseStreamer(ctx)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colflow/explain_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func convertToVecTree(
// the creator.
&fakeBatchReceiver{},
localProcessors,
nil, /* localVectorSources */
func() {}, /* onFlowCleanupEnd */
"", /* statementSQL */
nil, /* localVectorSources */
nil, /* onFlowCleanupEnd */
"", /* statementSQL */
)
creator := newVectorizedFlowCreator(
flowBase, nil /* componentCreator */, recordingStats,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (f *vectorizedFlow) MemUsage() int64 {
func (f *vectorizedFlow) Cleanup(ctx context.Context) {
startCleanup, endCleanup := f.FlowBase.GetOnCleanupFns()
startCleanup()
defer endCleanup()
defer endCleanup(ctx)

// This cleans up all the memory and disk monitoring of the vectorized flow
// as well as closes all the closers.
Expand Down
20 changes: 10 additions & 10 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (ds *ServerImpl) setupFlow(
) (retCtx context.Context, _ flowinfra.Flow, _ execopnode.OpChains, retErr error) {
var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
var monitor, diskMonitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
var onFlowCleanupEnd func() // will be called at the very end of Flow.Cleanup()
var onFlowCleanupEnd func(context.Context) // will be called at the very end of Flow.Cleanup()
// Make sure that we clean up all resources (which in the happy case are
// cleaned up in Flow.Cleanup()) if an error is encountered.
defer func() {
Expand All @@ -194,7 +194,7 @@ func (ds *ServerImpl) setupFlow(
diskMonitor.Stop(ctx)
}
if onFlowCleanupEnd != nil {
onFlowCleanupEnd()
onFlowCleanupEnd(ctx)
} else {
reserved.Close(ctx)
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func (ds *ServerImpl) setupFlow(
monitor.Start(ctx, parentMonitor, reserved)
diskMonitor = execinfra.NewMonitor(ctx, ds.ParentDiskMonitor, "flow-disk-monitor")

makeLeaf := func() (*kv.Txn, error) {
makeLeaf := func(ctx context.Context) (*kv.Txn, error) {
tis := req.LeafTxnInputState
if tis == nil {
// This must be a flow running for some bulk-io operation that doesn't use
Expand Down Expand Up @@ -281,13 +281,13 @@ func (ds *ServerImpl) setupFlow(
// the whole evalContext, but that isn't free, so we choose to restore
// the original state in order to avoid performance regressions.
origTxn := localEvalCtx.Txn
onFlowCleanupEnd = func() {
onFlowCleanupEnd = func(ctx context.Context) {
localEvalCtx.Txn = origTxn
reserved.Close(ctx)
}
if localState.MustUseLeafTxn() {
var err error
leafTxn, err = makeLeaf()
leafTxn, err = makeLeaf(ctx)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -296,7 +296,7 @@ func (ds *ServerImpl) setupFlow(
localEvalCtx.Txn = leafTxn
}
} else {
onFlowCleanupEnd = func() {
onFlowCleanupEnd = func(ctx context.Context) {
reserved.Close(ctx)
}
if localState.IsLocal {
Expand All @@ -312,7 +312,7 @@ func (ds *ServerImpl) setupFlow(
// It's important to populate evalCtx.Txn early. We'll write it again in the
// f.SetTxn() call below, but by then it will already have been captured by
// processors.
leafTxn, err = makeLeaf()
leafTxn, err = makeLeaf(ctx)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -416,7 +416,7 @@ func (ds *ServerImpl) setupFlow(
} else {
// If I haven't created the leaf already, do it now.
if leafTxn == nil {
leafTxn, err = makeLeaf()
leafTxn, err = makeLeaf(ctx)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -442,7 +442,7 @@ func (ds *ServerImpl) newFlowContext(
id execinfrapb.FlowID,
evalCtx *eval.Context,
monitor, diskMonitor *mon.BytesMonitor,
makeLeafTxn func() (*kv.Txn, error),
makeLeafTxn func(context.Context) (*kv.Txn, error),
traceKV bool,
collectStats bool,
localState LocalState,
Expand Down Expand Up @@ -494,7 +494,7 @@ func newFlow(
localProcessors []execinfra.LocalProcessor,
localVectorSources map[int32]any,
isVectorized bool,
onFlowCleanupEnd func(),
onFlowCleanupEnd func(context.Context),
statementSQL string,
) flowinfra.Flow {
base := flowinfra.NewFlowBase(flowCtx, sp, flowReg, rowSyncFlowConsumer, batchSyncFlowConsumer, localProcessors, localVectorSources, onFlowCleanupEnd, statementSQL)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/flow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type FlowCtx struct {
Txn *kv.Txn

// MakeLeafTxn returns a new LeafTxn, different from Txn.
MakeLeafTxn func() (*kv.Txn, error)
MakeLeafTxn func(context.Context) (*kv.Txn, error)

// Descriptors is used to look up leased table descriptors and to construct
// transaction bound TypeResolvers to resolve type references during flow
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/execinfra/readerbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ func (h *LimitHintHelper) ReadSomeRows(rowsRead int64) error {

// UseStreamer returns whether the kvstreamer.Streamer API should be used as
// well as the txn that should be used (regardless of the boolean return value).
func (flowCtx *FlowCtx) UseStreamer() (bool, *kv.Txn, error) {
func (flowCtx *FlowCtx) UseStreamer(ctx context.Context) (bool, *kv.Txn, error) {
useStreamer := flowCtx.EvalCtx.SessionData().StreamerEnabled && flowCtx.Txn != nil &&
flowCtx.Txn.Type() == kv.LeafTxn && flowCtx.MakeLeafTxn != nil
if !useStreamer {
return false, flowCtx.Txn, nil
}
leafTxn, err := flowCtx.MakeLeafTxn()
leafTxn, err := flowCtx.MakeLeafTxn(ctx)
if leafTxn == nil || err != nil {
// leafTxn might be nil in some flows which run outside of the txn, the
// streamer should not be used in such cases.
Expand Down
11 changes: 6 additions & 5 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type Flow interface {
// GetOnCleanupFns returns a couple of functions that should be called at
// the very beginning and the very end of Cleanup, respectively. Both will
// be non-nil.
GetOnCleanupFns() (startCleanup, endCleanup func())
GetOnCleanupFns() (startCleanup func(), endCleanup func(context.Context))

// Cleanup must be called whenever the flow is done (meaning it either
// completes gracefully after all processors and mailboxes exited or an
Expand Down Expand Up @@ -227,7 +227,7 @@ type FlowBase struct {
// onCleanupStart and onCleanupEnd will be called in the very beginning and
// the very end of Cleanup(), respectively.
onCleanupStart func()
onCleanupEnd func()
onCleanupEnd func(context.Context)

statementSQL string

Expand Down Expand Up @@ -323,7 +323,7 @@ func NewFlowBase(
batchSyncFlowConsumer execinfra.BatchReceiver,
localProcessors []execinfra.LocalProcessor,
localVectorSources map[int32]any,
onFlowCleanupEnd func(),
onFlowCleanupEnd func(context.Context),
statementSQL string,
) *FlowBase {
// We are either in a single tenant cluster, or a SQL node in a multi-tenant
Expand Down Expand Up @@ -642,15 +642,16 @@ func (f *FlowBase) AddOnCleanupStart(fn func()) {
}

var noopFn = func() {}
var noopCtxFn = func(context.Context) {}

// GetOnCleanupFns is part of the Flow interface.
func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) {
func (f *FlowBase) GetOnCleanupFns() (startCleanup func(), endCleanup func(context.Context)) {
onCleanupStart, onCleanupEnd := f.onCleanupStart, f.onCleanupEnd
if onCleanupStart == nil {
onCleanupStart = noopFn
}
if onCleanupEnd == nil {
onCleanupEnd = noopFn
onCleanupEnd = noopCtxFn
}
return onCleanupStart, onCleanupEnd
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func newJoinReader(
// in order to ensure the lookups are ordered, so set shouldLimitBatches.
spec.MaintainOrdering, shouldLimitBatches = true, true
}
useStreamer, txn, err := flowCtx.UseStreamer()
useStreamer, txn, err := flowCtx.UseStreamer(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowflow/row_based_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (f *rowBasedFlow) Release() {
func (f *rowBasedFlow) Cleanup(ctx context.Context) {
startCleanup, endCleanup := f.FlowBase.GetOnCleanupFns()
startCleanup()
defer endCleanup()
defer endCleanup(ctx)
for i := range f.monitors {
f.monitors[i].Stop(ctx)
}
Expand Down

0 comments on commit 2f0f8cb

Please sign in to comment.