diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index da5da78e548a..fb19bffc0460 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -415,6 +415,7 @@ func (r *Registry) runJob( log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err) } + r.maybeClearLease(job, err) r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) r.maybeRecordExecutionFailure(ctx, err, job) if r.knobs.AfterJobStateMachine != nil { @@ -423,18 +424,53 @@ func (r *Registry) runJob( return err } +const clearClaimQuery = ` + UPDATE system.jobs + SET claim_session_id = NULL, claim_instance_id = NULL + WHERE id = $1 + AND claim_session_id = $2 + AND claim_instance_id = $3 + AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')` + +// maybeClearLease clears the claim on the given job, provided that +// the current lease matches our liveness Session. +func (r *Registry) maybeClearLease(job *Job, jobErr error) { + if jobErr == nil { + return + } + + // We use the serverCtx here rather than the context from the + // caller since the caller's context may have been canceled. + r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) { + err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil { + return errors.WithAssertionFailure(err) + } + n, err := r.ex.ExecEx(ctx, "clear-job-claim", txn, + sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, + clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID()) + if err != nil { + return err + } + log.VEventf(ctx, 2, "cleared leases for %d jobs", n) + return nil + }) + if err != nil { + log.Warningf(ctx, "could not clear job claim: %s", err.Error()) + } + }) +} + const pauseAndCancelUpdate = ` UPDATE system.jobs - SET status = + SET status = CASE WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `' WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `' ELSE status END, num_runs = 0, - last_run = NULL, - claim_session_id = NULL, - claim_instance_id = NULL + last_run = NULL WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')) AND ((claim_session_id = $1) AND (claim_instance_id = $2)) RETURNING id, status diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 243cab6fd063..97853fd0abe2 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -3160,9 +3160,16 @@ func TestPauseReason(t *testing.T) { return n } mustNotHaveClaim := func() { - require.Equal(t, 0, countRowsWithClaimInfo()) + t.Helper() + testutils.SucceedsSoon(t, func() error { + if countRowsWithClaimInfo() == 0 { + return nil + } + return errors.New("still waiting for claim to clear") + }) } mustHaveClaim := func() { + t.Helper() testutils.SucceedsSoon(t, func() error { if countRowsWithClaimInfo() == 1 { return nil