diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index d930275da331..6841b2ea7e01 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -207,7 +207,7 @@ restore_stmt ::= | 'RESTORE' targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options - | 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_of_clause opt_as_tenant_clause + | 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_tenant_clause resume_stmt ::= resume_jobs_stmt diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 942afcc0525e..35df0c098284 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -49,6 +49,7 @@ go_library( "//pkg/util/log", "//pkg/util/metric", "//pkg/util/protoutil", + "//pkg/util/retry", "//pkg/util/span", "//pkg/util/syncutil", "//pkg/util/timeutil", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index b40d956b7a62..76379036b4f3 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -10,6 +10,8 @@ package streamingest import ( "context" + "fmt" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" @@ -23,6 +25,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -71,11 +75,13 @@ func getStreamIngestionStats( } lagInfo.SlowestSourcePartitionTimestamp = hlc.MaxTimestamp lagInfo.FastestSourcePartitionTimestamp = hlc.MinTimestamp + // TODO(casper): track spans that the slowest partition is associated with once + // we switch from tracking partition ID as frontier to tracking actual spans as + // frontier. 
for partition, pp := range progress.GetStreamIngest().PartitionProgress { if pp.IngestedTimestamp.Less(lagInfo.SlowestSourcePartitionTimestamp) { lagInfo.SlowestSourcePartitionTimestamp = pp.IngestedTimestamp lagInfo.SlowestSourcePartition = partition - } if lagInfo.FastestSourcePartitionTimestamp.Less(pp.IngestedTimestamp) { @@ -106,39 +112,66 @@ type streamIngestionResumer struct { job *jobs.Job } -func ingest( - ctx context.Context, - execCtx sql.JobExecContext, - streamAddress streamingccl.StreamAddress, - oldTenantID roachpb.TenantID, - newTenantID roachpb.TenantID, - streamID streaming.StreamID, - startTime hlc.Timestamp, - progress jobspb.Progress, - ingestionJobID jobspb.JobID, -) error { +func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job) error { + details := ingestionJob.Details().(jobspb.StreamIngestionDetails) + progress := ingestionJob.Progress() + streamAddress := streamingccl.StreamAddress(details.StreamAddress) + + startTime := progress.GetStreamIngest().StartTime + // Start from the last checkpoint if it exists. + if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { + startTime = *h + } + + // If there is an existing stream ID, reconnect to it. + streamID := streaming.StreamID(details.StreamID) // Initialize a stream client and resolve topology. client, err := streamclient.NewStreamClient(streamAddress) if err != nil { return err } ingestWithClient := func() error { - // TODO(dt): if there is an existing stream ID, reconnect to it. + ro := retry.Options{ + InitialBackoff: 1 * time.Second, + Multiplier: 2, + MaxBackoff: 10 * time.Second, + MaxRetries: 5, + } + // Make sure the producer job is active before starting the stream replication. + var status streampb.StreamReplicationStatus + for r := retry.Start(ro); r.Next(); { + status, err = client.Heartbeat(ctx, streamID, startTime) + if err != nil { + return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error", + ingestionJob.ID()) + } + if status.StreamStatus != streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY { + break + } + log.Warningf(ctx, + "producer job %d has status %s, retrying", streamID, status.StreamStatus) + } + if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE { + return errors.Errorf("failed to resume ingestion job %d as the producer job is not active "+ + "and in status %s", ingestionJob.ID(), status.StreamStatus) + } + + log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID) topology, err := client.Plan(ctx, streamID) if err != nil { return err } - // TODO(adityamaru): If the job is being resumed it is possible that it has - // check-pointed a resolved ts up to which all of its processors had ingested - // KVs. We can skip to ingesting after this resolved ts. Plumb the - // initialHighwatermark to the ingestion processor spec based on what we read - // from the job progress.
- initialHighWater := startTime - if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { - initialHighWater = *h + // TODO(casper): update running status + if progress.GetStreamIngest().StartTime.Less(startTime) { + progress.GetStreamIngest().StartTime = startTime + if err := ingestionJob.SetProgress(ctx, nil, *progress.GetStreamIngest()); err != nil { + return errors.Wrap(err, "failed to update job progress") + } } + log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s", + ingestionJob.ID(), progress.GetStreamIngest().StartTime) evalCtx := execCtx.ExtendedEvalContext() dsp := execCtx.DistSQLPlanner() @@ -149,14 +182,17 @@ func ingest( // Construct stream ingestion processor specs. streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs( - streamAddress, topology, sqlInstanceIDs, initialHighWater, ingestionJobID, streamID, oldTenantID, newTenantID) + streamAddress, topology, sqlInstanceIDs, progress.GetStreamIngest().StartTime, + ingestionJob.ID(), streamID, details.TenantID, details.NewTenantID) if err != nil { return err } // Plan and run the DistSQL flow. - if err = distStreamIngest(ctx, execCtx, sqlInstanceIDs, ingestionJobID, planCtx, dsp, streamIngestionSpecs, - streamIngestionFrontierSpec); err != nil { + log.Infof(ctx, "starting to plan and run DistSQL flow for stream ingestion job %d", + ingestionJob.ID()) + if err = distStreamIngest(ctx, execCtx, sqlInstanceIDs, ingestionJob.ID(), planCtx, dsp, + streamIngestionSpecs, streamIngestionFrontierSpec); err != nil { return err } @@ -164,9 +200,14 @@ func ingest( // processors shut down gracefully, i.e stopped ingesting any additional // events from the replication stream. At this point it is safe to revert to // the cutoff time to leave the cluster in a consistent state. - if err = revertToCutoverTimestamp(ctx, execCtx, ingestionJobID); err != nil { + log.Infof(ctx, + "starting to revert to the specified cutover timestamp for stream ingestion job %d", + ingestionJob.ID()) + if err = revertToCutoverTimestamp(ctx, execCtx, ingestionJob.ID()); err != nil { return err } + + log.Infof(ctx, "starting to complete the producer job %d", streamID) // Completes the producer job in the source cluster. return client.Complete(ctx, streamID) } @@ -175,15 +216,25 @@ func ingest( // Resume is part of the jobs.Resumer interface. func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error { - details := s.job.Details().(jobspb.StreamIngestionDetails) - p := execCtx.(sql.JobExecContext) - + jobExecCtx := execCtx.(sql.JobExecContext) // Start ingesting KVs from the replication stream. - streamAddress := streamingccl.StreamAddress(details.StreamAddress) // TODO(casper): retry stream ingestion with exponential // backoff and finally pause on error. - return ingest(resumeCtx, p, streamAddress, details.TenantID, details.NewTenantID, - streaming.StreamID(details.StreamID), details.StartTime, s.job.Progress(), s.job.ID()) + err := ingest(resumeCtx, jobExecCtx, s.job) + if err != nil { + const errorFmt = "ingestion job failed (%v) but is being paused" + errorMessage := fmt.Sprintf(errorFmt, err) + log.Warningf(resumeCtx, errorFmt, err) + // The ingestion job is paused but the producer job will keep + // running until it times out. Users can still resume the job before + // the producer job times out. 
+ return s.job.PauseRequested(resumeCtx, jobExecCtx.Txn(), func(ctx context.Context, + planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error { + progress.RunningStatus = errorMessage + return nil + }, errorMessage) + } + return nil } // revertToCutoverTimestamp reads the job progress for the cutover time and diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 982b2b0efa54..f65b1fda68b9 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -155,21 +155,13 @@ SET enable_experimental_stream_replication = true; defer cleanupSink() var ingestionJobID, streamProducerJobID int64 - var startTime string - sourceSQL.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&startTime) - destSQL.ExpectErr(t, "pq: either old tenant ID 10 or the new tenant ID 1 cannot be system tenant", - `RESTORE TENANT 10 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME `+startTime+` AS TENANT `+fmt.Sprintf("%d", roachpb.SystemTenantID.ToUint64()), + `RESTORE TENANT 10 FROM REPLICATION STREAM FROM $1 AS TENANT `+fmt.Sprintf("%d", roachpb.SystemTenantID.ToUint64()), pgURL.String()) destSQL.QueryRow(t, - `RESTORE TENANT 10 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME `+startTime+` AS TENANT 20`, + `RESTORE TENANT 10 FROM REPLICATION STREAM FROM $1 AS TENANT 20`, pgURL.String(), - ).Scan(&ingestionJobID) - - sourceDBRunner.CheckQueryResultsRetry(t, - "SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'STREAM REPLICATION'", [][]string{{"1"}}) - sourceDBRunner.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'STREAM REPLICATION'"). - Scan(&streamProducerJobID) + ).Scan(&ingestionJobID, &streamProducerJobID) sourceSQL.Exec(t, ` CREATE DATABASE d; @@ -233,14 +225,12 @@ func TestCutoverBuiltin(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) db := sqlDB.DB - startTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} streamIngestJobRecord := jobs.Record{ Description: "test stream ingestion", Username: username.RootUserName(), Details: jobspb.StreamIngestionDetails{ StreamAddress: "randomgen://test", Span: roachpb.Span{Key: keys.LocalMax, EndKey: keys.LocalMax.Next()}, - StartTime: startTimestamp, }, Progress: jobspb.StreamIngestionProgress{}, } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index 7de954dcf4fc..78f4c22a91d7 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -26,7 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -153,17 +153,7 @@ func ingestionPlanHook( } // TODO(adityamaru): Add privileges checks. Probably the same as RESTORE. - // Empty start time indicates initial scan is enabled. 
- startTime := hlc.Timestamp{} - if ingestionStmt.AsOf.Expr != nil { - asOf, err := p.EvalAsOfTimestamp(ctx, ingestionStmt.AsOf) - if err != nil { - return err - } - startTime = asOf.Timestamp - } - - //TODO(casper): make target to be tenant-only. + // TODO(casper): make target to be tenant-only. oldTenantID := roachpb.MakeTenantID(ingestionStmt.Targets.TenantID.ID) newTenantID := oldTenantID if ingestionStmt.AsTenant.Specified { @@ -179,6 +169,8 @@ func ingestionPlanHook( if err != nil { return err } + // Create the producer job first for the purpose of observability: + // the user can see the producer job ID immediately after executing the RESTORE. streamID, err := client.Create(ctx, oldTenantID) if err != nil { return err } @@ -193,7 +185,6 @@ func ingestionPlanHook( StreamID: uint64(streamID), TenantID: oldTenantID, Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}, - StartTime: startTime, NewTenantID: newTenantID, } @@ -215,11 +206,14 @@ func ingestionPlanHook( if err != nil { return err } - resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID()))} + resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID())), tree.NewDInt(tree.DInt(streamID))} return nil } - return fn, jobs.DetachedJobExecutionResultHeader, nil, false, nil + return fn, colinfo.ResultColumns{ + {Name: "ingestion_job_id", Typ: types.Int}, + {Name: "producer_job_id", Typ: types.Int}, + }, nil, false, nil } func init() { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 4ff778c33684..4ee51f2ae505 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -468,9 +468,9 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err } if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { - if streamingKnobs != nil { - if streamingKnobs.RunAfterReceivingEvent != nil { - streamingKnobs.RunAfterReceivingEvent(sip.Ctx) + if streamingKnobs != nil && streamingKnobs.RunAfterReceivingEvent != nil { + if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx); err != nil { + return nil, err } } } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index 72ffb8d46d9f..921a4a4a987a 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -260,8 +260,9 @@ func TestStreamIngestionProcessor(t *testing.T) { processEventCh := make(chan struct{}) defer close(processEventCh) - streamingTestingKnob := &sql.StreamingTestingKnobs{RunAfterReceivingEvent: func(ctx context.Context) { + streamingTestingKnob := &sql.StreamingTestingKnobs{RunAfterReceivingEvent: func(ctx context.Context) error { processEventCh <- struct{}{} + return nil }} sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */, streamingTestingKnob) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go index d93f0b0c9e6c..925cca1028b0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go @@ -41,10 +41,10 @@ import (
"github.com/stretchr/testify/require" ) -func getHighWaterMark(jobID int, sqlDB *gosql.DB) (*hlc.Timestamp, error) { +func getHighWaterMark(ingestionJobID int, sqlDB *gosql.DB) (*hlc.Timestamp, error) { var progressBytes []byte if err := sqlDB.QueryRow( - `SELECT progress FROM system.jobs WHERE id = $1`, jobID, + `SELECT progress FROM system.jobs WHERE id = $1`, ingestionJobID, ).Scan(&progressBytes); err != nil { return nil, err } @@ -161,8 +161,8 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { _, err = conn.Exec(`SET enable_experimental_stream_replication = true`) require.NoError(t, err) - var jobID int - require.NoError(t, conn.QueryRow(query).Scan(&jobID)) + var ingestionJobID, producerJobID int + require.NoError(t, conn.QueryRow(query).Scan(&ingestionJobID, &producerJobID)) // Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running. allowResponse <- struct{}{} @@ -176,7 +176,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { // Ensure that the job has made some progress. var highwater hlc.Timestamp testutils.SucceedsSoon(t, func() error { - hw, err := getHighWaterMark(jobID, conn) + hw, err := getHighWaterMark(ingestionJobID, conn) require.NoError(t, err) if hw == nil { return errors.New("highwatermark is unset, no progress has been reported") @@ -191,7 +191,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { // Pick a cutover time just before the latest resolved timestamp. cutoverTime := timeutil.Unix(0, highwater.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond) _, err = conn.Exec(`SELECT crdb_internal.complete_stream_ingestion_job ($1, $2)`, - jobID, cutoverTime) + ingestionJobID, cutoverTime) require.NoError(t, err) // Wait for the job to issue a revert request. @@ -203,7 +203,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { // Wait for the ingestion job to have been marked as succeeded. 
testutils.SucceedsSoon(t, func() error { var status string - sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, jobID).Scan(&status) + sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, ingestionJobID).Scan(&status) if jobs.Status(status) != jobs.StatusSucceeded { return errors.New("job not in succeeded state") } diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index fc9d900cb036..eeca0c9310a0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -10,6 +10,7 @@ package streamingest import ( "context" + gosql "database/sql" "fmt" "net/http" "net/http/httptest" @@ -24,12 +25,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -47,15 +51,39 @@ type tenantStreamingClustersArgs struct { destTenantID roachpb.TenantID destInitFunc execFunc destNumNodes int + testingKnobs *sql.StreamingTestingKnobs +} + +var defaultTenantStreamingClustersArgs = tenantStreamingClustersArgs{ + srcTenantID: roachpb.MakeTenantID(10), + srcInitFunc: func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { + sysSQL.ExecMultiple(t, configureClusterSettings(srcClusterSetting)...) + tenantSQL.Exec(t, ` + CREATE DATABASE d; + CREATE TABLE d.t1(i int primary key, a string, b string); + CREATE TABLE d.t2(i int primary key); + INSERT INTO d.t1 (i) VALUES (42); + INSERT INTO d.t2 VALUES (2); + UPDATE d.t1 SET b = 'world' WHERE i = 42; + `) + }, + srcNumNodes: 1, + destTenantID: roachpb.MakeTenantID(20), + destInitFunc: func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { + sysSQL.ExecMultiple(t, configureClusterSettings(destClusterSetting)...) + }, + destNumNodes: 1, } type tenantStreamingClusters struct { t *testing.T args tenantStreamingClustersArgs + srcCluster *testcluster.TestCluster srcSysSQL *sqlutils.SQLRunner srcTenantSQL *sqlutils.SQLRunner srcURL url.URL + destCluster *testcluster.TestCluster destSysSQL *sqlutils.SQLRunner destTenantSQL *sqlutils.SQLRunner } @@ -68,7 +96,7 @@ func (c *tenantStreamingClusters) compareResult(query string) { // Waits for the ingestion job high watermark to reach the given high watermark. 
func (c *tenantStreamingClusters) waitUntilHighWatermark( - highWatermark time.Time, ingestionJobID jobspb.JobID, + highWatermark hlc.Timestamp, ingestionJobID jobspb.JobID, ) { testutils.SucceedsSoon(c.t, func() error { progress := jobutils.GetJobProgress(c.t, c.destSysSQL, ingestionJobID) @@ -77,7 +105,7 @@ func (c *tenantStreamingClusters) waitUntilHighWatermark( highWatermark.String()) } highwater := *progress.GetHighWater() - if highwater.GoTime().Before(highWatermark) { + if highwater.Less(highWatermark) { return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s", highwater.String(), highWatermark.String()) } @@ -96,19 +124,11 @@ func (c *tenantStreamingClusters) cutover( } // Returns producer job ID and ingestion job ID. -func (c *tenantStreamingClusters) startStreamReplication(startTime string) (int, int) { +func (c *tenantStreamingClusters) startStreamReplication() (int, int) { var ingestionJobID, streamProducerJobID int - streamReplStmt := fmt.Sprintf("RESTORE TENANT %s FROM REPLICATION STREAM FROM '%s'", - c.args.srcTenantID, c.srcURL.String()) - if startTime != "" { - streamReplStmt = streamReplStmt + fmt.Sprintf(" AS OF SYSTEM TIME %s", startTime) - } - streamReplStmt = streamReplStmt + fmt.Sprintf("AS TENANT %s", c.args.destTenantID) - c.destSysSQL.QueryRow(c.t, streamReplStmt).Scan(&ingestionJobID) - c.srcSysSQL.CheckQueryResultsRetry(c.t, - "SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'STREAM REPLICATION'", [][]string{{"1"}}) - c.srcSysSQL.QueryRow(c.t, "SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'STREAM REPLICATION'"). - Scan(&streamProducerJobID) + streamReplStmt := fmt.Sprintf("RESTORE TENANT %s FROM REPLICATION STREAM FROM '%s' AS TENANT %s", + c.args.srcTenantID, c.srcURL.String(), c.args.destTenantID) + c.destSysSQL.QueryRow(c.t, streamReplStmt).Scan(&ingestionJobID, &streamProducerJobID) return streamProducerJobID, ingestionJobID } @@ -120,7 +140,11 @@ func createTenantStreamingClusters( // to system tenants. Tracked with #76378. DisableDefaultTestTenant: true, Knobs: base.TestingKnobs{ - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()}, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + DistSQL: &execinfra.TestingKnobs{ + StreamingTestingKnobs: args.testingKnobs, + }, + }, } startTestClusterWithTenant := func( @@ -129,13 +153,14 @@ func createTenantStreamingClusters( serverArgs base.TestServerArgs, tenantID roachpb.TenantID, numNodes int, - ) (*sqlutils.SQLRunner, *sqlutils.SQLRunner, url.URL, func()) { + ) (*testcluster.TestCluster, *gosql.DB, url.URL, func()) { params := base.TestClusterArgs{ServerArgs: serverArgs} c := testcluster.StartTestCluster(t, numNodes, params) + c.Server(0).Clock().Now() // TODO(casper): support adding splits when we have multiple nodes. _, tenantConn := serverutils.StartTenant(t, c.Server(0), base.TestTenantArgs{TenantID: tenantID}) pgURL, cleanupSinkCert := sqlutils.PGUrl(t, c.Server(0).ServingSQLAddr(), t.Name(), url.User(username.RootUser)) - return sqlutils.MakeSQLRunner(c.ServerConn(0)), sqlutils.MakeSQLRunner(tenantConn), pgURL, func() { + return c, tenantConn, pgURL, func() { require.NoError(t, tenantConn.Close()) c.Stopper().Stop(ctx) cleanupSinkCert() @@ -143,26 +168,31 @@ func createTenantStreamingClusters( } // Start the source cluster. 
- sourceSysSQL, sourceTenantSQL, srcURL, srcCleanup := startTestClusterWithTenant(ctx, t, serverArgs, args.srcTenantID, args.srcNumNodes) + srcCluster, srcTenantDB, srcURL, srcCleanup := + startTestClusterWithTenant(ctx, t, serverArgs, args.srcTenantID, args.srcNumNodes) // Start the destination cluster. - destSysSQL, destTenantSQL, _, destCleanup := startTestClusterWithTenant(ctx, t, serverArgs, args.destTenantID, args.destNumNodes) - - args.srcInitFunc(t, sourceSysSQL, sourceTenantSQL) - args.destInitFunc(t, destSysSQL, destTenantSQL) + destCluster, destTenantDB, _, destCleanup := + startTestClusterWithTenant(ctx, t, serverArgs, args.destTenantID, args.destNumNodes) + + tsc := &tenantStreamingClusters{ + t: t, + args: args, + srcCluster: srcCluster, + srcSysSQL: sqlutils.MakeSQLRunner(srcCluster.ServerConn(0)), + srcTenantSQL: sqlutils.MakeSQLRunner(srcTenantDB), + srcURL: srcURL, + destCluster: destCluster, + destSysSQL: sqlutils.MakeSQLRunner(destCluster.ServerConn(0)), + destTenantSQL: sqlutils.MakeSQLRunner(destTenantDB), + } + args.srcInitFunc(t, tsc.srcSysSQL, tsc.srcTenantSQL) + args.destInitFunc(t, tsc.destSysSQL, tsc.destTenantSQL) // Enable stream replication on dest by default. - destSysSQL.Exec(t, `SET enable_experimental_stream_replication = true;`) - return &tenantStreamingClusters{ - t: t, - args: args, - srcSysSQL: sourceSysSQL, - srcTenantSQL: sourceTenantSQL, - srcURL: srcURL, - destSysSQL: destSysSQL, - destTenantSQL: destTenantSQL, - }, func() { - destCleanup() - srcCleanup() - } + tsc.destSysSQL.Exec(t, `SET enable_experimental_stream_replication = true;`) + return tsc, func() { + destCleanup() + srcCleanup() + } } func (c *tenantStreamingClusters) srcExec(exec execFunc) { @@ -173,7 +203,7 @@ var srcClusterSetting = map[string]string{ `kv.rangefeed.enabled`: `true`, `kv.closed_timestamp.target_duration`: `'1s'`, // Large timeout makes test to not fail with unexpected timeout failures. - `stream_replication.job_liveness_timeout`: `'20s'`, + `stream_replication.job_liveness_timeout`: `'3m'`, `stream_replication.stream_liveness_track_frequency`: `'2s'`, `stream_replication.min_checkpoint_frequency`: `'1s'`, // Make all AddSSTable operation to trigger AddSSTable events. @@ -211,7 +241,6 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) { defer log.Scope(t).Close(t) skip.UnderRaceWithIssue(t, 83867) - skip.UnderStressRace(t, "slow under stressrace") dataSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == "GET" { @@ -223,113 +252,59 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) { defer dataSrv.Close() ctx := context.Background() - testTenantStreaming := func(t *testing.T, withInitialScan bool) { - // 'startTime' is a timestamp before we insert any data into the source cluster. - var startTime string - c, cleanup := createTenantStreamingClusters(ctx, t, tenantStreamingClustersArgs{ - srcTenantID: roachpb.MakeTenantID(10), - srcInitFunc: func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { - sysSQL.ExecMultiple(t, configureClusterSettings(srcClusterSetting)...) 
- if !withInitialScan { - sysSQL.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&startTime) - } - tenantSQL.Exec(t, ` - CREATE DATABASE d; - CREATE TABLE d.t1(i int primary key, a string, b string); - CREATE TABLE d.t2(i int primary key); - INSERT INTO d.t1 (i) VALUES (42); - INSERT INTO d.t2 VALUES (2); - UPDATE d.t1 SET b = 'world' WHERE i = 42; - `) - tenantSQL.Exec(t, ` - ALTER TABLE d.t1 DROP COLUMN b; - `) - }, - srcNumNodes: 1, - destTenantID: roachpb.MakeTenantID(20), - destInitFunc: func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { - sysSQL.ExecMultiple(t, configureClusterSettings(destClusterSetting)...) - }, - destNumNodes: 1, - }) - defer cleanup() - - producerJobID, ingestionJobID := c.startStreamReplication(startTime) - c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { - tenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)") - tenantSQL.Exec(t, "IMPORT INTO d.x CSV DATA ($1)", dataSrv.URL) - }) - - var cutoverTime time.Time - c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { - sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) - }) - c.cutover(producerJobID, ingestionJobID, cutoverTime) - - c.compareResult("SELECT * FROM d.t1") - c.compareResult("SELECT * FROM d.t2") - c.compareResult("SELECT * FROM d.x") - // After cutover, changes to source won't be streamed into destination cluster. - c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { - tenantSQL.Exec(t, `INSERT INTO d.t2 VALUES (3);`) - }) - // Check the dst cluster didn't receive the change after a while. - <-time.NewTimer(3 * time.Second).C - require.Equal(t, [][]string{{"2"}}, c.destTenantSQL.QueryStr(t, "SELECT * FROM d.t2")) - } + c, cleanup := createTenantStreamingClusters(ctx, t, defaultTenantStreamingClustersArgs) + defer cleanup() - t.Run("initial-scan", func(t *testing.T) { - testTenantStreaming(t, true /* withInitialScan */) + producerJobID, ingestionJobID := c.startStreamReplication() + c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { + tenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)") + tenantSQL.Exec(t, "IMPORT INTO d.x CSV DATA ($1)", dataSrv.URL) }) - t.Run("no-initial-scan", func(t *testing.T) { - testTenantStreaming(t, false /* withInitialScan */) + var cutoverTime time.Time + c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { + sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) }) + c.cutover(producerJobID, ingestionJobID, cutoverTime) + + c.compareResult("SELECT * FROM d.t1") + c.compareResult("SELECT * FROM d.t2") + c.compareResult("SELECT * FROM d.x") + // After cutover, changes to source won't be streamed into destination cluster. + c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { + tenantSQL.Exec(t, `INSERT INTO d.t2 VALUES (3);`) + }) + // Check the dst cluster didn't receive the change after a while. + <-time.NewTimer(3 * time.Second).C + require.Equal(t, [][]string{{"2"}}, c.destTenantSQL.QueryStr(t, "SELECT * FROM d.t2")) } func TestTenantStreamingProducerJobTimedOut(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // TODO(casper): now this has the same race issue with + // TestPartitionedStreamReplicationClient, please fix them together in the future. 
+ skip.UnderRace(t, "disabled under race") + ctx := context.Background() - srcClusterSetting[`stream_replication.job_liveness_timeout`] = `'3m'` - c, cleanup := createTenantStreamingClusters(ctx, t, tenantStreamingClustersArgs{ - srcTenantID: roachpb.MakeTenantID(10), - srcInitFunc: func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { - sysSQL.ExecMultiple(t, configureClusterSettings(srcClusterSetting)...) - tenantSQL.Exec(t, ` - CREATE DATABASE d; - CREATE TABLE d.t1(i int primary key, a string, b string); - CREATE TABLE d.t2(i int primary key); - INSERT INTO d.t1 (i) VALUES (42); - INSERT INTO d.t2 VALUES (2); - UPDATE d.t1 SET b = 'world' WHERE i = 42; - `) - }, - srcNumNodes: 1, - destTenantID: roachpb.MakeTenantID(20), - destInitFunc: func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) { - sysSQL.ExecMultiple(t, configureClusterSettings(destClusterSetting)...) - }, - destNumNodes: 1, - }) + c, cleanup := createTenantStreamingClusters(ctx, t, defaultTenantStreamingClustersArgs) defer cleanup() // initial scan - producerJobID, ingestionJobID := c.startStreamReplication("" /* startTime */) + producerJobID, ingestionJobID := c.startStreamReplication() jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) - var srcTime time.Time - c.srcSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&srcTime) + srcTime := c.srcCluster.Server(0).Clock().Now() c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) c.compareResult("SELECT * FROM d.t1") c.compareResult("SELECT * FROM d.t2") stats := streamIngestionStats(t, c.destSysSQL, ingestionJobID) - require.True(t, srcTime.Before(stats.ReplicationLagInfo.MinIngestedTimestamp.GoTime())) + require.True(t, srcTime.LessEq(stats.ReplicationLagInfo.MinIngestedTimestamp)) require.Equal(t, "", stats.ProducerError) // Make producer job easily times out @@ -338,7 +313,7 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) { })...) jobutils.WaitForJobToFail(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) - jobutils.WaitForJobToFail(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) // Make dest cluster to ingest KV events faster. c.srcSysSQL.ExecMultiple(t, configureClusterSettings(map[string]string{ @@ -350,4 +325,110 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) { <-time.NewTimer(3 * time.Second).C require.Equal(t, [][]string{{"0"}}, c.destTenantSQL.QueryStr(t, "SELECT count(*) FROM d.t2 WHERE i = 3")) + + // After resumed, the ingestion job paused on failure again. + c.destSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", ingestionJobID)) + jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) +} + +func TestTenantStreamingPauseResumeIngestion(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // TODO(casper): now this has the same race issue with + // TestPartitionedStreamReplicationClient, please fix them together in the future. 
+ skip.UnderRace(t, "disabled under race") + + ctx := context.Background() + args := defaultTenantStreamingClustersArgs + c, cleanup := createTenantStreamingClusters(ctx, t, args) + defer cleanup() + + producerJobID, ingestionJobID := c.startStreamReplication() + + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + + srcTime := c.srcCluster.Server(0).Clock().Now() + c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + + c.compareResult("SELECT * FROM d.t1") + c.compareResult("SELECT * FROM d.t2") + + // Pause ingestion. + c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID)) + jobutils.WaitForJobToPause(t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + pausedCheckpoint := streamIngestionStats(t, c.destSysSQL, ingestionJobID). + ReplicationLagInfo.MinIngestedTimestamp + // Check we paused at a timestamp greater than the previously reached high watermark + require.True(t, srcTime.LessEq(pausedCheckpoint)) + + // Introduce new update to the src. + c.srcTenantSQL.Exec(t, "INSERT INTO d.t2 VALUES (3);") + // Check the dst cluster didn't receive the new change after pausing for a while. + <-time.NewTimer(3 * time.Second).C + require.Equal(t, [][]string{{"0"}}, + c.destTenantSQL.QueryStr(t, "SELECT count(*) FROM d.t2 WHERE i = 3")) + // Confirm that the job high watermark doesn't change. If the dest cluster is still subscribing + // to src cluster checkpoints events, the job high watermark may change. + require.Equal(t, pausedCheckpoint, + streamIngestionStats(t, c.destSysSQL, ingestionJobID).ReplicationLagInfo.MinIngestedTimestamp) + + // Resume ingestion. + c.destSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", ingestionJobID)) + jobutils.WaitForJobToRun(t, c.srcSysSQL, jobspb.JobID(producerJobID)) + + // Confirm that dest tenant has received the new change after resumption. + srcTime = c.srcCluster.Server(0).Clock().Now() + c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.compareResult("SELECT * FROM d.t2") + // Confirm this new run resumed from the previous checkpoint. + require.Equal(t, pausedCheckpoint, + streamIngestionStats(t, c.destSysSQL, ingestionJobID).IngestionProgress.StartTime) +} + +func TestTenantStreamingPauseOnError(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // TODO(casper): now this has the same race issue with + // TestPartitionedStreamReplicationClient, please fix them together in the future. + skip.UnderRace(t, "disabled under race") + + ctx := context.Background() + ingestErrCh := make(chan error, 1) + args := defaultTenantStreamingClustersArgs + args.testingKnobs = &sql.StreamingTestingKnobs{RunAfterReceivingEvent: func(ctx context.Context) error { + return <-ingestErrCh + }} + c, cleanup := createTenantStreamingClusters(ctx, t, args) + defer cleanup() + + // Make ingestion error out only once. + ingestErrCh <- errors.Newf("ingestion error from test") + close(ingestErrCh) + + producerJobID, ingestionJobID := c.startStreamReplication() + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + + // Check we didn't make any progress. + require.Nil(t, streamIngestionStats(t, c.destSysSQL, ingestionJobID).ReplicationLagInfo) + // Confirm we don't receive any change from src. 
+ c.destTenantSQL.ExpectErr(t, "\"d.t1\" does not exist", "SELECT * FROM d.t1") + c.destTenantSQL.ExpectErr(t, "\"d.t2\" does not exist", "SELECT * FROM d.t2") + + // Resume ingestion. + c.destSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", ingestionJobID)) + jobutils.WaitForJobToRun(t, c.srcSysSQL, jobspb.JobID(producerJobID)) + + // Check dest has caught up with the previous updates. + srcTime := c.srcCluster.Server(0).Clock().Now() + c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.compareResult("SELECT * FROM d.t1") + c.compareResult("SELECT * FROM d.t2") + + // Confirm this new run resumed from the empty checkpoint. + require.True(t, + streamIngestionStats(t, c.destSysSQL, ingestionJobID).IngestionProgress.StartTime.IsEmpty()) } diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go index 2ab79c698c05..a30904dc84a1 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go @@ -227,8 +227,7 @@ func TestStreamReplicationProducerJob(t *testing.T) { expire := expirationTime(jr).Add(10 * time.Millisecond) require.NoError(t, source.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { streamStatus, err = updateReplicationStreamProgress( - ctx, expire, - ptp, registry, streaming.StreamID(jr.JobID), updatedFrontier, txn) + ctx, expire, ptp, registry, streaming.StreamID(jr.JobID), updatedFrontier, txn) return err })) require.Equal(t, streampb.StreamReplicationStatus_STREAM_ACTIVE, streamStatus.StreamStatus) diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index 18ea1ef744c2..6ee4e5f41d1a 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -100,7 +100,7 @@ func updateReplicationStreamProgress( ptsProvider protectedts.Provider, registry *jobs.Registry, streamID streaming.StreamID, - ts hlc.Timestamp, + consumedTime hlc.Timestamp, txn *kv.Txn, ) (status streampb.StreamReplicationStatus, err error) { const useReadLock = false @@ -123,11 +123,19 @@ func updateReplicationStreamProgress( return nil } - if shouldUpdatePTS := ptsRecord.Timestamp.Less(ts); shouldUpdatePTS { - if err = ptsProvider.UpdateTimestamp(ctx, txn, ptsID, ts); err != nil { + // TODO(casper): Error out when the protected timestamp moves backward as the ingestion + // processors may consume kv changes that are not protected. We are fine for now + // given the long GC window. + // Now this can happen because the frontier processor moves forward the protected timestamp + // in the source cluster through heartbeats before it reports the new frontier to the + // ingestion job resumer, which later updates the job high watermark. When we retry another + // ingestion using the previous ingestion high watermark, it can fall behind the + // source cluster protected timestamp. + if shouldUpdatePTS := ptsRecord.Timestamp.Less(consumedTime); shouldUpdatePTS { + if err = ptsProvider.UpdateTimestamp(ctx, txn, ptsID, consumedTime); err != nil { return err } - status.ProtectedTimestamp = &ts + status.ProtectedTimestamp = &consumedTime } // Allow expiration time to go backwards as user may set a smaller timeout.
md.Progress.GetStreamReplication().Expiration = expiration @@ -180,7 +188,8 @@ func heartbeatReplicationStream( } return updateReplicationStreamProgress(evalCtx.Ctx(), - expirationTime, execConfig.ProtectedTimestampProvider, execConfig.JobRegistry, streamID, frontier, txn) + expirationTime, execConfig.ProtectedTimestampProvider, execConfig.JobRegistry, + streamID, frontier, txn) } // getReplicationStreamSpec gets a replication stream specification for the specified stream. diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index d8eead34d3bb..129f73226ca1 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -85,8 +85,6 @@ message StreamIngestionDetails { // a span. Note that KVs received from the stream may need to be re-keyed into // this span. roachpb.Span span = 2 [(gogoproto.nullable) = false]; - // The job will ingest events from StartTime onwards. - util.hlc.Timestamp start_time = 3 [(gogoproto.nullable) = false]; reserved 5; roachpb.TenantID tenant_id = 6 [(gogoproto.customname) = "TenantID", (gogoproto.nullable) = false]; @@ -109,6 +107,11 @@ message StreamIngestionProgress { (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID", (gogoproto.nullable) = false]; } + + // The job will ingest events from StartTime onwards during the current run. + // This may change when the ingestion job gets resumed from the previous checkpoint. + util.hlc.Timestamp start_time = 3 [(gogoproto.nullable) = false]; + // CutoverTime is set to signal to the stream ingestion job to complete its // ingestion. This involves stopping any subsequent ingestion, and rolling // back any additional ingested data, to bring the ingested cluster to a diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 975dd8ef87ae..b8f66cb1724e 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1563,7 +1563,7 @@ func (*BackupRestoreTestingKnobs) ModuleTestingKnobs() {} type StreamingTestingKnobs struct { // RunAfterReceivingEvent allows blocking the stream ingestion processor after // a single event has been received.
- RunAfterReceivingEvent func(ctx context.Context) + RunAfterReceivingEvent func(ctx context.Context) error } var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{} diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index d4a4a6602a55..1a139bef888c 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -3181,13 +3181,12 @@ restore_stmt: Options: *($9.restoreOptions()), } } -| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list opt_as_of_clause opt_as_tenant_clause +| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list opt_as_tenant_clause { $$.val = &tree.StreamIngestion{ Targets: $2.targetList(), From: $7.stringOrPlaceholderOptList(), - AsOf: $8.asOfClause(), - AsTenant: $9.asTenantClause(), + AsTenant: $8.asTenantClause(), } } | RESTORE error // SHOW HELP: RESTORE diff --git a/pkg/sql/parser/testdata/backup_restore b/pkg/sql/parser/testdata/backup_restore index ceb882eadab6..fb6fd33b9a20 100644 --- a/pkg/sql/parser/testdata/backup_restore +++ b/pkg/sql/parser/testdata/backup_restore @@ -722,20 +722,20 @@ RESTORE TENANT _ FROM REPLICATION STREAM FROM $1 -- literals removed RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 -- identifiers removed parse -RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS OF SYSTEM TIME '1' +RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' ---- -RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS OF SYSTEM TIME '1' -RESTORE TENANT 123 FROM REPLICATION STREAM FROM ('bar') AS OF SYSTEM TIME ('1') -- fully parenthesized -RESTORE TENANT _ FROM REPLICATION STREAM FROM '_' AS OF SYSTEM TIME '_' -- literals removed -RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS OF SYSTEM TIME '1' -- identifiers removed +RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' +RESTORE TENANT 123 FROM REPLICATION STREAM FROM ('bar') -- fully parenthesized +RESTORE TENANT _ FROM REPLICATION STREAM FROM '_' -- literals removed +RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' -- identifiers removed parse -RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME '1' +RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 ---- -RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME '1' -RESTORE TENANT 123 FROM REPLICATION STREAM FROM ($1) AS OF SYSTEM TIME ('1') -- fully parenthesized -RESTORE TENANT _ FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME '_' -- literals removed -RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME '1' -- identifiers removed +RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 +RESTORE TENANT 123 FROM REPLICATION STREAM FROM ($1) -- fully parenthesized +RESTORE TENANT _ FROM REPLICATION STREAM FROM $1 -- literals removed +RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 -- identifiers removed parse BACKUP TABLE foo TO 'bar' WITH revision_history, detached diff --git a/pkg/sql/sem/tree/stream_ingestion.go b/pkg/sql/sem/tree/stream_ingestion.go index 129083dceeb7..6606972712e9 100644 --- a/pkg/sql/sem/tree/stream_ingestion.go +++ b/pkg/sql/sem/tree/stream_ingestion.go @@ -14,7 +14,6 @@ package tree type StreamIngestion struct { Targets TargetList From StringOrPlaceholderOptList - AsOf AsOfClause AsTenant TenantID } @@ -27,10 +26,6 @@ func (node *StreamIngestion) Format(ctx *FmtCtx) { ctx.WriteString(" ") ctx.WriteString("FROM REPLICATION STREAM FROM ") ctx.FormatNode(&node.From) - if node.AsOf.Expr != nil { - ctx.WriteString(" ") - ctx.FormatNode(&node.AsOf) - } if node.AsTenant.Specified { ctx.WriteString(" AS TENANT ") 
ctx.FormatNode(&node.AsTenant)