Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce idempotency of dispatched jobs using token on dispatch request #10806

Merged
merged 13 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,22 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string,
return &resp, wm, nil
}

func (j *Jobs) DispatchIdempotent(jobID string, meta map[string]string,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a new method here is good for backwards compatibility, but I'm wondering if in the long term it would be better to add a token to the WriteOptions struct instead. That would make it possible for us to add idempotency to any kind of write API in the future without adding a bunch of new methods.

Would love to hear thoughts from @DerekStrickland @schmichael @notnoop on that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of putting the token on the WriteOptions. If feels like it enables an almost aspect oriented approach for handling this generically at more of a framework level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to WriteOptions!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @alexmunda !

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm conflicted about having the field in WriteOptions and slightly lean towards how the PR had it before under JobDispatchRequest. Idempotency is such a common concern, so belonging to WriteOptions make sense. Though, it currently only applies to dispatch jobs; other APIs address a similar but not an exact problem using CAS and ModifyIndex (e.g. Update Scheduler Configuration endpoint).

I'm fine with having it in WriteOptions, as it's probably the pattern we want to use moving forward - but would recommend calling out that only the dispatch endpoint supports it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@notnoop I am open to either approach 😄. With the current WriteOptions approach, where would the best place be to call that out? Comment on the struct field?

payload []byte, idempotencyToken string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
var resp JobDispatchResponse
req := &JobDispatchRequest{
JobID: jobID,
Meta: meta,
Payload: payload,
IdempotencyToken: idempotencyToken,
}
wm, err := j.client.write("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}

// Revert is used to revert the given job to the passed version. If
// enforceVersion is set, the job is only reverted if the current version is at
// the passed version.
Expand Down Expand Up @@ -1262,9 +1278,10 @@ type DesiredUpdates struct {
}

type JobDispatchRequest struct {
JobID string
Payload []byte
Meta map[string]string
JobID string
Payload []byte
Meta map[string]string
IdempotencyToken string
}

type JobDispatchResponse struct {
Expand Down
34 changes: 34 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1899,6 +1899,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Dispatched = true
dispatchJob.Status = ""
dispatchJob.StatusDescription = ""
dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken

// Merge in the meta data
for k, v := range args.Meta {
Expand All @@ -1908,6 +1909,39 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Meta[k] = v
}

// Check to see if an idempotency token was specified on the request
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments on this section are kind of the i++ // increment i by 1 sort. I'd consider adding a comment to the top of this block that says describes the goal instead:

// Ensure that we have only one dispatched version of this job running concurrently
// by comparing the idempotency token against any non-terminal versions.

if args.IdempotencyToken != "" {
// Fetch all jobs that match the parameterized job ID prefix
iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID)
if err != nil {
return fmt.Errorf("failed to retrieve jobs for idempotency check")
alexmunda marked this conversation as resolved.
Show resolved Hide resolved
}

// Iterate
for {
raw := iter.Next()
if raw == nil {
break
}

// Ensure the parent ID is an exact match
existingJob := raw.(*structs.Job)
if existingJob.ParentID != dispatchJob.ParentID {
continue
}

// Idempotency tokens match. Ensure existing job is terminal.
if existingJob.DispatchIdempotencyToken == args.IdempotencyToken {
// The existing job is either pending or running.
// Registering a new job would violate the idempotency token.
if existingJob.Status != structs.JobStatusDead {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the benefit of special casing dead jobs? I fear that this becomes an edge case for super short dispatch jobs - where if the job finishes before the caller retries request (due to long exponential backoff sleep), the job gets re-run. Naively, I'd think we should enforce idempotency as long as possible, basically until the child job is garbage collected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point, I hadn't considered the short dispatch case. For our use case on HCP, the idempotency token will map to an action for a specific resource (ie consul-cluster/{uuid}/instance-create). In the case of a transient error in the dispatched job, we'd like to retry immediately, using the same idempotency token. If idempotency is enforced for dead jobs waiting to be GC'd we wouldn't be able to retry as quickly as we'd prefer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. So there are two types of transient failures:

First, the dispatch request fails with network transient error (e.g. network or timeout) where the dispatch request may have failed but may also have succeed but the caller wasn't informed of the success. The API consumer will want to retry with idempotent token to avoid repeating the side-effects. I assume here we want to enforce the token for the longest reasonable time, as it never hurts.

The second failure is if the dispatch request succeeded but the dispatched job failed with a transient failure (e.g. instance creation failed for transient error and should be retried). Here, api consumer want to retry as soon as the job failed.

Is the second failure case that's driving this design? I would have hoped that the restart block addressed this case without the API consumers retrying dispatching jobs manually.

Also, you can achieve the second failure handling semantics even if idempotent token is enforced for dead jobs. e.g. you can have consul-cluster/{resource uuid}/instance-create/{request uuid}, where {request uuid} is updated when the dispatch job failed and you want to retry creating the same resource.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, totally forgot about the restart block and your {request uuid} suggestion makes perfect sense so I will remove the check for job status and update the comment to say something about the job not being GC'd yet.

return fmt.Errorf("dispatch violates idempotency token of non-terminal child job: %s", existingJob.ID)
alexmunda marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

}

// Compress the payload
dispatchJob.Payload = snappy.Encode(nil, args.Payload)

Expand Down
67 changes: 61 additions & 6 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6133,14 +6133,22 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
reqInputDataTooLarge := &structs.JobDispatchRequest{
Payload: make([]byte, DispatchPayloadSizeLimit+100),
}
reqIdempotentToken := &structs.JobDispatchRequest{
IdempotencyToken: "foo",
}

type existingIdempotentChildJob struct {
isTerminal bool
}

type testCase struct {
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
existingIdempotentJob *existingIdempotentChildJob
}
cases := []testCase{
{
Expand Down Expand Up @@ -6233,6 +6241,32 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
err: true,
errStr: "stopped",
},
{
name: "idempotency token, no existing child job",
parameterizedJob: d1,
dispatchReq: reqIdempotentToken,
err: false,
existingIdempotentJob: nil,
},
{
name: "idempotency token, w/ existing non-terminal child job",
parameterizedJob: d1,
dispatchReq: reqIdempotentToken,
err: true,
errStr: "dispatch violates idempotency token of non-terminal child job",
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: false,
},
},
{
name: "idempotency token, w/ existing terminal job",
parameterizedJob: d1,
dispatchReq: reqIdempotentToken,
err: false,
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: true,
},
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -6266,6 +6300,27 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
Namespace: tc.parameterizedJob.Namespace,
}

// Dispatch with the same request so a child job w/ the idempotency key exists
if tc.existingIdempotentJob != nil {
var initialDispatchResp structs.JobDispatchResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialDispatchResp); err != nil {
t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err)
}

if tc.existingIdempotentJob.isTerminal {
eval, err := s1.State().EvalByID(nil, initialDispatchResp.EvalID)
if err != nil {
t.Fatalf("Unexpected error fetching eval %v", err)
}
eval = eval.Copy()
eval.Status = structs.EvalStatusComplete
err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialDispatchResp.Index+1, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("Unexpected error completing eval %v", err)
}
}
}

var dispatchResp structs.JobDispatchResponse
dispatchErr := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &dispatchResp)

Expand Down
11 changes: 8 additions & 3 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,9 +728,10 @@ type JobScaleStatusRequest struct {

// JobDispatchRequest is used to dispatch a job based on a parameterized job
type JobDispatchRequest struct {
JobID string
Payload []byte
Meta map[string]string
JobID string
Payload []byte
Meta map[string]string
IdempotencyToken string
WriteRequest
}

Expand Down Expand Up @@ -4016,6 +4017,10 @@ type Job struct {
// parameterized job.
Dispatched bool

// DispatchIdempotencyToken is optionally used to ensure that a dispatched job does not have any
// non-terminal siblings which have the same token value.
DispatchIdempotencyToken string

// Payload is the payload supplied when the job was dispatched.
Payload []byte

Expand Down
23 changes: 20 additions & 3 deletions vendor/github.com/hashicorp/nomad/api/jobs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.