Skip to content

Commit

Permalink
Merge #92121
Browse files Browse the repository at this point in the history
92121: jobs: clear claim for already-dead paused jobs r=ajwerner a=stevendanna

Previously we only cleared the claim after the state machine returned and only if the status wasn't pause-requested or
cancel-requested. This filter on status, however, was unnecessary.

The job may still be in the cancel-requested or pause-requested state when we go to clear the claim because the transaction that resulted in the canceled context may not have completed. But it is still fine to clear the claim. There are two possible cases:

1) The transaction that cancelled us fails and we are thus
   still in the state cancel-requested or pause-requested with no
   claim. This is fine. The claim-jobs loop will claim the job and we will then move
   the state to paused or reverting, just with no context to cancel.

2) The transaction succeeds and we are in paused or reverting without
   a claim set. Just as we wanted.

Here we remove the where clause to always clear the claim when we return from the state machine.

In the case of (1), when processing the cancel-requested or pause-requested state the second time, we may still want the claim cleared. Here, we make sure it gets cleared even in the case where there is no running job that actually needs to be canceled.

Fixes #92112

Epic: None

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Nov 22, 2022
2 parents 0b9cd0a + 79ad3fd commit a9080f2
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 93 deletions.
29 changes: 23 additions & 6 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,22 +434,24 @@ const clearClaimQuery = `
SET claim_session_id = NULL, claim_instance_id = NULL
WHERE id = $1
AND claim_session_id = $2
AND claim_instance_id = $3
AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')`
AND claim_instance_id = $3`

// maybeClearLease releases the claim on the given job, but only when the
// job finished with a non-nil error; a nil jobErr leaves the claim as is.
// The actual clearing is delegated to clearLeaseForJobID and is performed
// outside of any caller-supplied transaction.
func (r *Registry) maybeClearLease(job *Job, jobErr error) {
	if jobErr != nil {
		r.clearLeaseForJobID(job.ID(), nil /* txn */)
	}
}

func (r *Registry) clearLeaseForJobID(jobID jobspb.JobID, txn *kv.Txn) {
// We use the serverCtx here rather than the context from the
// caller since the caller's context may have been canceled.
r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) {
n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */
n, err := r.ex.ExecEx(ctx, "clear-job-claim", txn,
sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID())
clearClaimQuery, jobID, s.ID().UnsafeBytes(), r.ID())
if err != nil {
log.Warningf(ctx, "could not clear job claim: %s", err.Error())
return
Expand Down Expand Up @@ -497,11 +499,26 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
statusString := *row[1].(*tree.DString)
switch Status(statusString) {
case StatusPaused:
r.cancelRegisteredJobContext(id)
if !r.cancelRegisteredJobContext(id) {
// If we didn't already have a running job for this lease,
// clear out the lease here since it won't be cleared out
// on Resume exit.
r.clearLeaseForJobID(id, txn)
}
log.Infof(ctx, "job %d, session %s: paused", id, s.ID())
case StatusReverting:
if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
r.cancelRegisteredJobContext(id)
if !r.cancelRegisteredJobContext(id) {
// If we didn't already have a running job for this
// lease, clear out the lease here since it won't be
// cleared out on Resume exit.
//
// NB: This working as part of the update depends on
// the fact that the job struct does not have a
// claim set and thus won't validate the claim on
// update.
r.clearLeaseForJobID(id, txn)
}
md.Payload.Error = errJobCanceled.Error()
encodedErr := errors.EncodeError(ctx, errJobCanceled)
md.Payload.FinalResumeError = &encodedErr
Expand Down
142 changes: 57 additions & 85 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,42 @@ func TestRegistryLifecycle(t *testing.T) {

<-completeCh
})
t.Run("job with created by fields", func(t *testing.T) {
createdByType := "internal_test"
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()

resumerJob := make(chan *jobs.Job, 1)
jobs.RegisterConstructor(
jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumerJob <- j
return nil
},
}
}, jobs.UsesTenantCostControl)

jobID := rts.registry.MakeJobID()
record := jobs.Record{
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123},
}
job, err := rts.registry.CreateAdoptableJobWithTxn(rts.ctx, record, jobID, nil /* txn */)
require.NoError(t, err)

loadedJob, err := rts.registry.LoadJob(rts.ctx, jobID)
require.NoError(t, err)
require.NotNil(t, loadedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy())
rts.registry.TestingNudgeAdoptionQueue()
resumedJob := <-resumerJob
require.NotNil(t, resumedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy())

})
}

func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int) {
Expand Down Expand Up @@ -1220,21 +1256,27 @@ func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int
}
}

// TestJobLifecycle tests the invariants about the job lifecycle
// queries. It does not depend on the registry's job management tasks
// and assumes that it maintains the lease on the job through all
// state transitions.
func TestJobLifecycle(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.ResetConstructors()()

ctx := context.Background()

s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
params, _ := tests.CreateTestServerParams()
params.Knobs.JobsTestingKnobs = &jobs.TestingKnobs{DisableRegistryLifecycleManagent: true}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

registry := s.JobRegistry().(*jobs.Registry)

createJob := func(record jobs.Record) (*jobs.Job, expectation) {
beforeTime := timeutil.Now()
job, err := registry.CreateAdoptableJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */)
job, err := registry.CreateJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */)
require.NoError(t, err)
payload := job.Payload()
return job, expectation{
Expand All @@ -1256,38 +1298,6 @@ func TestJobLifecycle(t *testing.T) {
return createJob(defaultRecord)
}

done := make(chan struct{})
defer close(done)
resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
},
}
}, jobs.UsesTenantCostControl)

startLeasedJob := func(t *testing.T, record jobs.Record) (*jobs.StartableJob, expectation) {
beforeTime := timeutil.Now()
job, err := jobs.TestingCreateAndStartJob(ctx, registry, s.DB(), record)
if err != nil {
t.Fatal(err)
}
payload := job.Payload()
return job, expectation{
DB: sqlDB,
Record: record,
Type: payload.Type(),
Before: beforeTime,
}
}

t.Run("valid job lifecycles succeed", func(t *testing.T) {
// Woody is a successful job.
woodyPride, _ := username.MakeSQLUsernameFromUserInput("Woody Pride", username.PurposeValidation)
Expand Down Expand Up @@ -1495,7 +1505,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("cancelable jobs can be paused until finished", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()

if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil {
t.Fatal(err)
Expand All @@ -1517,10 +1527,6 @@ func TestJobLifecycle(t *testing.T) {
t.Fatal(err)
}

// Wait for job to be adopted so that we have the
// lease and can move to succeeded.
resumeSignaler.WaitForResumeStarted()

// PauseRequested fails after job is successful.
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
Expand All @@ -1532,7 +1538,7 @@ func TestJobLifecycle(t *testing.T) {

t.Run("cancelable jobs can be canceled until finished", func(t *testing.T) {
{
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil {
t.Fatal(err)
}
Expand All @@ -1542,7 +1548,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
if err := job.Started(ctx); err != nil {
t.Fatal(err)
}
Expand All @@ -1555,7 +1561,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil {
t.Fatal(err)
}
Expand All @@ -1571,7 +1577,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
}
Expand All @@ -1584,7 +1590,7 @@ func TestJobLifecycle(t *testing.T) {

t.Run("unpaused jobs cannot be resumed", func(t *testing.T) {
{
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil {
t.Fatal(err)
}
Expand All @@ -1594,7 +1600,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1715,7 +1721,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("progress on paused job fails", func(t *testing.T) {
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil {
t.Fatal(err)
}
Expand All @@ -1727,7 +1733,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("progress on canceled job fails", func(t *testing.T) {
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1759,7 +1765,7 @@ func TestJobLifecycle(t *testing.T) {
updateStatusStmt := `UPDATE system.jobs SET status = $1 WHERE id = $2`

t.Run("set details works", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
newDetails := jobspb.ImportDetails{URIs: []string{"new"}}
exp.Record.Details = newDetails
Expand All @@ -1775,7 +1781,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("set details fails", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
_, err := exp.DB.Exec(updateStatusStmt, jobs.StatusCancelRequested, job.ID())
require.NoError(t, err)
Expand All @@ -1784,7 +1790,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("set progress works", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
newProgress := jobspb.ImportProgress{ResumePos: []int64{42}}
exp.Record.Progress = newProgress
Expand All @@ -1799,47 +1805,13 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("set progress fails", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
_, err := exp.DB.Exec(updateStatusStmt, jobs.StatusPauseRequested, job.ID())
require.NoError(t, err)
require.Error(t, job.SetProgress(ctx, nil /* txn */, jobspb.ImportProgress{ResumePos: []int64{42}}))
require.NoError(t, exp.verify(job.ID(), jobs.StatusPauseRequested))
})

t.Run("job with created by fields", func(t *testing.T) {
createdByType := "internal_test"

resumerJob := make(chan *jobs.Job, 1)
jobs.RegisterConstructor(
jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumerJob <- j
return nil
},
}
}, jobs.UsesTenantCostControl)

jobID := registry.MakeJobID()
record := jobs.Record{
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123},
}
job, err := registry.CreateAdoptableJobWithTxn(ctx, record, jobID, nil /* txn */)
require.NoError(t, err)

loadedJob, err := registry.LoadJob(ctx, jobID)
require.NoError(t, err)
require.NotNil(t, loadedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy())
registry.TestingNudgeAdoptionQueue()
resumedJob := <-resumerJob
require.NotNil(t, resumedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy())

})
}

// TestShowJobs manually inserts a row into system.jobs and checks that the
Expand Down
10 changes: 8 additions & 2 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,10 @@ func (r *Registry) withSession(ctx context.Context, f withSessionFunc) {
// jobs if it observes a failure. Otherwise it starts all the main daemons of
// registry that poll the jobs table and start/cancel/gc jobs.
func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
if r.knobs.DisableRegistryLifecycleManagent {
return nil
}

// Since the job polling system is outside user control, exclude it from cost
// accounting and control. Individual jobs are not part of this exclusion.
ctx = multitenant.WithTenantCostControlExemption(ctx)
Expand Down Expand Up @@ -1504,12 +1508,14 @@ func (r *Registry) unregister(jobID jobspb.JobID) {
}
}

func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) {
func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) bool {
r.mu.Lock()
defer r.mu.Unlock()
if aj, ok := r.mu.adoptedJobs[jobID]; ok {
aj, ok := r.mu.adoptedJobs[jobID]
if ok {
aj.cancel()
}
return ok
}

func (r *Registry) getClaimedJob(jobID jobspb.JobID) (*Job, error) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobs/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type TestingKnobs struct {
// DisableAdoptions disables job adoptions.
DisableAdoptions bool

// DisableRegistryLifecycleManagent, if set, makes Registry.Start
// return immediately without starting the registry's daemons.
DisableRegistryLifecycleManagent bool

// BeforeWaitForJobsQuery is called once per invocation of the
// poll-show-jobs query in WaitForJobs.
BeforeWaitForJobsQuery func()
Expand Down

0 comments on commit a9080f2

Please sign in to comment.