
streamingccl: support ingestion job to pause on error
Previously, an ingestion job failed on error and the tenant state was
kept after the failure. The user was expected to execute another
RESTORE FROM REPLICATION STREAM statement to resume ingestion from a
given checkpoint.

Now we introduce new semantics: the job is paused on error, and the
user can RESUME JOB to restart from the previous checkpoint or
cancel the job to roll back (implemented in another PR).

We also drop the AS OF syntax, as it conflicts with the purpose of
pause on error: AS OF assumed that a user resumes ingestion for an
existing tenant from a given checkpoint by creating a new ingestion
job while the previous ingestion job stays paused on error, which
makes for a bad user experience.
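
A sketch of the intended workflow (the stream address and job IDs
below are hypothetical):

    -- Start replication into a new tenant; the statement returns both
    -- the ingestion job ID and the producer job ID.
    RESTORE TENANT 10 FROM REPLICATION STREAM FROM 'postgres://source' AS TENANT 20;

    -- On error, the ingestion job now pauses instead of failing.
    -- Resume it to continue from the previous checkpoint:
    RESUME JOB 12345;

    -- Or cancel it to roll back (rollback support lands in another PR):
    CANCEL JOB 12345;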

Release note: None
gh-casper committed Jul 11, 2022
1 parent 6374bd8 commit f69f2fd
Showing 16 changed files with 343 additions and 220 deletions.
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/stmt_block.bnf
@@ -207,7 +207,7 @@ restore_stmt ::=
| 'RESTORE' targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' 'SYSTEM' 'USERS' 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' 'SYSTEM' 'USERS' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_of_clause opt_as_tenant_clause
| 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_tenant_clause

resume_stmt ::=
resume_jobs_stmt
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -49,6 +49,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/span",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
113 changes: 82 additions & 31 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -10,6 +10,8 @@ package streamingest

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
@@ -23,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
@@ -71,11 +75,13 @@ func getStreamIngestionStats(
}
lagInfo.SlowestSourcePartitionTimestamp = hlc.MaxTimestamp
lagInfo.FastestSourcePartitionTimestamp = hlc.MinTimestamp
// TODO(casper): track spans that the slowest partition is associated with once
// we switch from tracking partition ID as frontier to tracking actual spans as
// frontier.
for partition, pp := range progress.GetStreamIngest().PartitionProgress {
if pp.IngestedTimestamp.Less(lagInfo.SlowestSourcePartitionTimestamp) {
lagInfo.SlowestSourcePartitionTimestamp = pp.IngestedTimestamp
lagInfo.SlowestSourcePartition = partition

}

if lagInfo.FastestSourcePartitionTimestamp.Less(pp.IngestedTimestamp) {
@@ -106,39 +112,66 @@ type streamIngestionResumer struct {
job *jobs.Job
}

func ingest(
ctx context.Context,
execCtx sql.JobExecContext,
streamAddress streamingccl.StreamAddress,
oldTenantID roachpb.TenantID,
newTenantID roachpb.TenantID,
streamID streaming.StreamID,
startTime hlc.Timestamp,
progress jobspb.Progress,
ingestionJobID jobspb.JobID,
) error {
func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job) error {
details := ingestionJob.Details().(jobspb.StreamIngestionDetails)
progress := ingestionJob.Progress()
streamAddress := streamingccl.StreamAddress(details.StreamAddress)

startTime := progress.GetStreamIngest().StartTime
// Start from the last checkpoint if it exists.
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
startTime = *h
}

// If there is an existing stream ID, reconnect to it.
streamID := streaming.StreamID(details.StreamID)
// Initialize a stream client and resolve topology.
client, err := streamclient.NewStreamClient(streamAddress)
if err != nil {
return err
}
ingestWithClient := func() error {
// TODO(dt): if there is an existing stream ID, reconnect to it.
ro := retry.Options{
InitialBackoff: 1 * time.Second,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
MaxRetries: 5,
}
// Make sure the producer job is active before starting the stream replication.
var status streampb.StreamReplicationStatus
for r := retry.Start(ro); r.Next(); {
status, err = client.Heartbeat(ctx, streamID, startTime)
if err != nil {
return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error",
ingestionJob.ID())
}
if status.StreamStatus != streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
break
}
log.Warningf(ctx,
"producer job %d has status %s, retrying", streamID, status.StreamStatus)
}
if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE {
return errors.Errorf("failed to resume ingestion job %d as the producer job is not active "+
"and in status %s", ingestionJob.ID(), status.StreamStatus)
}

log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID)
topology, err := client.Plan(ctx, streamID)
if err != nil {
return err
}

// TODO(adityamaru): If the job is being resumed it is possible that it has
// check-pointed a resolved ts up to which all of its processors had ingested
// KVs. We can skip to ingesting after this resolved ts. Plumb the
// initialHighwatermark to the ingestion processor spec based on what we read
// from the job progress.
initialHighWater := startTime
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
initialHighWater = *h
// TODO(casper): update running status
if progress.GetStreamIngest().StartTime.Less(startTime) {
progress.GetStreamIngest().StartTime = startTime
if err := ingestionJob.SetProgress(ctx, nil, *progress.GetStreamIngest()); err != nil {
return errors.Wrap(err, "failed to update job progress")
}
}

log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s",
ingestionJob.ID(), progress.GetStreamIngest().StartTime)
evalCtx := execCtx.ExtendedEvalContext()
dsp := execCtx.DistSQLPlanner()

@@ -149,24 +182,32 @@ func ingest(

// Construct stream ingestion processor specs.
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, sqlInstanceIDs, initialHighWater, ingestionJobID, streamID, oldTenantID, newTenantID)
streamAddress, topology, sqlInstanceIDs, progress.GetStreamIngest().StartTime,
ingestionJob.ID(), streamID, details.TenantID, details.NewTenantID)
if err != nil {
return err
}

// Plan and run the DistSQL flow.
if err = distStreamIngest(ctx, execCtx, sqlInstanceIDs, ingestionJobID, planCtx, dsp, streamIngestionSpecs,
streamIngestionFrontierSpec); err != nil {
log.Infof(ctx, "starting to plan and run DistSQL flow for stream ingestion job %d",
ingestionJob.ID())
if err = distStreamIngest(ctx, execCtx, sqlInstanceIDs, ingestionJob.ID(), planCtx, dsp,
streamIngestionSpecs, streamIngestionFrontierSpec); err != nil {
return err
}

// A nil error is only possible if the job was signaled to cutover and the
// processors shut down gracefully, i.e. stopped ingesting any additional
// events from the replication stream. At this point it is safe to revert to
// the cutoff time to leave the cluster in a consistent state.
if err = revertToCutoverTimestamp(ctx, execCtx, ingestionJobID); err != nil {
log.Infof(ctx,
"starting to revert to the specified cutover timestamp for stream ingestion job %d",
ingestionJob.ID())
if err = revertToCutoverTimestamp(ctx, execCtx, ingestionJob.ID()); err != nil {
return err
}

log.Infof(ctx, "starting to complete the producer job %d", streamID)
// Completes the producer job in the source cluster.
return client.Complete(ctx, streamID)
}
@@ -175,15 +216,25 @@ func ingest(

// Resume is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
details := s.job.Details().(jobspb.StreamIngestionDetails)
p := execCtx.(sql.JobExecContext)

jobExecCtx := execCtx.(sql.JobExecContext)
// Start ingesting KVs from the replication stream.
streamAddress := streamingccl.StreamAddress(details.StreamAddress)
// TODO(casper): retry stream ingestion with exponential
// backoff and finally pause on error.
return ingest(resumeCtx, p, streamAddress, details.TenantID, details.NewTenantID,
streaming.StreamID(details.StreamID), details.StartTime, s.job.Progress(), s.job.ID())
err := ingest(resumeCtx, jobExecCtx, s.job)
if err != nil {
const errorFmt = "ingestion job failed (%v) but is being paused"
errorMessage := fmt.Sprintf(errorFmt, err)
log.Warningf(resumeCtx, errorFmt, err)
// The ingestion job is paused but the producer job will keep
// running until it times out. Users can still resume the job before
// the producer job times out.
return s.job.PauseRequested(resumeCtx, jobExecCtx.Txn(), func(ctx context.Context,
planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error {
progress.RunningStatus = errorMessage
return nil
}, errorMessage)
}
return nil
}

// revertToCutoverTimestamp reads the job progress for the cutover time and
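
Since Resume records the error as the job's running status before
requesting the pause, an operator can inspect the paused job and resume
it before the producer job times out. A minimal sketch, assuming a
hypothetical job ID:

    -- The pause reason is surfaced as the job's running status.
    SELECT status, running_status FROM [SHOW JOBS] WHERE job_id = 12345;

    -- Continue ingestion from the previous checkpoint.
    RESUME JOB 12345;
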
16 changes: 3 additions & 13 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
@@ -155,21 +155,13 @@ SET enable_experimental_stream_replication = true;
defer cleanupSink()

var ingestionJobID, streamProducerJobID int64
var startTime string
sourceSQL.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&startTime)

destSQL.ExpectErr(t, "pq: either old tenant ID 10 or the new tenant ID 1 cannot be system tenant",
`RESTORE TENANT 10 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME `+startTime+` AS TENANT `+fmt.Sprintf("%d", roachpb.SystemTenantID.ToUint64()),
`RESTORE TENANT 10 FROM REPLICATION STREAM FROM $1 AS TENANT `+fmt.Sprintf("%d", roachpb.SystemTenantID.ToUint64()),
pgURL.String())
destSQL.QueryRow(t,
`RESTORE TENANT 10 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME `+startTime+` AS TENANT 20`,
`RESTORE TENANT 10 FROM REPLICATION STREAM FROM $1 AS TENANT 20`,
pgURL.String(),
).Scan(&ingestionJobID)

sourceDBRunner.CheckQueryResultsRetry(t,
"SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'STREAM REPLICATION'", [][]string{{"1"}})
sourceDBRunner.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'STREAM REPLICATION'").
Scan(&streamProducerJobID)
).Scan(&ingestionJobID, &streamProducerJobID)

sourceSQL.Exec(t, `
CREATE DATABASE d;
@@ -233,14 +225,12 @@ func TestCutoverBuiltin(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
db := sqlDB.DB

startTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
streamIngestJobRecord := jobs.Record{
Description: "test stream ingestion",
Username: username.RootUserName(),
Details: jobspb.StreamIngestionDetails{
StreamAddress: "randomgen://test",
Span: roachpb.Span{Key: keys.LocalMax, EndKey: keys.LocalMax.Next()},
StartTime: startTimestamp,
},
Progress: jobspb.StreamIngestionProgress{},
}
24 changes: 9 additions & 15 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
@@ -26,7 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
@@ -153,17 +153,7 @@ func ingestionPlanHook(
}

// TODO(adityamaru): Add privileges checks. Probably the same as RESTORE.
// Empty start time indicates initial scan is enabled.
startTime := hlc.Timestamp{}
if ingestionStmt.AsOf.Expr != nil {
asOf, err := p.EvalAsOfTimestamp(ctx, ingestionStmt.AsOf)
if err != nil {
return err
}
startTime = asOf.Timestamp
}

//TODO(casper): make target to be tenant-only.
// TODO(casper): make target to be tenant-only.
oldTenantID := roachpb.MakeTenantID(ingestionStmt.Targets.TenantID.ID)
newTenantID := oldTenantID
if ingestionStmt.AsTenant.Specified {
@@ -179,6 +169,8 @@
if err != nil {
return err
}
// Create the producer job first for the purpose of observability:
// the user can learn the producer job ID immediately after executing the RESTORE.
streamID, err := client.Create(ctx, oldTenantID)
if err != nil {
return err
@@ -193,7 +185,6 @@
StreamID: uint64(streamID),
TenantID: oldTenantID,
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
StartTime: startTime,
NewTenantID: newTenantID,
}

@@ -215,11 +206,14 @@
if err != nil {
return err
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID()))}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID())), tree.NewDInt(tree.DInt(streamID))}
return nil
}

return fn, jobs.DetachedJobExecutionResultHeader, nil, false, nil
return fn, colinfo.ResultColumns{
{Name: "ingestion_job_id", Typ: types.Int},
{Name: "producer_job_id", Typ: types.Int},
}, nil, false, nil
}

func init() {
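
With the result columns added above, the RESTORE ... FROM REPLICATION
STREAM statement now reports both job IDs directly; illustrative output
(the values are hypothetical):

    RESTORE TENANT 10 FROM REPLICATION STREAM FROM 'postgres://source' AS TENANT 20;
    --   ingestion_job_id   | producer_job_id
    -- ---------------------+-----------------
    --   784349000000000001 | 784349000000000002
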
@@ -468,9 +468,9 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err
}

if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil {
if streamingKnobs.RunAfterReceivingEvent != nil {
streamingKnobs.RunAfterReceivingEvent(sip.Ctx)
if streamingKnobs != nil && streamingKnobs.RunAfterReceivingEvent != nil {
if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx); err != nil {
return nil, err
}
}
}
@@ -260,8 +260,9 @@ func TestStreamIngestionProcessor(t *testing.T) {

processEventCh := make(chan struct{})
defer close(processEventCh)
streamingTestingKnob := &sql.StreamingTestingKnobs{RunAfterReceivingEvent: func(ctx context.Context) {
streamingTestingKnob := &sql.StreamingTestingKnobs{RunAfterReceivingEvent: func(ctx context.Context) error {
processEventCh <- struct{}{}
return nil
}}
sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB,
partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */, streamingTestingKnob)
14 changes: 7 additions & 7 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
@@ -41,10 +41,10 @@ import (
"github.com/stretchr/testify/require"
)

func getHighWaterMark(jobID int, sqlDB *gosql.DB) (*hlc.Timestamp, error) {
func getHighWaterMark(ingestionJobID int, sqlDB *gosql.DB) (*hlc.Timestamp, error) {
var progressBytes []byte
if err := sqlDB.QueryRow(
`SELECT progress FROM system.jobs WHERE id = $1`, jobID,
`SELECT progress FROM system.jobs WHERE id = $1`, ingestionJobID,
).Scan(&progressBytes); err != nil {
return nil, err
}
@@ -161,8 +161,8 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
_, err = conn.Exec(`SET enable_experimental_stream_replication = true`)
require.NoError(t, err)

var jobID int
require.NoError(t, conn.QueryRow(query).Scan(&jobID))
var ingestionJobID, producerJobID int
require.NoError(t, conn.QueryRow(query).Scan(&ingestionJobID, &producerJobID))

// Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running.
allowResponse <- struct{}{}
@@ -176,7 +176,7 @@
// Ensure that the job has made some progress.
var highwater hlc.Timestamp
testutils.SucceedsSoon(t, func() error {
hw, err := getHighWaterMark(jobID, conn)
hw, err := getHighWaterMark(ingestionJobID, conn)
require.NoError(t, err)
if hw == nil {
return errors.New("highwatermark is unset, no progress has been reported")
@@ -191,7 +191,7 @@
// Pick a cutover time just before the latest resolved timestamp.
cutoverTime := timeutil.Unix(0, highwater.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond)
_, err = conn.Exec(`SELECT crdb_internal.complete_stream_ingestion_job ($1, $2)`,
jobID, cutoverTime)
ingestionJobID, cutoverTime)
require.NoError(t, err)

// Wait for the job to issue a revert request.
@@ -203,7 +203,7 @@
// Wait for the ingestion job to have been marked as succeeded.
testutils.SucceedsSoon(t, func() error {
var status string
sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, jobID).Scan(&status)
sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, ingestionJobID).Scan(&status)
if jobs.Status(status) != jobs.StatusSucceeded {
return errors.New("job not in succeeded state")
}
