diff --git a/pkg/sql/distsqlrun/flow.go b/pkg/sql/distsqlrun/flow.go index c58a9ab9c18e..416b55e98126 100644 --- a/pkg/sql/distsqlrun/flow.go +++ b/pkg/sql/distsqlrun/flow.go @@ -128,7 +128,10 @@ type Flow struct { FlowCtx flowRegistry *flowRegistry - processors []Processor + // processors contains a subset of the processors in the flow - the ones that + // run in their own goroutines. Some processors that implement RowSource are + // scheduled to run in their consumer's goroutine; those are not present here. + processors []Processor // startables are entities that must be started when the flow starts; // currently these are outboxes and routers. startables []startable @@ -366,59 +369,68 @@ func (f *Flow) setup(ctx context.Context, spec *FlowSpec) error { } } - f.processors = make([]Processor, len(spec.Processors)) + f.processors = make([]Processor, 0, len(spec.Processors)) + // Populate f.processors: see which processors need their own goroutine and + // which are fused with their consumer. for i := range spec.Processors { - var err error - f.processors[i], err = f.makeProcessor(&spec.Processors[i], inputSyncs[i]) + pspec := &spec.Processors[i] + p, err := f.makeProcessor(pspec, inputSyncs[i]) if err != nil { return err } - // If the processor implements RowSource try to hook it up directly to the - // input of a later processor. - source, ok := f.processors[i].(RowSource) - if !ok { - continue - } - pspec := &spec.Processors[i] - if len(pspec.Output) != 1 { - // The processor has more than one output, use the normal routing - // machinery. - continue - } - ospec := &pspec.Output[0] - if ospec.Type != OutputRouterSpec_PASS_THROUGH { - // The output is not pass-through and thus is being sent through a - // router. - continue - } - if len(ospec.Streams) != 1 { - // The output contains more than one stream. - continue - } - - for pIdx, ps := range spec.Processors { - if pIdx <= i { - // Skip processors which have already been created. - continue + // fuse will return true if we managed to fuse p, false otherwise. + fuse := func() bool { + // If the processor implements RowSource try to hook it up directly to the + // input of a later processor. + source, ok := p.(RowSource) + if !ok { + return false } - for inIdx, in := range ps.Input { - // Look for "simple" inputs: an unordered input (which, by definition, - // doesn't require an ordered synchronizer), with a single input stream - // (which doesn't require a MultiplexedRowChannel). - if in.Type != InputSyncSpec_UNORDERED { - continue - } - if len(in.Streams) != 1 { + if len(pspec.Output) != 1 { + // The processor has more than one output, use the normal routing + // machinery. + return false + } + ospec := &pspec.Output[0] + if ospec.Type != OutputRouterSpec_PASS_THROUGH { + // The output is not pass-through and thus is being sent through a + // router. + return false + } + if len(ospec.Streams) != 1 { + // The output contains more than one stream. + return false + } + + for pIdx, ps := range spec.Processors { + if pIdx <= i { + // Skip processors which have already been created. continue } - if in.Streams[0].StreamID != ospec.Streams[0].StreamID { - continue + for inIdx, in := range ps.Input { + // Look for "simple" inputs: an unordered input (which, by definition, + // doesn't require an ordered synchronizer), with a single input stream + // (which doesn't require a MultiplexedRowChannel). + if in.Type != InputSyncSpec_UNORDERED { + continue + } + if len(in.Streams) != 1 { + continue + } + if in.Streams[0].StreamID != ospec.Streams[0].StreamID { + continue + } + // We found a consumer to fuse our proc to. + inputSyncs[pIdx][inIdx] = source + return true } - inputSyncs[pIdx][inIdx] = source - f.processors[i] = nil } + return false + } + if !fuse() { + f.processors = append(f.processors, p) } } return nil @@ -465,10 +477,8 @@ func (f *Flow) Start(ctx context.Context, doneFn func()) error { s.start(f.Ctx, &f.waitGroup, f.ctxCancel) } for _, p := range f.processors { - if p != nil { - f.waitGroup.Add(1) - go p.Run(&f.waitGroup) - } + f.waitGroup.Add(1) + go p.Run(&f.waitGroup) } return nil }