jobs: fix bug where failure to record the trace ID prevents adoption
In cockroachdb#65322 we added code to record a trace ID into job progress.
That code added a new error return path to runJob which did not unregister
the job from the in-memory state in the registry. This PR fixes that
oversight by deferring the unregister call, allowing jobs to be retried
when writing the trace ID fails.

Release note (bug fix): Fixed a bug where a failure to write to the jobs
table could prevent subsequent adoption of a job until the node that had
previously adopted it died or the job was paused.
ajwerner committed Jul 16, 2021
1 parent 4a1d6c7 commit 8f93b19
Showing 7 changed files with 81 additions and 12 deletions.
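
The core of the fix is in pkg/jobs/adopt.go below: the unregister call moves from the tail of runJob into a defer, so every return path, including the new early return when persisting the trace ID fails, clears the in-memory record of the running job. Here is a minimal, self-contained sketch of that control-flow change; the registry and job types are hypothetical stand-ins, not the real pkg/jobs code:

package main

import (
	"errors"
	"fmt"
)

type registry struct {
	running map[int64]bool // in-memory set of jobs this node believes it is running
}

func (r *registry) unregister(id int64) { delete(r.running, id) }

func resume() error { return nil }

// runJobBuggy mirrors the pre-fix flow: the early return added for the
// trace-ID write skips the unregister at the bottom, so the job stays
// "running" in memory and can never be re-adopted by this node.
func (r *registry) runJobBuggy(id int64, writeTraceID func() error) error {
	if err := writeTraceID(); err != nil {
		return err // bug: job is still registered
	}
	err := resume()
	r.unregister(id)
	return err
}

// runJobFixed mirrors the fix: defer the unregister up front so that every
// return path clears the in-memory state and the job can be retried.
func (r *registry) runJobFixed(id int64, writeTraceID func() error) error {
	defer r.unregister(id)
	if err := writeTraceID(); err != nil {
		return err
	}
	return resume()
}

func main() {
	boom := func() error { return errors.New("boom") }

	r := &registry{running: map[int64]bool{1: true}}
	_ = r.runJobBuggy(1, boom)
	fmt.Println("registered after buggy run:", r.running[1]) // true: stuck

	r.running[1] = true
	_ = r.runJobFixed(1, boom)
	fmt.Println("registered after fixed run:", r.running[1]) // false: retryable
}
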
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/backup_job.go
@@ -405,7 +405,9 @@ type backupResumer struct {
 var _ jobs.TraceableJob = &backupResumer{}
 
 // ForceRealSpan implements the TraceableJob interface.
-func (b *backupResumer) ForceRealSpan() {}
+func (b *backupResumer) ForceRealSpan() bool {
+	return true
+}
 
 // Resume is part of the jobs.Resumer interface.
 func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
4 changes: 3 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
@@ -1172,7 +1172,9 @@ type importResumer struct {
 	}
 }
 
-func (r *importResumer) ForceRealSpan() {}
+func (r *importResumer) ForceRealSpan() bool {
+	return true
+}
 
 var _ jobs.TraceableJob = &importResumer{}

4 changes: 3 additions & 1 deletion pkg/cli/debug_job_trace_test.go
@@ -41,7 +41,9 @@ import (
 var _ jobs.Resumer = &traceSpanResumer{}
 var _ jobs.TraceableJob = &traceSpanResumer{}
 
-func (r *traceSpanResumer) ForceRealSpan() {}
+func (r *traceSpanResumer) ForceRealSpan() bool {
+	return true
+}
 
 type traceSpanResumer struct {
 	ctx context.Context
14 changes: 10 additions & 4 deletions pkg/jobs/adopt.go
@@ -251,6 +251,9 @@ func (r *Registry) runJob(
 	typ := job.mu.payload.Type()
 	job.mu.Unlock()
 
+	// Make sure that we remove the job from the running set when this returns.
+	defer r.unregister(job.ID())
+
 	// Bookkeeping.
 	execCtx, cleanup := r.execCtx("resume-"+taskName, username)
 	defer cleanup()
@@ -263,15 +266,19 @@ func (r *Registry) runJob(
 	//
 	// A new root span will be created on every resumption of the job.
 	var spanOptions []tracing.SpanOption
-	if _, ok := resumer.(TraceableJob); ok {
+	if tj, ok := resumer.(TraceableJob); ok && tj.ForceRealSpan() {
 		spanOptions = append(spanOptions, tracing.WithForceRealSpan())
 	}
+	// TODO(ajwerner): Move this writing of the trace ID down into
+	// stepThroughStateMachine, where we're already often (and soon, with
+	// exponential backoff, always) updating the job in that call.
 	ctx, span = r.settings.Tracer.StartSpanCtx(ctx, spanName, spanOptions...)
 	defer span.Finish()
 	if err := job.Update(ctx, nil /* txn */, func(txn *kv.Txn, md JobMetadata,
 		ju *JobUpdater) error {
-		md.Progress.TraceID = span.TraceID()
-		ju.UpdateProgress(md.Progress)
+		progress := *md.Progress
+		progress.TraceID = span.TraceID()
+		ju.UpdateProgress(&progress)
 		return nil
 	}); err != nil {
 		return err
@@ -285,7 +292,6 @@ func (r *Registry) runJob(
 	if err != nil && ctx.Err() == nil {
 		log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err)
 	}
-	r.unregister(job.ID())
 	return err
 }

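Note the second change in this file: the update callback now copies *md.Progress before setting TraceID instead of mutating md.Progress in place. That matters for observers that compare original and updated metadata, such as the beforeUpdate hook used by the regression test below, which can only detect the trace-ID write if the two do not alias the same struct. A small illustration of the aliasing difference, using a stand-in Progress type rather than the real jobspb definition:

package main

import "fmt"

// Progress is a hypothetical stand-in for the jobspb.Progress message.
type Progress struct{ TraceID uint64 }

type JobMetadata struct{ Progress *Progress }

func main() {
	// In-place mutation: an observer holding the original metadata sees the
	// new TraceID too, because both values point at the same struct.
	md := JobMetadata{Progress: &Progress{}}
	orig := md
	md.Progress.TraceID = 42
	fmt.Println(orig.Progress.TraceID) // 42: the "original" changed as well

	// Copy-then-update, as in the fixed adopt.go: the original is untouched,
	// so the write is observable as a difference between orig and updated.
	md2 := JobMetadata{Progress: &Progress{}}
	orig2 := md2
	progress := *md2.Progress
	progress.TraceID = 42
	fmt.Println(orig2.Progress.TraceID, progress.TraceID) // 0 42
}
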
13 changes: 9 additions & 4 deletions pkg/jobs/helpers_test.go
@@ -19,10 +19,15 @@ import (
 
 // FakeResumer calls optional callbacks during the job lifecycle.
 type FakeResumer struct {
-	OnResume     func(context.Context) error
-	FailOrCancel func(context.Context) error
-	Success      func() error
-	PauseRequest onPauseRequestFunc
+	OnResume      func(context.Context) error
+	FailOrCancel  func(context.Context) error
+	Success       func() error
+	PauseRequest  onPauseRequestFunc
+	TraceRealSpan bool
 }
 
+func (d FakeResumer) ForceRealSpan() bool {
+	return d.TraceRealSpan
+}
+
 var _ Resumer = FakeResumer{}
2 changes: 1 addition & 1 deletion pkg/jobs/jobs.go
@@ -96,7 +96,7 @@ type StartableJob struct {
 type TraceableJob interface {
 	// ForceRealSpan forces the registry to create a real Span instead of a
 	// low-overhead non-recordable noop span.
-	ForceRealSpan()
+	ForceRealSpan() bool
 }
 
 func init() {
52 changes: 52 additions & 0 deletions pkg/jobs/jobs_test.go
@@ -179,6 +179,9 @@ type registryTestSuite struct {
 	// Instead of a ch for success, use a variable because it can retry since it
 	// is in a transaction.
 	successErr error
+
+	// controls whether job resumers will ask for a real tracing span.
+	traceRealSpan bool
 }
 
 func noopPauseRequestFunc(
@@ -187,6 +190,8 @@ func noopPauseRequestFunc(
 	return nil
 }
 
+var _ jobs.TraceableJob = (*jobs.FakeResumer)(nil)
+
 func (rts *registryTestSuite) setUp(t *testing.T) {
 	rts.ctx = context.Background()
 
@@ -217,6 +222,7 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
 
 	jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
 		return jobs.FakeResumer{
+			TraceRealSpan: rts.traceRealSpan,
 			OnResume: func(ctx context.Context) error {
 				t.Log("Starting resume")
 				rts.mu.Lock()
@@ -878,6 +884,52 @@ func TestRegistryLifecycle(t *testing.T) {
 		rts.mu.e.ResumeExit++
 		rts.check(t, jobs.StatusSucceeded)
 	})
+	t.Run("fail setting trace ID", func(t *testing.T) {
+		// The trace ID is set on the job above the state machine loop.
+		// This tests a regression where we fail to set the trace ID and then
+		// don't clear the in-memory state recording that we were running
+		// this job, which prevents the job from being re-run.
+
+		rts := registryTestSuite{}
+		rts.setUp(t)
+		defer rts.tearDown()
+		rts.traceRealSpan = true
+
+		// Inject an error into the update that records the trace ID.
+		var failed atomic.Value
+		failed.Store(false)
+		rts.beforeUpdate = func(orig, updated jobs.JobMetadata) error {
+			if !failed.Load().(bool) &&
+				orig.Progress.TraceID == 0 &&
+				updated.Progress != nil &&
+				updated.Progress.TraceID != 0 {
+				failed.Store(true)
+				return errors.New("boom")
+			}
+			return nil
+		}
+
+		j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob)
+		if err != nil {
+			t.Fatal(err)
+		}
+		rts.job = j
+
+		testutils.SucceedsSoon(t, func() error {
+			if !failed.Load().(bool) {
+				return errors.New("not yet failed")
+			}
+			return nil
+		})
+
+		// Make sure the job retries and then succeeds.
+		rts.resumeCheckCh <- struct{}{}
+		rts.resumeCh <- nil
+		rts.mu.e.ResumeStart = true
+		rts.mu.e.ResumeExit++
+		rts.mu.e.Success = true
+		rts.check(t, jobs.StatusSucceeded)
+	})
 }
 
 func TestJobLifecycle(t *testing.T) {
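The test uses a common one-shot fault-injection pattern: an atomic flag flips the first time the interesting update is observed, the hook returns an error exactly once, and a poll loop waits for the flag before driving the retry. The test stores a bool in an atomic.Value; a modern sketch of the same pattern (hypothetical names, no jobs machinery, using atomic.Bool from Go 1.19) might look like:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

func main() {
	var failed atomic.Bool // one-shot trigger

	// hook stands in for a beforeUpdate-style callback: fail only the
	// first qualifying call, then let every retry through.
	hook := func(interesting bool) error {
		if interesting && failed.CompareAndSwap(false, true) {
			return errors.New("boom")
		}
		return nil
	}

	fmt.Println(hook(true))    // boom: first interesting call fails
	fmt.Println(hook(true))    // <nil>: retries pass
	fmt.Println(failed.Load()) // true: a poller can now proceed
}
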
