From 71d26e70afa82203609ff68600c5ceb02bba996a Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Wed, 29 Jun 2022 12:06:32 +0100
Subject: [PATCH] streamingccl: re-enabled TestRandomClientGeneration

TestRandomClientGeneration was skipped in #61292 as a flake. However, in the
time since then, other changes in this code broke this test more completely.

Re-enabling the test required a few unrelated changes:

- The stream ingestion processor required a fully formed job to be able to
  poll the cutover time. Now, test code can set a cutoverProvider that doesn't
  depend on a full job record.

- Request interception depended on an explicit client being set, but this test
  was instead passing the processor a randgen URI. Now we pass the client
  explicitly and also update the test code to make it clear that the stream
  URI isn't actually used for anything.

- The code was attempting to validate the number of rows using SQL. I haven't
  dug into how this worked in the past, but since we connect to the host
  tenant while the keys are ingested into a guest tenant, we would need a
  connection to the guest tenant to validate the table data. I've simply
  removed this assertion since I don't think it was testing very much compared
  to the KV-level assertions also used in the test.

- The test code assumed that the partitions were keyed by the subscription
  token rather than the subscription ID.

It isn't clear what the original source of the flakiness was. However, the
test has run a few hundred times under stress without issue. Alternatively, we
could just delete this test.

Fixes #61287

Release note: None
--- .../streamclient/random_stream_client.go | 2 +- .../stream_ingestion_processor.go | 65 ++++++++++++++----- .../stream_ingestion_processor_test.go | 57 ++++++++-------- 3 files changed, 78 insertions(+), 46 deletions(-) diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index b37a1ff55519..9dd0794f93ab 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -284,7 +284,7 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID( // Generate namespace entry. codec := keys.MakeSQLCodec(config.tenantID) - key := catalogkeys.MakePublicObjectNameKey(codec, 50, testTable.Name) + key := catalogkeys.MakePublicObjectNameKey(codec, IngestionDatabaseID, testTable.Name) k := rekey(config.tenantID, key) var value roachpb.Value value.SetInt(int64(testTable.GetID())) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index b1382d3590d0..80c76c76966c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -97,6 +98,9 @@ type streamIngestionProcessor struct { // consuming multiple partitions from a stream. streamPartitionClients []streamclient.Client + // cutoverProvider indicates when the cutover time has been reached.
+ cutoverProvider cutoverProvider + // Checkpoint events may need to be buffered if they arrive within the same // minimumFlushInterval. bufferedCheckpoints map[string]hlc.Timestamp @@ -181,10 +185,14 @@ func newStreamIngestionDataProcessor( curBatch: make([]storage.MVCCKeyValue, 0), bufferedCheckpoints: make(map[string]hlc.Timestamp), maxFlushRateTimer: timeutil.NewTimer(), - cutoverCh: make(chan struct{}), - closePoller: make(chan struct{}), - rekeyer: rekeyer, - rewriteToDiffKey: spec.TenantRekey.NewID != spec.TenantRekey.OldID, + cutoverProvider: &cutoverFromJobProgress{ + jobID: jobspb.JobID(spec.JobID), + registry: flowCtx.Cfg.JobRegistry, + }, + cutoverCh: make(chan struct{}), + closePoller: make(chan struct{}), + rekeyer: rekeyer, + rewriteToDiffKey: spec.TenantRekey.NewID != spec.TenantRekey.OldID, } if err := sip.Init(sip, post, streamIngestionResultTypes, flowCtx, processorID, output, nil, /* memMonitor */ execinfra.ProcStateOpts{ @@ -352,9 +360,7 @@ func (sip *streamIngestionProcessor) checkForCutoverSignal( ctx context.Context, stopPoller chan struct{}, ) error { sv := &sip.flowCtx.Cfg.Settings.SV - registry := sip.flowCtx.Cfg.JobRegistry tick := time.NewTicker(cutoverSignalPollInterval.Get(sv)) - jobID := sip.spec.JobID defer tick.Stop() for { select { @@ -363,20 +369,11 @@ func (sip *streamIngestionProcessor) checkForCutoverSignal( case <-ctx.Done(): return ctx.Err() case <-tick.C: - j, err := registry.LoadJob(ctx, jobspb.JobID(jobID)) + cutoverReached, err := sip.cutoverProvider.cutoverReached(ctx) if err != nil { return err } - progress := j.Progress() - var sp *jobspb.Progress_StreamIngest - var ok bool - if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok { - return errors.Newf("unknown progress type %T in stream ingestion job %d", - j.Progress().Progress, jobID) - } - // Job has been signaled to complete. - if resolvedTimestamp := progress.GetHighWater(); !sp.StreamIngest.CutoverTime.IsEmpty() && - resolvedTimestamp != nil && sp.StreamIngest.CutoverTime.Less(*resolvedTimestamp) { + if cutoverReached { sip.cutoverCh <- struct{}{} return nil } @@ -664,6 +661,40 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { return &flushedCheckpoints, sip.batcher.Reset(ctx) } +// cutoverProvider allows us to override how we decide when the job has reached +// the cutover time in tests. +type cutoverProvider interface { + cutoverReached(context.Context) (bool, error) } + +// cutoverFromJobProgress is a cutoverProvider that decides whether the cutover +// time has been reached based on the progress stored on the job record. +type cutoverFromJobProgress struct { + registry *jobs.Registry + jobID jobspb.JobID +} + +func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, error) { + j, err := c.registry.LoadJob(ctx, c.jobID) + if err != nil { + return false, err + } + progress := j.Progress() + var sp *jobspb.Progress_StreamIngest + var ok bool + if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok { + return false, errors.Newf("unknown progress type %T in stream ingestion job %d", + j.Progress().Progress, c.jobID) + } + // Job has been signaled to complete. 
+ if resolvedTimestamp := progress.GetHighWater(); !sp.StreamIngest.CutoverTime.IsEmpty() && + resolvedTimestamp != nil && sp.StreamIngest.CutoverTime.Less(*resolvedTimestamp) { + return true, nil + } + + return false, nil +} + func init() { rowexec.NewStreamIngestionDataProcessor = newStreamIngestionDataProcessor } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index 644935edea86..cd23cdff0c14 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -38,8 +38,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -202,8 +200,8 @@ func TestStreamIngestionProcessor(t *testing.T) { {ID: "1", SubscriptionToken: p1}, {ID: "2", SubscriptionToken: p2}, } - out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test/", - partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient) + out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, + partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */) require.NoError(t, err) actualRows := make(map[string]struct{}) @@ -238,8 +236,8 @@ func TestStreamIngestionProcessor(t *testing.T) { {SubscriptionToken: streamclient.SubscriptionToken("1")}, {SubscriptionToken: streamclient.SubscriptionToken("2")}, } - out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test", - partitions, startTime, nil /* interceptEvents */, tenantRekey, &errorStreamClient{}) + out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, + partitions, startTime, nil /* interceptEvents */, tenantRekey, &errorStreamClient{}, nil /* cutoverProvider */) require.NoError(t, err) // Expect no rows, and just the error. @@ -262,8 +260,8 @@ func TestStreamIngestionProcessor(t *testing.T) { streamingTestingKnob := &sql.StreamingTestingKnobs{RunAfterReceivingEvent: func(ctx context.Context) { processEventCh <- struct{}{} }} - sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test/", - partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, streamingTestingKnob) + sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, + partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */, streamingTestingKnob) defer func() { require.NoError(t, sip.forceClientForTests.Close()) }() @@ -310,7 +308,7 @@ func getPartitionSpanToTableID( // Aggregate the table IDs which should have been ingested. 
for _, pa := range partitions { - pKey := roachpb.Key(pa.SubscriptionToken) + pKey := roachpb.Key(pa.ID) pSpan := roachpb.Span{Key: pKey, EndKey: pKey.Next()} paURL, err := url.Parse(string(pa.SubscriptionToken)) require.NoError(t, err) @@ -401,11 +399,14 @@ func makeTestStreamURI( "&TENANT_ID=" + strconv.Itoa(tenantID) } +type noCutover struct{} + +func (n noCutover) cutoverReached(context.Context) (bool, error) { return false, nil } + // TestRandomClientGeneration tests the ingestion processor against a random // stream workload. func TestRandomClientGeneration(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 61287, "flaky test") defer log.Scope(t).Close(t) ctx := context.Background() @@ -414,8 +415,6 @@ func TestRandomClientGeneration(t *testing.T) { defer tc.Stopper().Stop(ctx) registry := tc.Server(0).JobRegistry().(*jobs.Registry) kvDB := tc.Server(0).DB() - conn := tc.Conns[0] - sqlDB := sqlutils.MakeSQLRunner(conn) // TODO: Consider testing variations on these parameters. const tenantID = 20 @@ -447,8 +446,10 @@ func TestRandomClientGeneration(t *testing.T) { require.NoError(t, err) streamValidator := newStreamClientValidator(rekeyer) validator := registerValidatorWithClient(streamValidator) - out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, streamAddr, topo, - startTime, []streamclient.InterceptFn{cancelAfterCheckpoints, validator}, tenantRekey, nil /* mockClient */) + + out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, + topo, startTime, []streamclient.InterceptFn{cancelAfterCheckpoints, validator}, tenantRekey, + streamClient, noCutover{}) require.NoError(t, err) partitionSpanToTableID := getPartitionSpanToTableID(t, topo) @@ -466,6 +467,7 @@ func TestRandomClientGeneration(t *testing.T) { if row == nil { break } + datum := row[0].Datum protoBytes, ok := datum.(*tree.DBytes) require.True(t, ok) @@ -475,8 +477,8 @@ func TestRandomClientGeneration(t *testing.T) { for _, resolvedSpan := range resolvedSpans.ResolvedSpans { if _, ok := partitionSpanToTableID[resolvedSpan.Span.String()]; !ok { - t.Fatalf("expected resolved span %v to be either in one of the supplied partition"+ - " addresses %v", resolvedSpan.Span, topo) + t.Fatalf("expected resolved span %v to be in one of the supplied partition"+ + " addresses %v", resolvedSpan.Span, partitionSpanToTableID) } // All resolved timestamp events should be greater than the start time. @@ -497,11 +499,6 @@ func TestRandomClientGeneration(t *testing.T) { } for pSpan, id := range partitionSpanToTableID { - numRows, err := strconv.Atoi(sqlDB.QueryStr(t, fmt.Sprintf( - `SELECT count(*) FROM defaultdb.%s%d`, streamclient.IngestionTablePrefix, id))[0][0]) - require.NoError(t, err) - require.Greater(t, numRows, 0, "at least 1 row ingested expected") - // Scan the store for KVs ingested by this partition, and compare the MVCC // KVs against the KVEvents streamed up to the max ingested timestamp for // the partition. 
@@ -515,15 +512,15 @@ func runStreamIngestionProcessor( t *testing.T, registry *jobs.Registry, kvDB *kv.DB, - streamAddr string, partitions streamclient.Topology, startTime hlc.Timestamp, interceptEvents []streamclient.InterceptFn, tenantRekey execinfrapb.TenantRekey, mockClient streamclient.Client, + cutoverProvider cutoverProvider, ) (*distsqlutils.RowBuffer, error) { - sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, streamAddr, - partitions, startTime, interceptEvents, tenantRekey, mockClient, nil /* streamingTestingKnobs */) + sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, + partitions, startTime, interceptEvents, tenantRekey, mockClient, cutoverProvider, nil /* streamingTestingKnobs */) require.NoError(t, err) sip.Run(ctx) @@ -543,16 +540,19 @@ func getStreamIngestionProcessor( t *testing.T, registry *jobs.Registry, kvDB *kv.DB, - streamAddr string, partitions streamclient.Topology, startTime hlc.Timestamp, interceptEvents []streamclient.InterceptFn, tenantRekey execinfrapb.TenantRekey, mockClient streamclient.Client, + cutoverProvider cutoverProvider, streamingTestingKnobs *sql.StreamingTestingKnobs, ) (*streamIngestionProcessor, *distsqlutils.RowBuffer, error) { st := cluster.MakeTestingClusterSettings() evalCtx := eval.MakeTestingEvalContext(st) + if mockClient == nil { + return nil, nil, errors.AssertionFailedf("non-nil streamclient required") + } testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st) defer testDiskMonitor.Stop(ctx) @@ -573,7 +573,7 @@ func getStreamIngestionProcessor( post := execinfrapb.PostProcessSpec{} var spec execinfrapb.StreamIngestionDataSpec - spec.StreamAddress = streamAddr + spec.StreamAddress = "http://unused" spec.TenantRekey = tenantRekey spec.PartitionIds = make([]string, len(partitions)) spec.PartitionAddresses = make([]string, len(partitions)) @@ -592,8 +592,9 @@ func getStreamIngestionProcessor( t.Fatal("expected the processor that's created to be a split and scatter processor") } - if mockClient != nil { - sip.forceClientForTests = mockClient + sip.forceClientForTests = mockClient + if cutoverProvider != nil { + sip.cutoverProvider = cutoverProvider } if interceptable, ok := sip.forceClientForTests.(streamclient.InterceptableStreamClient); ok {
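
As an illustration of the testing seam this patch introduces (not part of the change itself), a test-only cutoverProvider can trigger cutover deterministically without a job record. The sketch below assumes the cutoverProvider interface and processor wiring shown in the diff above; the signalingCutover name and its usage are hypothetical.

package streamingest

import "context"

// signalingCutover is a hypothetical cutoverProvider for tests: it reports
// that cutover has been reached once its signal channel is closed, and it
// never loads a job record.
type signalingCutover struct {
	signal chan struct{}
}

func (s *signalingCutover) cutoverReached(ctx context.Context) (bool, error) {
	select {
	case <-s.signal:
		// The channel is closed: report cutover so that the poller in
		// checkForCutoverSignal signals the processor to stop ingesting.
		return true, nil
	default:
		return false, nil
	}
}

A test could pass &signalingCutover{signal: ch} as the cutoverProvider argument to getStreamIngestionProcessor (the same slot noCutover{} fills in TestRandomClientGeneration) and close ch when it wants ingestion to wind down.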