diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index deaad9dfb74c..b2ef18c18c82 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -405,7 +405,9 @@ type backupResumer struct { var _ jobs.TraceableJob = &backupResumer{} // ForceRealSpan implements the TraceableJob interface. -func (b *backupResumer) ForceRealSpan() {} +func (b *backupResumer) ForceRealSpan() bool { + return true +} // Resume is part of the jobs.Resumer interface. func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 2f616989e641..dca7c1fdf82b 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -1172,7 +1172,9 @@ type importResumer struct { } } -func (r *importResumer) ForceRealSpan() {} +func (r *importResumer) ForceRealSpan() bool { + return true +} var _ jobs.TraceableJob = &importResumer{} diff --git a/pkg/cli/debug_job_trace_test.go b/pkg/cli/debug_job_trace_test.go index 28a69c2fe63f..9b76c7040918 100644 --- a/pkg/cli/debug_job_trace_test.go +++ b/pkg/cli/debug_job_trace_test.go @@ -41,7 +41,9 @@ import ( var _ jobs.Resumer = &traceSpanResumer{} var _ jobs.TraceableJob = &traceSpanResumer{} -func (r *traceSpanResumer) ForceRealSpan() {} +func (r *traceSpanResumer) ForceRealSpan() bool { + return true +} type traceSpanResumer struct { ctx context.Context diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 67d37620d2fb..03c5e27f9625 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -251,6 +251,9 @@ func (r *Registry) runJob( typ := job.mu.payload.Type() job.mu.Unlock() + // Make sure that we remove the job from the running set when this returns. + defer r.unregister(job.ID()) + // Bookkeeping. 
execCtx, cleanup := r.execCtx("resume-"+taskName, username) defer cleanup() @@ -263,15 +266,19 @@ func (r *Registry) runJob( // // A new root span will be created on every resumption of the job. var spanOptions []tracing.SpanOption - if _, ok := resumer.(TraceableJob); ok { + if tj, ok := resumer.(TraceableJob); ok && tj.ForceRealSpan() { spanOptions = append(spanOptions, tracing.WithForceRealSpan()) } + // TODO(ajwerner): Move this writing of the trace ID down into + // stepThroughStateMachine where we're already often (and soon with + // exponential backoff, always) updating the job in that call. ctx, span = r.settings.Tracer.StartSpanCtx(ctx, spanName, spanOptions...) defer span.Finish() if err := job.Update(ctx, nil /* txn */, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error { - md.Progress.TraceID = span.TraceID() - ju.UpdateProgress(md.Progress) + progress := *md.Progress + progress.TraceID = span.TraceID() + ju.UpdateProgress(&progress) return nil }); err != nil { return err @@ -285,7 +292,6 @@ func (r *Registry) runJob( if err != nil && ctx.Err() == nil { log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err) } - r.unregister(job.ID()) return err } diff --git a/pkg/jobs/helpers_test.go b/pkg/jobs/helpers_test.go index 6f1aea8be934..c5baa5048ecd 100644 --- a/pkg/jobs/helpers_test.go +++ b/pkg/jobs/helpers_test.go @@ -19,10 +19,15 @@ import ( // FakeResumer calls optional callbacks during the job lifecycle. 
type FakeResumer struct { - OnResume func(context.Context) error - FailOrCancel func(context.Context) error - Success func() error - PauseRequest onPauseRequestFunc + OnResume func(context.Context) error + FailOrCancel func(context.Context) error + Success func() error + PauseRequest onPauseRequestFunc + TraceRealSpan bool +} + +func (d FakeResumer) ForceRealSpan() bool { + return d.TraceRealSpan } var _ Resumer = FakeResumer{} diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 6ff7f57d75c0..71e7902f9c49 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -96,7 +96,7 @@ type StartableJob struct { type TraceableJob interface { // ForceRealSpan forces the registry to create a real Span instead of a // low-overhead non-recordable noop span. - ForceRealSpan() + ForceRealSpan() bool } func init() { diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 7a64c85bb1e9..fb2d4285d16d 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -179,6 +179,9 @@ type registryTestSuite struct { // Instead of a ch for success, use a variable because it can retry since it // is in a transaction. successErr error + + // controls whether job resumers will ask for a real tracing span. 
+ traceRealSpan bool } func noopPauseRequestFunc( @@ -187,6 +190,8 @@ func noopPauseRequestFunc( return nil } +var _ jobs.TraceableJob = (*jobs.FakeResumer)(nil) + func (rts *registryTestSuite) setUp(t *testing.T) { rts.ctx = context.Background() @@ -217,6 +222,7 @@ func (rts *registryTestSuite) setUp(t *testing.T) { jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ + TraceRealSpan: rts.traceRealSpan, OnResume: func(ctx context.Context) error { t.Log("Starting resume") rts.mu.Lock() @@ -878,6 +884,52 @@ func TestRegistryLifecycle(t *testing.T) { rts.mu.e.ResumeExit++ rts.check(t, jobs.StatusSucceeded) }) + t.Run("fail setting trace ID", func(t *testing.T) { + // The trace ID is set on the job above the state machine loop. + // This tests a regression where we fail to set trace ID and then + // don't clear the in-memory state that we were running this job. + // That prevents the job from being re-run. + + rts := registryTestSuite{} + rts.setUp(t) + defer rts.tearDown() + rts.traceRealSpan = true + + // Inject an error in the update to record the trace ID. + var failed atomic.Value + failed.Store(false) + rts.beforeUpdate = func(orig, updated jobs.JobMetadata) error { + if !failed.Load().(bool) && + orig.Progress.TraceID == 0 && + updated.Progress != nil && + updated.Progress.TraceID != 0 { + failed.Store(true) + return errors.New("boom") + } + return nil + } + + j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob) + if err != nil { + t.Fatal(err) + } + rts.job = j + + testutils.SucceedsSoon(t, func() error { + if !failed.Load().(bool) { + return errors.New("not yet failed") + } + return nil + }) + + // Make sure the job retries and then succeeds. 
+ rts.resumeCheckCh <- struct{}{} + rts.resumeCh <- nil + rts.mu.e.ResumeStart = true + rts.mu.e.ResumeExit++ + rts.mu.e.Success = true + rts.check(t, jobs.StatusSucceeded) + }) } func TestJobLifecycle(t *testing.T) {