Skip to content

Commit

Permalink
flowinfra: fix recently introduced minor bug
Browse files Browse the repository at this point in the history
This commit fixes a minor bug introduced in cockroachdb#110625. In particular, that
PR made so that we now unconditionally call `ConsumerClosed` on the
"head" processor to make sure that resources are properly closed in the
pausable portal model. However, `ConsumerClosed` is only valid to be
called only if `Start` was called, so this commit fixes that.
Additionally, to de-risk cockroachdb#110625 further we now only call that method
for local flows since pausable portals currently disable DistSQL.

Epic: None

Release note: None
  • Loading branch information
yuzefovich committed Sep 18, 2023
1 parent 6cbd07e commit b06375d
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ type FlowBase struct {
// goroutines.
startedGoroutines bool

// headProcStarted tracks whether Start was called on the "head" processor
// in Run.
headProcStarted bool

// inboundStreams are streams that receive data from other hosts; this map
// is to be passed to FlowRegistry.RegisterFlow. This map is populated in
// Flow.Setup(), so it is safe to lookup into concurrently later.
Expand Down Expand Up @@ -571,6 +575,7 @@ func (f *FlowBase) Run(ctx context.Context, noWait bool) {
}
f.resumeCtx = ctx
log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc)
f.headProcStarted = true
headProc.Run(ctx, headOutput)
}

Expand Down Expand Up @@ -661,17 +666,26 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) {
// ConsumerClosed on the source (i.e. the "head" processor).
//
// The method is only called if:
// - the flow is local (pausable portals currently don't support DistSQL)
// - there is exactly 1 processor in the flow that runs in its own goroutine
// (which is always the case for pausable portal model at this time)
// - Start was called on that processor (ConsumerClosed is only valid to be
// called after Start)
// - that single processor implements execinfra.RowSource interface (those
// processors that don't implement it shouldn't be running through pausable
// portal model).
//
// Otherwise, this method is a noop.
func (f *FlowBase) ConsumerClosedOnHeadProc() {
if !f.IsLocal() {
return
}
if len(f.processors) != 1 {
return
}
if !f.headProcStarted {
return
}
rs, ok := f.processors[0].(execinfra.RowSource)
if !ok {
return
Expand Down

0 comments on commit b06375d

Please sign in to comment.