Skip to content

Commit

Permalink
Merge #24479
Browse files Browse the repository at this point in the history
24479: distsqlrun: make flow.Processors clear only contain non-fused procs r=andreimatei a=andreimatei

flow.Processors used to contain awkward nil entries for fused procs. I
didn't like it.

Release note: None
  • Loading branch information
craig[bot] committed Apr 4, 2018
2 parents 98e12ba + 2a90585 commit 38dbf4d
Showing 1 changed file with 57 additions and 47 deletions.
104 changes: 57 additions & 47 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ type Flow struct {
FlowCtx

flowRegistry *flowRegistry
processors []Processor
// processors contains a subset of the processors in the flow - the ones that
// run in their own goroutines. Some processors that implement RowSource are
// scheduled to run in their consumer's goroutine; those are not present here.
processors []Processor
// startables are entities that must be started when the flow starts;
// currently these are outboxes and routers.
startables []startable
Expand Down Expand Up @@ -366,59 +369,68 @@ func (f *Flow) setup(ctx context.Context, spec *FlowSpec) error {
}
}

f.processors = make([]Processor, len(spec.Processors))
f.processors = make([]Processor, 0, len(spec.Processors))

// Populate f.processors: see which processors need their own goroutine and
// which are fused with their consumer.
for i := range spec.Processors {
var err error
f.processors[i], err = f.makeProcessor(&spec.Processors[i], inputSyncs[i])
pspec := &spec.Processors[i]
p, err := f.makeProcessor(pspec, inputSyncs[i])
if err != nil {
return err
}

// If the processor implements RowSource try to hook it up directly to the
// input of a later processor.
source, ok := f.processors[i].(RowSource)
if !ok {
continue
}
pspec := &spec.Processors[i]
if len(pspec.Output) != 1 {
// The processor has more than one output, use the normal routing
// machinery.
continue
}
ospec := &pspec.Output[0]
if ospec.Type != OutputRouterSpec_PASS_THROUGH {
// The output is not pass-through and thus is being sent through a
// router.
continue
}
if len(ospec.Streams) != 1 {
// The output contains more than one stream.
continue
}

for pIdx, ps := range spec.Processors {
if pIdx <= i {
// Skip processors which have already been created.
continue
// fuse will return true if we managed to fuse p, false otherwise.
fuse := func() bool {
// If the processor implements RowSource try to hook it up directly to the
// input of a later processor.
source, ok := p.(RowSource)
if !ok {
return false
}
for inIdx, in := range ps.Input {
// Look for "simple" inputs: an unordered input (which, by definition,
// doesn't require an ordered synchronizer), with a single input stream
// (which doesn't require a MultiplexedRowChannel).
if in.Type != InputSyncSpec_UNORDERED {
continue
}
if len(in.Streams) != 1 {
if len(pspec.Output) != 1 {
// The processor has more than one output, use the normal routing
// machinery.
return false
}
ospec := &pspec.Output[0]
if ospec.Type != OutputRouterSpec_PASS_THROUGH {
// The output is not pass-through and thus is being sent through a
// router.
return false
}
if len(ospec.Streams) != 1 {
// The output contains more than one stream.
return false
}

for pIdx, ps := range spec.Processors {
if pIdx <= i {
// Skip processors which have already been created.
continue
}
if in.Streams[0].StreamID != ospec.Streams[0].StreamID {
continue
for inIdx, in := range ps.Input {
// Look for "simple" inputs: an unordered input (which, by definition,
// doesn't require an ordered synchronizer), with a single input stream
// (which doesn't require a MultiplexedRowChannel).
if in.Type != InputSyncSpec_UNORDERED {
continue
}
if len(in.Streams) != 1 {
continue
}
if in.Streams[0].StreamID != ospec.Streams[0].StreamID {
continue
}
// We found a consumer to fuse our proc to.
inputSyncs[pIdx][inIdx] = source
return true
}
inputSyncs[pIdx][inIdx] = source
f.processors[i] = nil
}
return false
}
if !fuse() {
f.processors = append(f.processors, p)
}
}
return nil
Expand Down Expand Up @@ -465,10 +477,8 @@ func (f *Flow) Start(ctx context.Context, doneFn func()) error {
s.start(f.Ctx, &f.waitGroup, f.ctxCancel)
}
for _, p := range f.processors {
if p != nil {
f.waitGroup.Add(1)
go p.Run(&f.waitGroup)
}
f.waitGroup.Add(1)
go p.Run(&f.waitGroup)
}
return nil
}
Expand Down

0 comments on commit 38dbf4d

Please sign in to comment.