Skip to content

Commit

Permalink
jobs: clear job claim after execution
Browse files Browse the repository at this point in the history
Since #89014 the job system reset a job's claim when transitioning it
from pause-requested to paused and from cancel-requested to
reverting. The job system signals these transitions to the running
Resumer by cancelling the job's context and does not wait for the
resumer to exit. Once the claim is clear, another node can adopt the
job and start running it's OnFailOrCancel callback. As a result,
clearing the context makes it more likely that OnFailOrCancel
executions will overlap with Resume executions.

In general, Jobs need to assume that Resume may still be running while
OnFailOrCancel is called. But, making it more likely isn't in our
interest.

Here, we only clear the lease when we exit the job state machine.
This makes it much more likely that OnFailOrCancel doesn't start until
Resume has returned.

Release note: None
  • Loading branch information
stevendanna committed Nov 9, 2022
1 parent 0713a2c commit f0b9e4d
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 10 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ go_test(
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -426,7 +427,7 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
feed := testFeed.(cdctest.EnterpriseTestFeed)

require.NoError(t, feed.Pause())

jobutils.WaitForJobToHaveNoLease(t, sqlDB, feed.JobID())
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json`, feed.JobID()))

counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
Expand Down
41 changes: 36 additions & 5 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,26 +415,57 @@ func (r *Registry) runJob(
log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err)
}

r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
r.maybeRecordExecutionFailure(ctx, err, job)
// NB: After this point, the job may no longer have the claim
// and further updates to the job record from this node may
// fail.
r.maybeClearLease(job, err)
r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
if r.knobs.AfterJobStateMachine != nil {
r.knobs.AfterJobStateMachine()
}
return err
}

const clearClaimQuery = `
UPDATE system.jobs
SET claim_session_id = NULL, claim_instance_id = NULL
WHERE id = $1
AND claim_session_id = $2
AND claim_instance_id = $3
AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')`

// maybeClearLease clears the claim on the given job, provided that
// the current lease matches our liveness Session.
func (r *Registry) maybeClearLease(job *Job, jobErr error) {
if jobErr == nil {
return
}

// We use the serverCtx here rather than the context from the
// caller since the caller's context may have been canceled.
r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) {
n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */
sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID())
if err != nil {
log.Warningf(ctx, "could not clear job claim: %s", err.Error())
return
}
log.VEventf(ctx, 2, "cleared leases for %d jobs", n)
})
}

const pauseAndCancelUpdate = `
UPDATE system.jobs
SET status =
SET status =
CASE
WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `'
WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `'
ELSE status
END,
num_runs = 0,
last_run = NULL,
claim_session_id = NULL,
claim_instance_id = NULL
last_run = NULL
WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `'))
AND ((claim_session_id = $1) AND (claim_instance_id = $2))
RETURNING id, status
Expand Down
51 changes: 48 additions & 3 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1204,10 +1205,11 @@ func TestJobLifecycle(t *testing.T) {

done := make(chan struct{})
defer close(done)

resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -1462,6 +1464,10 @@ func TestJobLifecycle(t *testing.T) {
t.Fatal(err)
}

// Wait for job to be adopted so that we have the
// lease and can move to succeeded.
resumeSignaler.WaitForResumeStarted()

// PauseRequested fails after job is successful.
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3107,6 +3113,35 @@ func checkBundle(t *testing.T, zipFile string, expectedFiles []string) {
require.Equal(t, expectedFiles, filesInZip)
}

type resumeStartedSignaler struct {
syncutil.Mutex
cond *sync.Cond
isStarted bool
}

func newResumeStartedSignaler() *resumeStartedSignaler {
ret := &resumeStartedSignaler{}
ret.cond = sync.NewCond(&ret.Mutex)
return ret

}

func (r *resumeStartedSignaler) SignalResumeStarted() {
r.Lock()
r.isStarted = true
r.cond.Signal()
r.Unlock()
}

func (r *resumeStartedSignaler) WaitForResumeStarted() {
r.Lock()
for !r.isStarted {
r.cond.Wait()
}
r.isStarted = false
r.Unlock()
}

// TestPauseReason tests pausing a job with a user specified reason.
func TestPauseReason(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand All @@ -3123,10 +3158,11 @@ func TestPauseReason(t *testing.T) {

done := make(chan struct{})
defer close(done)

resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -3158,9 +3194,16 @@ func TestPauseReason(t *testing.T) {
return n
}
mustNotHaveClaim := func() {
require.Equal(t, 0, countRowsWithClaimInfo())
t.Helper()
testutils.SucceedsSoon(t, func() error {
if countRowsWithClaimInfo() == 0 {
return nil
}
return errors.New("still waiting for claim to clear")
})
}
mustHaveClaim := func() {
t.Helper()
testutils.SucceedsSoon(t, func() error {
if countRowsWithClaimInfo() == 1 {
return nil
Expand All @@ -3173,6 +3216,7 @@ func TestPauseReason(t *testing.T) {
q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID)
tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}})
mustHaveClaim()
resumeSignaler.WaitForResumeStarted()

getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) {
var payloadBytes []byte
Expand Down Expand Up @@ -3206,6 +3250,7 @@ func TestPauseReason(t *testing.T) {

checkStatusAndPauseReason(t, jobID, "running", "for testing")
mustHaveClaim()
resumeSignaler.WaitForResumeStarted()
}
{
// Pause the job again with a different reason. Verify that the job is paused with the reason.
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,7 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j
return
}
if updateErr != nil {
log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, err)
log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, updateErr)
}
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/testutils/jobutils/jobs_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ func waitForJobToHaveStatus(
}, 2*time.Minute)
}

func WaitForJobToHaveNoLease(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) {
t.Helper()
testutils.SucceedsWithin(t, func() error {
var sessionID []byte
var instanceID gosql.NullInt64
db.QueryRow(t, `SELECT claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1`, jobID).Scan(&sessionID, &instanceID)
if sessionID == nil && !instanceID.Valid {
return nil
}
return errors.Newf("job %d still has claim information")
}, 2*time.Minute)
}

// RunJob runs the provided job control statement, initializing, notifying and
// closing the chan at the passed pointer (see below for why) and returning the
// jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard
Expand Down

0 comments on commit f0b9e4d

Please sign in to comment.