sql: mark planNodeToRowSource as streaming intelligently #63903

Merged 1 commit on Apr 21, 2021

5 changes: 0 additions & 5 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
@@ -158,11 +158,6 @@ type splitAndScatterProcessor struct {
 
 var _ execinfra.Processor = &splitAndScatterProcessor{}
 
-// OutputTypes implements the execinfra.Processor interface.
-func (ssp *splitAndScatterProcessor) OutputTypes() []*types.T {
-	return splitAndScatterOutputTypes
-}
-
 func newSplitAndScatterProcessor(
 	flowCtx *execinfra.FlowCtx,
 	processorID int32,

12 changes: 10 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -49,7 +49,6 @@ import (
 
 type changeAggregator struct {
 	execinfra.ProcessorBase
-	execinfra.StreamingProcessor
 
 	flowCtx *execinfra.FlowCtx
 	spec    execinfrapb.ChangeAggregatorSpec
@@ -190,6 +189,11 @@ func newChangeAggregatorProcessor(
 	return ca, nil
 }
 
+// MustBeStreaming implements the execinfra.Processor interface.
+func (ca *changeAggregator) MustBeStreaming() bool {
+	return true
+}
+
 // Start is part of the RowSource interface.
 func (ca *changeAggregator) Start(ctx context.Context) {
 	ctx = ca.StartInternal(ctx, changeAggregatorProcName)
@@ -834,7 +838,6 @@ const (
 
 type changeFrontier struct {
 	execinfra.ProcessorBase
-	execinfra.StreamingProcessor
 
 	flowCtx *execinfra.FlowCtx
 	spec    execinfrapb.ChangeFrontierSpec
@@ -967,6 +970,11 @@ func newChangeFrontierProcessor(
 	return cf, nil
 }
 
+// MustBeStreaming implements the execinfra.Processor interface.
+func (cf *changeFrontier) MustBeStreaming() bool {
+	return true
+}
+
 // Start is part of the RowSource interface.
 func (cf *changeFrontier) Start(ctx context.Context) {
 	// StartInternal called at the beginning of the function because there are

4 changes: 4 additions & 0 deletions pkg/ccl/importccl/exportcsv.go
@@ -164,6 +164,10 @@ func (sp *csvWriter) OutputTypes() []*types.T {
 	return res
 }
 
+func (sp *csvWriter) MustBeStreaming() bool {
+	return false
+}
+
 func (sp *csvWriter) Run(ctx context.Context) {
 	ctx, span := tracing.ChildSpan(ctx, "csvWriter")
 	defer span.Finish()

6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
@@ -30,7 +30,6 @@ const streamIngestionFrontierProcName = `ingestfntr`
 
 type streamIngestionFrontier struct {
 	execinfra.ProcessorBase
-	execinfra.StreamingProcessor
 
 	flowCtx *execinfra.FlowCtx
 	spec    execinfrapb.StreamIngestionFrontierSpec
@@ -86,6 +85,11 @@ func newStreamIngestionFrontierProcessor(
 	return sf, nil
 }
 
+// MustBeStreaming implements the execinfra.Processor interface.
+func (sf *streamIngestionFrontier) MustBeStreaming() bool {
+	return true
+}
+
 // Start is part of the RowSource interface.
 func (sf *streamIngestionFrontier) Start(ctx context.Context) {
 	ctx = sf.StartInternal(ctx, streamIngestionFrontierProcName)

6 changes: 5 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
@@ -118,8 +118,12 @@ func wrapRowSources(
 		return nil, releasables, err
 	}
 
+	proc, isProcessor := toWrap.(execinfra.Processor)
+	if !isProcessor {
+		return nil, nil, errors.AssertionFailedf("unexpectedly %T is not an execinfra.Processor", toWrap)
+	}
 	var c *colexec.Columnarizer
-	if _, mustBeStreaming := toWrap.(execinfra.StreamingProcessor); mustBeStreaming {
+	if proc.MustBeStreaming() {
 		c, err = colexec.NewStreamingColumnarizer(
 			ctx, colmem.NewAllocator(ctx, args.StreamingMemAccount, factory), flowCtx, args.Spec.ProcessorID, toWrap,
 		)

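The hunk above shows only the streaming branch of the columnarizer choice in wrapRowSources; presumably the non-streaming path falls back to a buffering columnarizer, which the hunk does not show. The toy program below models that selection with simplified stand-in types (processor, streamingProc, batchProc, and columnarizerFor are illustrative names, not code from this PR):

package main

import "fmt"

// processor mirrors the one method of execinfra.Processor that matters here.
type processor interface {
	MustBeStreaming() bool
}

// streamingProc stands in for a changefeed-style processor: its output is
// unbounded, so buffering it could block forever.
type streamingProc struct{}

func (streamingProc) MustBeStreaming() bool { return true }

// batchProc stands in for a csvWriter-style processor that is fine to buffer.
type batchProc struct{}

func (batchProc) MustBeStreaming() bool { return false }

// columnarizerFor reduces the wrapRowSources selection to its essence.
func columnarizerFor(p processor) string {
	if p.MustBeStreaming() {
		return "streaming columnarizer (emit one row at a time)"
	}
	return "buffering columnarizer (accumulate a full batch first)"
}

func main() {
	fmt.Println(columnarizerFor(streamingProc{}))
	fmt.Println(columnarizerFor(batchProc{}))
}

Replacing the type assertion on the marker interface with a method call also lets the answer depend on runtime state, which is what planNodeToRowSource needs further down.
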
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/base.go
@@ -163,7 +163,7 @@ type RowSource interface {
 // RowSourcedProcessor is the union of RowSource and Processor.
 type RowSourcedProcessor interface {
 	RowSource
-	Run(context.Context)
+	Processor
 }
 
 // Run reads records from the source and outputs them to the receiver, properly

18 changes: 10 additions & 8 deletions pkg/sql/execinfra/processorsbase.go
@@ -35,6 +35,11 @@ type Processor interface {
 	// through an output router).
 	OutputTypes() []*types.T
 
+	// MustBeStreaming indicates whether this processor is of "streaming" nature
+	// and is expected to emit the output one row at a time (in both row-by-row
+	// and the vectorized engines).
+	MustBeStreaming() bool
+
 	// Run is the main loop of the processor.
 	Run(context.Context)
 }
@@ -505,6 +510,11 @@ type ProcessorBase struct {
 	curInputToDrain int
 }
 
+// MustBeStreaming implements the Processor interface.
+func (pb *ProcessorBase) MustBeStreaming() bool {
+	return false
+}
+
 // Reset resets this ProcessorBase, retaining allocated memory in slices.
 func (pb *ProcessorBase) Reset() {
 	pb.Out.Reset()
@@ -975,18 +985,10 @@ func NewLimitedMonitor(
 // these objects at creation time.
 type LocalProcessor interface {
 	RowSourcedProcessor
-	StreamingProcessor
 	// InitWithOutput initializes this processor.
 	InitWithOutput(flowCtx *FlowCtx, post *execinfrapb.PostProcessSpec, output RowReceiver) error
 	// SetInput initializes this LocalProcessor with an input RowSource. Not all
 	// LocalProcessors need inputs, but this needs to be called if a
 	// LocalProcessor expects to get its data from another RowSource.
 	SetInput(ctx context.Context, input RowSource) error
 }
-
-// StreamingProcessor is a marker interface that indicates that the processor is
-// of "streaming" nature and is expected to emit the output one tuple at a time
-// (in both row-by-row and the vectorized engines).
-type StreamingProcessor interface {
-	mustBeStreaming()
-}
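
With ProcessorBase now supplying the false default, a processor that embeds it only defines MustBeStreaming to override that default (as the changefeed and frontier processors above do), while processors that do not embed ProcessorBase, like the backfillers below, spell out the default themselves. A minimal sketch of the embedding pattern, with simplified stand-in types rather than the real execinfra definitions:

package main

import "fmt"

// ProcessorBase supplies the default: most processors may be buffered.
type ProcessorBase struct{}

func (ProcessorBase) MustBeStreaming() bool { return false }

// defaultProc inherits the default through embedding.
type defaultProc struct{ ProcessorBase }

// mustStreamProc shadows the embedded method to opt in to streaming.
type mustStreamProc struct{ ProcessorBase }

func (mustStreamProc) MustBeStreaming() bool { return true }

func main() {
	fmt.Println(defaultProc{}.MustBeStreaming())    // false
	fmt.Println(mustStreamProc{}.MustBeStreaming()) // true
}
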
9 changes: 8 additions & 1 deletion pkg/sql/plan_node_to_row_source.go
@@ -28,7 +28,6 @@ type metadataForwarder interface {
 
 type planNodeToRowSource struct {
 	execinfra.ProcessorBase
-	execinfra.StreamingProcessor
 
 	input execinfra.RowSource
 
@@ -71,6 +70,14 @@ func makePlanNodeToRowSource(
 
 var _ execinfra.LocalProcessor = &planNodeToRowSource{}
 
+// MustBeStreaming implements the execinfra.Processor interface.
+func (p *planNodeToRowSource) MustBeStreaming() bool {
+	// hookFnNode is special because it might be blocked forever if we decide to
+	// buffer its output.
+	_, isHookFnNode := p.node.(*hookFnNode)
+	return isHookFnNode
+}
+
 // InitWithOutput implements the LocalProcessor interface.
 func (p *planNodeToRowSource) InitWithOutput(
 	flowCtx *execinfra.FlowCtx, post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver,

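The hookFnNode comment is the crux of the PR: previously planNodeToRowSource embedded the StreamingProcessor marker (removed in the first hunk above), so every wrapped planNode was treated as streaming; now only the one node that genuinely cannot tolerate buffering is. The runnable toy below illustrates the hazard, with an unbuffered channel standing in for a hook function's output (all names are illustrative):

package main

import "fmt"

// produce stands in for a hookFnNode-style source: it cannot emit row n+1
// until row n has actually been consumed. The unbuffered channel enforces
// this, much like a hook that blocks on each row it hands over.
func produce(out chan<- int) {
	for i := 1; i <= 3; i++ {
		out <- i // blocks until the consumer receives the row
	}
	close(out)
}

func main() {
	rows := make(chan int) // unbuffered on purpose

	go produce(rows)

	// Streaming consumption: take each row as it arrives. A consumer that
	// instead waited to accumulate a full batch before receiving anything
	// would deadlock, because the producer never gets past its first row.
	for r := range rows {
		fmt.Println("row:", r)
	}
}
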
9 changes: 7 additions & 2 deletions pkg/sql/rowexec/backfiller.go
@@ -73,13 +73,18 @@ type backfiller struct {
 	processorID int32
 }
 
-// OutputTypes is part of the processor interface.
+// OutputTypes is part of the execinfra.Processor interface.
 func (*backfiller) OutputTypes() []*types.T {
 	// No output types.
 	return nil
 }
 
-// Run is part of the Processor interface.
+// MustBeStreaming is part of the execinfra.Processor interface.
+func (*backfiller) MustBeStreaming() bool {
+	return false
+}
+
+// Run is part of the execinfra.Processor interface.
 func (b *backfiller) Run(ctx context.Context) {
 	opName := fmt.Sprintf("%sBackfiller", b.name)
 	ctx = logtags.AddTag(ctx, opName, int(b.spec.Table.ID))

4 changes: 4 additions & 0 deletions pkg/sql/rowexec/indexbackfiller.go
@@ -104,6 +104,10 @@ func (ib *indexBackfiller) OutputTypes() []*types.T {
 	return nil
 }
 
+func (ib *indexBackfiller) MustBeStreaming() bool {
+	return false
+}
+
 // indexEntryBatch represents a "batch" of index entries which are constructed
 // and sent for ingestion. Breaking up the index entries into these batches
 // serves for better progress reporting as explained in the ingestIndexEntries