Skip to content

Commit

Permalink
Merge #67671
Browse files Browse the repository at this point in the history
67671: jobs: fix bug where failure to record the trace ID prevents adoption, remove RTT r=ajwerner a=ajwerner

In #65322 we added code to record a trace ID into job progress. This code
added a new error return path to runJob which did not unregister the job
from the in-memory state in the registry. This PR fixes that oversight
by deferring that call and allowing jobs to be retried when writing that
trace ID fails.

**jobs: only write the trace ID into jobs if nonzero**
We don't trace most jobs. It's an extra write to the jobs table for each
run to just mark it as 0.

Release note (performance improvement): Eliminated a round-trip when running
most jobs.

Release note (bug fix): Fixed a bug in jobs where failures to write to the
jobs table can prevent subsequent adoption of a job until the previous
node dies or the job is paused.

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Jul 16, 2021
2 parents 1487f4f + d20c4eb commit 7fe91fc
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 17 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,9 @@ type backupResumer struct {
var _ jobs.TraceableJob = &backupResumer{}

// ForceRealSpan implements the TraceableJob interface.
func (b *backupResumer) ForceRealSpan() {}
func (b *backupResumer) ForceRealSpan() bool {
// Backup jobs always ask for a real span; the registry checks this return
// value before adding tracing.WithForceRealSpan() to the span options.
return true
}

// Resume is part of the jobs.Resumer interface.
func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,9 @@ type importResumer struct {
}
}

func (r *importResumer) ForceRealSpan() {}
// ForceRealSpan implements the jobs.TraceableJob interface. It returns true,
// so import jobs always request a real span rather than a noop span.
func (r *importResumer) ForceRealSpan() bool {
return true
}

var _ jobs.TraceableJob = &importResumer{}

Expand Down
4 changes: 3 additions & 1 deletion pkg/cli/debug_job_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ import (
var _ jobs.Resumer = &traceSpanResumer{}
var _ jobs.TraceableJob = &traceSpanResumer{}

func (r *traceSpanResumer) ForceRealSpan() {}
// ForceRealSpan implements the jobs.TraceableJob interface. It returns true,
// so this test resumer always requests a real span.
func (r *traceSpanResumer) ForceRealSpan() bool {
return true
}

type traceSpanResumer struct {
ctx context.Context
Expand Down
26 changes: 17 additions & 9 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ func (r *Registry) runJob(
typ := job.mu.payload.Type()
job.mu.Unlock()

// Make sure that we remove the job from the running set when this returns.
defer r.unregister(job.ID())

// Bookkeeping.
execCtx, cleanup := r.execCtx("resume-"+taskName, username)
defer cleanup()
Expand All @@ -263,18 +266,24 @@ func (r *Registry) runJob(
//
// A new root span will be created on every resumption of the job.
var spanOptions []tracing.SpanOption
if _, ok := resumer.(TraceableJob); ok {
if tj, ok := resumer.(TraceableJob); ok && tj.ForceRealSpan() {
spanOptions = append(spanOptions, tracing.WithForceRealSpan())
}
// TODO(ajwerner): Move the writing of the trace ID down into
// stepThroughStateMachine where we're already often (and soon with
// exponential backoff, always) updating the job in that call.
ctx, span = r.settings.Tracer.StartSpanCtx(ctx, spanName, spanOptions...)
defer span.Finish()
if err := job.Update(ctx, nil /* txn */, func(txn *kv.Txn, md JobMetadata,
ju *JobUpdater) error {
md.Progress.TraceID = span.TraceID()
ju.UpdateProgress(md.Progress)
return nil
}); err != nil {
return err
if span.TraceID() != 0 {
if err := job.Update(ctx, nil /* txn */, func(txn *kv.Txn, md JobMetadata,
ju *JobUpdater) error {
progress := *md.Progress
progress.TraceID = span.TraceID()
ju.UpdateProgress(&progress)
return nil
}); err != nil {
return err
}
}

// Run the actual job.
Expand All @@ -285,7 +294,6 @@ func (r *Registry) runJob(
if err != nil && ctx.Err() == nil {
log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err)
}
r.unregister(job.ID())
return err
}

Expand Down
13 changes: 9 additions & 4 deletions pkg/jobs/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ import (

// FakeResumer calls optional callbacks during the job lifecycle.
type FakeResumer struct {
OnResume func(context.Context) error
FailOrCancel func(context.Context) error
Success func() error
PauseRequest onPauseRequestFunc
OnResume func(context.Context) error
FailOrCancel func(context.Context) error
Success func() error
PauseRequest onPauseRequestFunc
TraceRealSpan bool
}

// ForceRealSpan implements the TraceableJob interface. It reports the value
// of the TraceRealSpan field, letting tests choose whether a real span is
// requested.
func (d FakeResumer) ForceRealSpan() bool {
return d.TraceRealSpan
}

var _ Resumer = FakeResumer{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type StartableJob struct {
type TraceableJob interface {
// ForceRealSpan forces the registry to create a real Span instead of a
// low-overhead non-recordable noop span.
ForceRealSpan()
ForceRealSpan() bool
}

func init() {
Expand Down
90 changes: 90 additions & 0 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ type registryTestSuite struct {
// Instead of a ch for success, use a variable because it can retry since it
// is in a transaction.
successErr error

// controls whether job resumers will ask for a real tracing span.
traceRealSpan bool
}

func noopPauseRequestFunc(
Expand All @@ -187,6 +190,8 @@ func noopPauseRequestFunc(
return nil
}

var _ jobs.TraceableJob = (*jobs.FakeResumer)(nil)

func (rts *registryTestSuite) setUp(t *testing.T) {
rts.ctx = context.Background()

Expand Down Expand Up @@ -217,6 +222,7 @@ func (rts *registryTestSuite) setUp(t *testing.T) {

jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
TraceRealSpan: rts.traceRealSpan,
OnResume: func(ctx context.Context) error {
t.Log("Starting resume")
rts.mu.Lock()
Expand Down Expand Up @@ -878,6 +884,90 @@ func TestRegistryLifecycle(t *testing.T) {
rts.mu.e.ResumeExit++
rts.check(t, jobs.StatusSucceeded)
})
t.Run("fail setting trace ID", func(t *testing.T) {
// The trace ID is set on the job above the state machine loop.
// This tests a regression where we fail to set trace ID and then
// don't clear the in-memory state that we were running this job.
// That prevents the job from being re-run.

rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
rts.traceRealSpan = true

// Inject an error in the update to record the trace ID.
var failed atomic.Value
failed.Store(false)
rts.beforeUpdate = func(orig, updated jobs.JobMetadata) error {
if !failed.Load().(bool) &&
orig.Progress.TraceID == 0 &&
updated.Progress != nil &&
updated.Progress.TraceID != 0 {
failed.Store(true)
return errors.New("boom")
}
return nil
}

j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

testutils.SucceedsSoon(t, func() error {
if !failed.Load().(bool) {
return errors.New("not yet failed")
}
return nil
})

// Make sure the job retries and then succeeds.
rts.resumeCheckCh <- struct{}{}
rts.resumeCh <- nil
rts.mu.e.ResumeStart = true
rts.mu.e.ResumeExit++
rts.mu.e.Success = true
rts.check(t, jobs.StatusSucceeded)
})
t.Run("trace ID only set if requested", func(t *testing.T) {
// The trace ID can be set on the job if the job should be traced.
// Not all jobs should be traced. If the job is not being traced,
// ensure that we do not do an extra write to set it.

rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()

// Count the job updates so the traced and untraced runs can be compared.
var updateCalls int
rts.beforeUpdate = func(orig, updated jobs.JobMetadata) error {
updateCalls++
return nil
}

runJob := func(t *testing.T) int {
t.Helper()
j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob)
require.NoError(t, err)
rts.job = j

// Make sure the job succeeds.
rts.resumeCheckCh <- struct{}{}
rts.resumeCh <- nil
rts.mu.e.ResumeStart = true
rts.mu.e.ResumeExit++
rts.mu.e.Success = true
rts.check(t, jobs.StatusSucceeded)
return updateCalls
}

updatedWithoutTracing := runJob(t)
updateCalls = 0
rts.traceRealSpan = true
updatedWithTracing := runJob(t)
require.Equal(t, updatedWithoutTracing, updatedWithTracing-1)
})
}

func TestJobLifecycle(t *testing.T) {
Expand Down

0 comments on commit 7fe91fc

Please sign in to comment.