Skip to content

Commit

Permalink
execinfra: remove MetadataTest* processors
Browse files Browse the repository at this point in the history
This commit removes `MetadataTestSender` and `MetadataTestReceiver`
processors since they no longer provide much value. I believe they were
introduced when we added a `ProducerMetadata` as a return parameter to
`Next` method in order to ensure that at least some artificial metadata
is propagated correctly throughout the whole flow.

The main goal of this commit is the removal of `fakedist-metadata` and
`5node-metadata` logic test configs in order to speed up the CI time.

The justification for removal of these processors without putting
in their place is that these processors are not that useful - the only
thing they can test is that *some* metadata is propagated through the
row-based flows. Note that they don't test whether all necessary
metadata is emitted (for example, whether `LeafTxnFinalState`). We've
using the vectorized engine as the default for a while now, and these
processors don't get planned with the vectorized flows. Thus, it seems
silly to have a logic test config like `fakedist-metadata` that is part
of the default configs.

Release note: None
  • Loading branch information
yuzefovich committed Jun 24, 2022
1 parent b75ebd1 commit ea559df
Show file tree
Hide file tree
Showing 22 changed files with 75 additions and 603 deletions.
12 changes: 0 additions & 12 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,6 @@ func supportedNatively(spec *execinfrapb.ProcessorSpec) error {
var (
errCoreUnsupportedNatively = errors.New("unsupported processor core")
errLocalPlanNodeWrap = errors.New("LocalPlanNode core needs to be wrapped")
errMetadataTestSenderWrap = errors.New("core.MetadataTestSender is not supported")
errMetadataTestReceiverWrap = errors.New("core.MetadataTestReceiver is not supported")
errChangeAggregatorWrap = errors.New("core.ChangeAggregator is not supported")
errChangeFrontierWrap = errors.New("core.ChangeFrontier is not supported")
errReadImportWrap = errors.New("core.ReadImport is not supported")
Expand Down Expand Up @@ -282,16 +280,6 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSp
return errSamplerWrap
case spec.Core.SampleAggregator != nil:
return errSampleAggregatorWrap
case spec.Core.MetadataTestSender != nil:
// We do not wrap MetadataTestSender because of the way metadata is
// propagated through the vectorized flow - it is drained at the flow
// shutdown unlike these test processors expect.
return errMetadataTestSenderWrap
case spec.Core.MetadataTestReceiver != nil:
// We do not wrap MetadataTestReceiver because of the way metadata is
// propagated through the vectorized flow - it is drained at the flow
// shutdown unlike these test processors expect.
return errMetadataTestReceiverWrap
case spec.Core.ZigzagJoiner != nil:
case spec.Core.ProjectSet != nil:
case spec.Core.Windower != nil:
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ go_test(
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/colcontainerutils",
"//pkg/testutils/distsqlutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
127 changes: 59 additions & 68 deletions pkg/sql/colflow/vectorized_meta_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,47 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

type testBatchReceiver struct {
batches []coldata.Batch
metadata []*execinfrapb.ProducerMetadata
}

var _ execinfra.BatchReceiver = &testBatchReceiver{}

func (t *testBatchReceiver) ProducerDone() {}

func (t *testBatchReceiver) PushBatch(
batch coldata.Batch, meta *execinfrapb.ProducerMetadata,
) execinfra.ConsumerStatus {
status := execinfra.NeedMoreRows
if batch != nil {
t.batches = append(t.batches, batch)
} else if meta != nil {
t.metadata = append(t.metadata, meta)
} else {
status = execinfra.ConsumerClosed
}
return status
}

// TestVectorizedMetaPropagation tests whether metadata is correctly propagated
// alongside columnar operators. It sets up the following "flow":
// RowSource -> metadataTestSender -> columnarizer -> noopOperator ->
// -> materializer -> metadataTestReceiver. Metadata propagation is hooked up
// manually from the columnarizer into the materializer similar to how it is
// done in setupVectorizedFlow.
// in the vectorized flows. It creates a colexecop.Operator as well as a
// colexecop.MetadataSource which are hooked up into the BatchFlowCoordinator in
// the same way as in vectorizedFlowCreator.setupFlow.
func TestVectorizedMetaPropagation(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand All @@ -46,73 +67,43 @@ func TestVectorizedMetaPropagation(t *testing.T) {
Cfg: &execinfra.ServerConfig{Settings: cluster.MakeTestingClusterSettings()},
}

nRows := 10
nCols := 1
nBatches := 10
typs := types.OneIntCol

input := distsqlutils.NewRowBuffer(typs, randgen.MakeIntRows(nRows, nCols), distsqlutils.RowBufferArgs{})
mtsSpec := execinfrapb.ProcessorCoreUnion{
MetadataTestSender: &execinfrapb.MetadataTestSenderSpec{
ID: uuid.MakeV4().String(),
// Prepare the input operator.
batch := testAllocator.NewMemBatchWithFixedCapacity(typs, 1 /* capacity */)
batch.SetLength(1)
source := colexecop.NewRepeatableBatchSource(testAllocator, batch, typs)
source.ResetBatchesToReturn(nBatches)

// Setup the metadata source.
expectedMetadata := []execinfrapb.ProducerMetadata{{RowNum: &execinfrapb.RemoteProducerMetadata_RowNum{LastMsg: true}}}
drainMetaCbCalled := false
metadataSource := colexectestutils.CallbackMetadataSource{
DrainMetaCb: func() []execinfrapb.ProducerMetadata {
if drainMetaCbCalled {
return nil
}
drainMetaCbCalled = true
return expectedMetadata
},
}
mts, err := execinfra.NewMetadataTestSender(
&flowCtx,
0,
input,
&execinfrapb.PostProcessSpec{},
nil,
uuid.MakeV4().String(),
)
if err != nil {
t.Fatal(err)
}

col := colexec.NewBufferingColumnarizer(testAllocator, &flowCtx, 1, mts)
noop := colexecop.NewNoop(col)
mat := colexec.NewMaterializer(
output := &testBatchReceiver{}
f := colflow.NewBatchFlowCoordinator(
&flowCtx,
2, /* processorID */
0, /* processorID */
colexecargs.OpWithMetaInfo{
Root: noop,
MetadataSources: colexecop.MetadataSources{col},
Root: source,
MetadataSources: colexecop.MetadataSources{metadataSource},
},
typs,
output,
func() {}, /* cancelFlow */
)
f.Run(context.Background())

mtr, err := execinfra.NewMetadataTestReceiver(
&flowCtx,
3,
mat,
&execinfrapb.PostProcessSpec{},
nil,
[]string{mtsSpec.MetadataTestSender.ID},
)
if err != nil {
t.Fatal(err)
}
mtr.Start(ctx)

rowCount, metaCount := 0, 0
for {
row, meta := mtr.Next()
if row == nil && meta == nil {
break
}
if row != nil {
rowCount++
} else if meta.Err != nil {
t.Fatal(meta.Err)
} else {
metaCount++
}
}
if rowCount != nRows {
t.Fatalf("expected %d rows but %d received", nRows, rowCount)
}
if metaCount != nRows+1 {
// metadataTestSender sends a meta after each row plus an additional one to
// indicate the last meta.
t.Fatalf("expected %d meta but %d received", nRows+1, metaCount)
}
// Ensure that the expected number of batches and metadata objects have been
// pushed into the output.
require.Equal(t, nBatches, len(output.batches))
require.Equal(t, len(expectedMetadata), len(output.metadata))
}
62 changes: 6 additions & 56 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execagg"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand Down Expand Up @@ -93,10 +92,6 @@ type DistSQLPlanner struct {
distSQLSrv *distsql.ServerImpl
spanResolver physicalplan.SpanResolver

// metadataTestTolerance is the minimum level required to plan metadata test
// processors.
metadataTestTolerance execinfra.MetadataTestLevel

// runnerChan is used to send out requests (for running SetupFlow RPCs) to a
// pool of workers.
runnerChan chan runnerRequest
Expand Down Expand Up @@ -203,13 +198,12 @@ func NewDistSQLPlanner(
connHealth: nodeDialer.ConnHealthTryDial,
isAvailable: isAvailable,
},
distSender: distSender,
nodeDescs: nodeDescs,
rpcCtx: rpcCtx,
metadataTestTolerance: execinfra.NoExplain,
sqlInstanceProvider: sqlInstanceProvider,
codec: codec,
clock: clock,
distSender: distSender,
nodeDescs: nodeDescs,
rpcCtx: rpcCtx,
sqlInstanceProvider: sqlInstanceProvider,
codec: codec,
clock: clock,
}

dsp.parallelLocalScansSem = quotapool.NewIntPool("parallel local scans concurrency",
Expand All @@ -227,10 +221,6 @@ func NewDistSQLPlanner(
return dsp
}

func (dsp *DistSQLPlanner) shouldPlanTestMetadata() bool {
return dsp.distSQLSrv.TestingKnobs.MetadataTestLevel >= dsp.metadataTestTolerance
}

// GetSQLInstanceInfo gets a node descriptor by node ID.
func (dsp *DistSQLPlanner) GetSQLInstanceInfo(
sqlInstanceID base.SQLInstanceID,
Expand Down Expand Up @@ -3082,24 +3072,6 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
planCtx.traceMetadata.associateNodeWithComponents(node, processors)
}

if dsp.shouldPlanTestMetadata() {
if err := plan.CheckLastStagePost(); err != nil {
log.Fatalf(ctx, "%v", err)
}
plan.AddNoGroupingStageWithCoreFunc(
func(_ int, _ *physicalplan.Processor) execinfrapb.ProcessorCoreUnion {
return execinfrapb.ProcessorCoreUnion{
MetadataTestSender: &execinfrapb.MetadataTestSenderSpec{
ID: uuid.MakeV4().String(),
},
}
},
execinfrapb.PostProcessSpec{},
plan.GetResultTypes(),
plan.MergeOrdering,
)
}

return plan, err
}

Expand Down Expand Up @@ -4147,15 +4119,6 @@ func (dsp *DistSQLPlanner) FinalizePlan(planCtx *PlanningCtx, plan *PhysicalPlan
func (dsp *DistSQLPlanner) finalizePlanWithRowCount(
planCtx *PlanningCtx, plan *PhysicalPlan, rowCount int64,
) {
// Find all MetadataTestSenders in the plan, so that the MetadataTestReceiver
// knows how many sender IDs it should expect.
var metadataSenders []string
for _, proc := range plan.Processors {
if proc.Spec.Core.MetadataTestSender != nil {
metadataSenders = append(metadataSenders, proc.Spec.Core.MetadataTestSender.ID)
}
}

maybeMoveSingleFlowToGateway(planCtx, plan, rowCount)

// Add a final "result" stage if necessary.
Expand All @@ -4173,19 +4136,6 @@ func (dsp *DistSQLPlanner) finalizePlanWithRowCount(
// PlanToStreamColMap is no longer necessary.
plan.PlanToStreamColMap = nil

if len(metadataSenders) > 0 {
plan.AddSingleGroupStage(
dsp.gatewaySQLInstanceID,
execinfrapb.ProcessorCoreUnion{
MetadataTestReceiver: &execinfrapb.MetadataTestReceiverSpec{
SenderIDs: metadataSenders,
},
},
execinfrapb.PostProcessSpec{},
plan.GetResultTypes(),
)
}

// Set up the endpoints for plan.Streams.
plan.PopulateEndpoints()

Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ go_library(
srcs = [
"base.go",
"flow_context.go",
"metadata_test_receiver.go",
"metadata_test_sender.go",
"metrics.go",
"outboxbase.go",
"processorsbase.go",
Expand Down
Loading

0 comments on commit ea559df

Please sign in to comment.