diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go
index b37a1ff55519..9dd0794f93ab 100644
--- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go
+++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -284,7 +284,7 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID(
 
 	// Generate namespace entry.
 	codec := keys.MakeSQLCodec(config.tenantID)
-	key := catalogkeys.MakePublicObjectNameKey(codec, 50, testTable.Name)
+	key := catalogkeys.MakePublicObjectNameKey(codec, IngestionDatabaseID, testTable.Name)
 	k := rekey(config.tenantID, key)
 	var value roachpb.Value
 	value.SetInt(int64(testTable.GetID()))
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
index b1382d3590d0..80c76c76966c 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -17,6 +17,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/kv/bulk"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -97,6 +98,9 @@ type streamIngestionProcessor struct {
 	// consuming multiple partitions from a stream.
	streamPartitionClients []streamclient.Client
 
+	// cutoverProvider indicates when the cutover time has been reached.
+	cutoverProvider cutoverProvider
+
 	// Checkpoint events may need to be buffered if they arrive within the same
 	// minimumFlushInterval.
 	bufferedCheckpoints map[string]hlc.Timestamp
@@ -181,10 +185,14 @@ func newStreamIngestionDataProcessor(
 		curBatch:            make([]storage.MVCCKeyValue, 0),
 		bufferedCheckpoints: make(map[string]hlc.Timestamp),
 		maxFlushRateTimer:   timeutil.NewTimer(),
-		cutoverCh:           make(chan struct{}),
-		closePoller:         make(chan struct{}),
-		rekeyer:             rekeyer,
-		rewriteToDiffKey:    spec.TenantRekey.NewID != spec.TenantRekey.OldID,
+		cutoverProvider: &cutoverFromJobProgress{
+			jobID:    jobspb.JobID(spec.JobID),
+			registry: flowCtx.Cfg.JobRegistry,
+		},
+		cutoverCh:        make(chan struct{}),
+		closePoller:      make(chan struct{}),
+		rekeyer:          rekeyer,
+		rewriteToDiffKey: spec.TenantRekey.NewID != spec.TenantRekey.OldID,
 	}
 	if err := sip.Init(sip, post, streamIngestionResultTypes, flowCtx, processorID, output, nil, /* memMonitor */
 		execinfra.ProcStateOpts{
@@ -352,9 +360,7 @@ func (sip *streamIngestionProcessor) checkForCutoverSignal(
 	ctx context.Context, stopPoller chan struct{},
 ) error {
 	sv := &sip.flowCtx.Cfg.Settings.SV
-	registry := sip.flowCtx.Cfg.JobRegistry
 	tick := time.NewTicker(cutoverSignalPollInterval.Get(sv))
-	jobID := sip.spec.JobID
 	defer tick.Stop()
 	for {
 		select {
@@ -363,20 +369,11 @@ func (sip *streamIngestionProcessor) checkForCutoverSignal(
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-tick.C:
-			j, err := registry.LoadJob(ctx, jobspb.JobID(jobID))
+			cutoverReached, err := sip.cutoverProvider.cutoverReached(ctx)
 			if err != nil {
 				return err
 			}
-			progress := j.Progress()
-			var sp *jobspb.Progress_StreamIngest
-			var ok bool
-			if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
-				return errors.Newf("unknown progress type %T in stream ingestion job %d",
-					j.Progress().Progress, jobID)
-			}
-			// Job has been signaled to complete.
-			if resolvedTimestamp := progress.GetHighWater(); !sp.StreamIngest.CutoverTime.IsEmpty() &&
-				resolvedTimestamp != nil && sp.StreamIngest.CutoverTime.Less(*resolvedTimestamp) {
+			if cutoverReached {
 				sip.cutoverCh <- struct{}{}
 				return nil
 			}
@@ -664,6 +661,40 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
 	return &flushedCheckpoints, sip.batcher.Reset(ctx)
 }
 
+// cutoverProvider allows us to override, in tests, how we decide when the job
+// has reached its cutover time.
+type cutoverProvider interface {
+	cutoverReached(context.Context) (bool, error)
+}
+
+// cutoverFromJobProgress is a cutoverProvider that decides whether the cutover
+// time has been reached based on the progress stored on the job record.
+type cutoverFromJobProgress struct {
+	registry *jobs.Registry
+	jobID    jobspb.JobID
+}
+
+func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, error) {
+	j, err := c.registry.LoadJob(ctx, c.jobID)
+	if err != nil {
+		return false, err
+	}
+	progress := j.Progress()
+	var sp *jobspb.Progress_StreamIngest
+	var ok bool
+	if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
+		return false, errors.Newf("unknown progress type %T in stream ingestion job %d",
+			j.Progress().Progress, c.jobID)
+	}
+	// Job has been signaled to complete.
+	if resolvedTimestamp := progress.GetHighWater(); !sp.StreamIngest.CutoverTime.IsEmpty() &&
+		resolvedTimestamp != nil && sp.StreamIngest.CutoverTime.Less(*resolvedTimestamp) {
+		return true, nil
+	}
+
+	return false, nil
+}
+
 func init() {
 	rowexec.NewStreamIngestionDataProcessor = newStreamIngestionDataProcessor
 }
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
index 644935edea86..cd23cdff0c14 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -38,8 +38,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/streaming"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
-	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
-	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -202,8 +200,8 @@ func TestStreamIngestionProcessor(t *testing.T) {
 			{ID: "1", SubscriptionToken: p1},
 			{ID: "2", SubscriptionToken: p2},
 		}
-		out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test/",
-			partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient)
+		out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
+			partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */)
 		require.NoError(t, err)
 
 		actualRows := make(map[string]struct{})
@@ -238,8 +236,8 @@ func TestStreamIngestionProcessor(t *testing.T) {
 			{SubscriptionToken: streamclient.SubscriptionToken("1")},
 			{SubscriptionToken: streamclient.SubscriptionToken("2")},
 		}
-		out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test",
-			partitions, startTime, nil /* interceptEvents */, tenantRekey, &errorStreamClient{})
+		out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
+			partitions, startTime, nil /* interceptEvents */, tenantRekey, &errorStreamClient{}, nil /* cutoverProvider */)
 		require.NoError(t, err)
 
 		// Expect no rows, and just the error.
@@ -262,8 +260,8 @@ func TestStreamIngestionProcessor(t *testing.T) {
 		streamingTestingKnob := &sql.StreamingTestingKnobs{RunAfterReceivingEvent: func(ctx context.Context) {
 			processEventCh <- struct{}{}
 		}}
-		sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test/",
-			partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, streamingTestingKnob)
+		sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB,
+			partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */, streamingTestingKnob)
 		defer func() {
 			require.NoError(t, sip.forceClientForTests.Close())
 		}()
@@ -310,7 +308,7 @@ func getPartitionSpanToTableID(
 
 	// Aggregate the table IDs which should have been ingested.
 	for _, pa := range partitions {
-		pKey := roachpb.Key(pa.SubscriptionToken)
+		pKey := roachpb.Key(pa.ID)
 		pSpan := roachpb.Span{Key: pKey, EndKey: pKey.Next()}
 		paURL, err := url.Parse(string(pa.SubscriptionToken))
 		require.NoError(t, err)
@@ -401,11 +399,14 @@ func makeTestStreamURI(
 		"&TENANT_ID=" + strconv.Itoa(tenantID)
 }
 
+type noCutover struct{}
+
+func (n noCutover) cutoverReached(context.Context) (bool, error) { return false, nil }
+
 // TestRandomClientGeneration tests the ingestion processor against a random
 // stream workload.
 func TestRandomClientGeneration(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	skip.WithIssue(t, 61287, "flaky test")
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
@@ -414,8 +415,6 @@ func TestRandomClientGeneration(t *testing.T) {
 	defer tc.Stopper().Stop(ctx)
 	registry := tc.Server(0).JobRegistry().(*jobs.Registry)
 	kvDB := tc.Server(0).DB()
-	conn := tc.Conns[0]
-	sqlDB := sqlutils.MakeSQLRunner(conn)
 
 	// TODO: Consider testing variations on these parameters.
 	const tenantID = 20
@@ -447,8 +446,10 @@ func TestRandomClientGeneration(t *testing.T) {
 	require.NoError(t, err)
 	streamValidator := newStreamClientValidator(rekeyer)
 	validator := registerValidatorWithClient(streamValidator)
-	out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, streamAddr, topo,
-		startTime, []streamclient.InterceptFn{cancelAfterCheckpoints, validator}, tenantRekey, nil /* mockClient */)
+
+	out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
+		topo, startTime, []streamclient.InterceptFn{cancelAfterCheckpoints, validator}, tenantRekey,
+		streamClient, noCutover{})
 	require.NoError(t, err)
 
 	partitionSpanToTableID := getPartitionSpanToTableID(t, topo)
@@ -466,6 +467,7 @@ func TestRandomClientGeneration(t *testing.T) {
 		if row == nil {
 			break
 		}
+
 		datum := row[0].Datum
 		protoBytes, ok := datum.(*tree.DBytes)
 		require.True(t, ok)
@@ -475,8 +477,8 @@ func TestRandomClientGeneration(t *testing.T) {
 
 		for _, resolvedSpan := range resolvedSpans.ResolvedSpans {
 			if _, ok := partitionSpanToTableID[resolvedSpan.Span.String()]; !ok {
-				t.Fatalf("expected resolved span %v to be either in one of the supplied partition"+
-					" addresses %v", resolvedSpan.Span, topo)
+				t.Fatalf("expected resolved span %v to be in one of the supplied partition"+
+					" addresses %v", resolvedSpan.Span, partitionSpanToTableID)
 			}
 
 			// All resolved timestamp events should be greater than the start time.
@@ -497,11 +499,6 @@ func TestRandomClientGeneration(t *testing.T) {
 	}
 
 	for pSpan, id := range partitionSpanToTableID {
-		numRows, err := strconv.Atoi(sqlDB.QueryStr(t, fmt.Sprintf(
-			`SELECT count(*) FROM defaultdb.%s%d`, streamclient.IngestionTablePrefix, id))[0][0])
-		require.NoError(t, err)
-		require.Greater(t, numRows, 0, "at least 1 row ingested expected")
-
 		// Scan the store for KVs ingested by this partition, and compare the MVCC
 		// KVs against the KVEvents streamed up to the max ingested timestamp for
 		// the partition.
@@ -515,15 +512,15 @@ func runStreamIngestionProcessor(
 	t *testing.T,
 	registry *jobs.Registry,
 	kvDB *kv.DB,
-	streamAddr string,
 	partitions streamclient.Topology,
 	startTime hlc.Timestamp,
 	interceptEvents []streamclient.InterceptFn,
 	tenantRekey execinfrapb.TenantRekey,
 	mockClient streamclient.Client,
+	cutoverProvider cutoverProvider,
 ) (*distsqlutils.RowBuffer, error) {
-	sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, streamAddr,
-		partitions, startTime, interceptEvents, tenantRekey, mockClient, nil /* streamingTestingKnobs */)
+	sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB,
+		partitions, startTime, interceptEvents, tenantRekey, mockClient, cutoverProvider, nil /* streamingTestingKnobs */)
 	require.NoError(t, err)
 
 	sip.Run(ctx)
@@ -543,16 +540,19 @@ func getStreamIngestionProcessor(
 	t *testing.T,
 	registry *jobs.Registry,
 	kvDB *kv.DB,
-	streamAddr string,
 	partitions streamclient.Topology,
 	startTime hlc.Timestamp,
 	interceptEvents []streamclient.InterceptFn,
 	tenantRekey execinfrapb.TenantRekey,
 	mockClient streamclient.Client,
+	cutoverProvider cutoverProvider,
 	streamingTestingKnobs *sql.StreamingTestingKnobs,
 ) (*streamIngestionProcessor, *distsqlutils.RowBuffer, error) {
 	st := cluster.MakeTestingClusterSettings()
 	evalCtx := eval.MakeTestingEvalContext(st)
+	if mockClient == nil {
+		return nil, nil, errors.AssertionFailedf("non-nil streamclient required")
+	}
 	testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
 	defer testDiskMonitor.Stop(ctx)
 
@@ -573,7 +573,7 @@ func getStreamIngestionProcessor(
 	post := execinfrapb.PostProcessSpec{}
 
 	var spec execinfrapb.StreamIngestionDataSpec
-	spec.StreamAddress = streamAddr
+	spec.StreamAddress = "http://unused"
 	spec.TenantRekey = tenantRekey
 	spec.PartitionIds = make([]string, len(partitions))
 	spec.PartitionAddresses = make([]string, len(partitions))
@@ -592,8 +592,9 @@ func getStreamIngestionProcessor(
 		t.Fatal("expected the processor that's created to be a split and scatter processor")
 	}
 
-	if mockClient != nil {
-		sip.forceClientForTests = mockClient
+	sip.forceClientForTests = mockClient
+	if cutoverProvider != nil {
+		sip.cutoverProvider = cutoverProvider
 	}
 
 	if interceptable, ok := sip.forceClientForTests.(streamclient.InterceptableStreamClient); ok {
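Note (illustrative, not part of the patch): the point of the new cutoverProvider seam is that the polling goroutine no longer has to load a real job record to learn that cutover has been signaled; tests can inject a provider, as the test file does with noCutover{}. The standalone Go sketch below mirrors the shape of the interface and a reduced version of checkForCutoverSignal's polling loop. The names flipCutover and pollForCutover are hypothetical and invented for this example; only cutoverProvider and cutoverReached follow the patch.

// Standalone sketch of the cutoverProvider seam: a channel-free test fake and
// a trimmed-down polling loop that consumes it. Assumes Go 1.19+ for atomic.Bool.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// cutoverProvider mirrors the interface added by the patch: it reports whether
// the cutover time has been reached.
type cutoverProvider interface {
	cutoverReached(context.Context) (bool, error)
}

// flipCutover is a hypothetical test fake; after signal() is called, every
// subsequent cutoverReached call returns true.
type flipCutover struct {
	reached atomic.Bool
}

func (f *flipCutover) signal() { f.reached.Store(true) }

func (f *flipCutover) cutoverReached(context.Context) (bool, error) {
	return f.reached.Load(), nil
}

// pollForCutover is a simplified stand-in for checkForCutoverSignal: it polls
// the provider on a ticker and returns once cutover is reported or the
// context is canceled.
func pollForCutover(ctx context.Context, p cutoverProvider, interval time.Duration) error {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
			reached, err := p.cutoverReached(ctx)
			if err != nil {
				return err
			}
			if reached {
				return nil
			}
		}
	}
}

func main() {
	fake := &flipCutover{}
	go func() {
		// Simulate the job progress crossing the cutover time a bit later.
		time.Sleep(50 * time.Millisecond)
		fake.signal()
	}()
	if err := pollForCutover(context.Background(), fake, 10*time.Millisecond); err != nil {
		fmt.Println("poll error:", err)
		return
	}
	fmt.Println("cutover observed; the processor would now stop ingesting")
}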