From 2a90585b9e40ee315ba358551ac1ef0816f26a29 Mon Sep 17 00:00:00 2001
From: Andrei Matei <andrei@cockroachlabs.com>
Date: Wed, 4 Apr 2018 14:36:26 -0400
Subject: [PATCH] distsqlrun: make flow.Processors clear that it only contains
 non-fused procs

flow.Processors used to contain awkward nil entries for fused procs. I
didn't like it.

Release note: None
---
 pkg/sql/distsqlrun/flow.go | 104 ++++++++++++++++++++-----------------
 1 file changed, 57 insertions(+), 47 deletions(-)

diff --git a/pkg/sql/distsqlrun/flow.go b/pkg/sql/distsqlrun/flow.go
index c58a9ab9c18e..416b55e98126 100644
--- a/pkg/sql/distsqlrun/flow.go
+++ b/pkg/sql/distsqlrun/flow.go
@@ -128,7 +128,10 @@ type Flow struct {
 	FlowCtx
 
 	flowRegistry *flowRegistry
-	processors   []Processor
+	// processors contains a subset of the processors in the flow - the ones that
+	// run in their own goroutines. Some processors that implement RowSource are
+	// scheduled to run in their consumer's goroutine; those are not present here.
+	processors []Processor
 	// startables are entities that must be started when the flow starts;
 	// currently these are outboxes and routers.
 	startables []startable
@@ -366,59 +369,68 @@ func (f *Flow) setup(ctx context.Context, spec *FlowSpec) error {
 		}
 	}
 
-	f.processors = make([]Processor, len(spec.Processors))
+	f.processors = make([]Processor, 0, len(spec.Processors))
 
+	// Populate f.processors: see which processors need their own goroutine and
+	// which are fused with their consumer.
 	for i := range spec.Processors {
-		var err error
-		f.processors[i], err = f.makeProcessor(&spec.Processors[i], inputSyncs[i])
+		pspec := &spec.Processors[i]
+		p, err := f.makeProcessor(pspec, inputSyncs[i])
 		if err != nil {
 			return err
 		}
 
-		// If the processor implements RowSource try to hook it up directly to the
-		// input of a later processor.
-		source, ok := f.processors[i].(RowSource)
-		if !ok {
-			continue
-		}
-		pspec := &spec.Processors[i]
-		if len(pspec.Output) != 1 {
-			// The processor has more than one output, use the normal routing
-			// machinery.
-			continue
-		}
-		ospec := &pspec.Output[0]
-		if ospec.Type != OutputRouterSpec_PASS_THROUGH {
-			// The output is not pass-through and thus is being sent through a
-			// router.
-			continue
-		}
-		if len(ospec.Streams) != 1 {
-			// The output contains more than one stream.
-			continue
-		}
-
-		for pIdx, ps := range spec.Processors {
-			if pIdx <= i {
-				// Skip processors which have already been created.
-				continue
+		// fuse will return true if we managed to fuse p, false otherwise.
+		fuse := func() bool {
+			// If the processor implements RowSource try to hook it up directly to the
+			// input of a later processor.
+			source, ok := p.(RowSource)
+			if !ok {
+				return false
 			}
-			for inIdx, in := range ps.Input {
-				// Look for "simple" inputs: an unordered input (which, by definition,
-				// doesn't require an ordered synchronizer), with a single input stream
-				// (which doesn't require a MultiplexedRowChannel).
-				if in.Type != InputSyncSpec_UNORDERED {
-					continue
-				}
-				if len(in.Streams) != 1 {
+			if len(pspec.Output) != 1 {
+				// The processor has more than one output, use the normal routing
+				// machinery.
+				return false
+			}
+			ospec := &pspec.Output[0]
+			if ospec.Type != OutputRouterSpec_PASS_THROUGH {
+				// The output is not pass-through and thus is being sent through a
+				// router.
+				return false
+			}
+			if len(ospec.Streams) != 1 {
+				// The output contains more than one stream.
+				return false
+			}
+
+			for pIdx, ps := range spec.Processors {
+				if pIdx <= i {
+					// Skip processors which have already been created.
 					continue
 				}
-				if in.Streams[0].StreamID != ospec.Streams[0].StreamID {
-					continue
+				for inIdx, in := range ps.Input {
+					// Look for "simple" inputs: an unordered input (which, by definition,
+					// doesn't require an ordered synchronizer), with a single input stream
+					// (which doesn't require a MultiplexedRowChannel).
+					if in.Type != InputSyncSpec_UNORDERED {
+						continue
+					}
+					if len(in.Streams) != 1 {
+						continue
+					}
+					if in.Streams[0].StreamID != ospec.Streams[0].StreamID {
+						continue
+					}
+					// We found a consumer to fuse our proc to.
+					inputSyncs[pIdx][inIdx] = source
+					return true
 				}
-				inputSyncs[pIdx][inIdx] = source
-				f.processors[i] = nil
 			}
+			return false
+		}
+		if !fuse() {
+			f.processors = append(f.processors, p)
 		}
 	}
 	return nil
@@ -465,10 +477,8 @@ func (f *Flow) Start(ctx context.Context, doneFn func()) error {
 		s.start(f.Ctx, &f.waitGroup, f.ctxCancel)
 	}
 	for _, p := range f.processors {
-		if p != nil {
-			f.waitGroup.Add(1)
-			go p.Run(&f.waitGroup)
-		}
+		f.waitGroup.Add(1)
+		go p.Run(&f.waitGroup)
 	}
 	return nil
 }