Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce idempotency of dispatched jobs using token on dispatch request #10806

Merged
merged 13 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type WriteOptions struct {
// ctx is an optional context pass through to the underlying HTTP
// request layer. Use Context() and WithContext() to manage this.
ctx context.Context

// IdempotencyToken can be used to ensure the write is idempotent.
IdempotencyToken string
}

// QueryMeta is used to return meta data about a query
Expand Down
35 changes: 35 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1899,6 +1899,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Dispatched = true
dispatchJob.Status = ""
dispatchJob.StatusDescription = ""
dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken

// Merge in the meta data
for k, v := range args.Meta {
Expand All @@ -1908,6 +1909,40 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Meta[k] = v
}

// Ensure that we have only one dispatched version of this job running concurrently
// by comparing the idempotency token against any non-terminal versions.
alexmunda marked this conversation as resolved.
Show resolved Hide resolved
if args.IdempotencyToken != "" {
alexmunda marked this conversation as resolved.
Show resolved Hide resolved
// Fetch all jobs that match the parameterized job ID prefix
iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID)
if err != nil {
return fmt.Errorf("failed to retrieve jobs for idempotency check")
alexmunda marked this conversation as resolved.
Show resolved Hide resolved
}

// Iterate
for {
raw := iter.Next()
if raw == nil {
break
}

// Ensure the parent ID is an exact match
existingJob := raw.(*structs.Job)
if existingJob.ParentID != dispatchJob.ParentID {
continue
}

// Idempotency tokens match. Ensure existing job is terminal.
if existingJob.DispatchIdempotencyToken == args.IdempotencyToken {
// The existing job is either pending or running.
// Registering a new job would violate the idempotency token.
if existingJob.Status != structs.JobStatusDead {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the benefit of special casing dead jobs? I fear that this becomes an edge case for super short dispatch jobs - where if the job finishes before the caller retries request (due to long exponential backoff sleep), the job gets re-run. Naively, I'd think we should enforce idempotency as long as possible, basically until the child job is garbage collected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point, I hadn't considered the short dispatch case. For our use case on HCP, the idempotency token will map to an action for a specific resource (ie consul-cluster/{uuid}/instance-create). In the case of a transient error in the dispatched job, we'd like to retry immediately, using the same idempotency token. If idempotency is enforced for dead jobs waiting to be GC'd we wouldn't be able to retry as quickly as we'd prefer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. So there are two types of transient failures:

First, the dispatch request fails with network transient error (e.g. network or timeout) where the dispatch request may have failed but may also have succeed but the caller wasn't informed of the success. The API consumer will want to retry with idempotent token to avoid repeating the side-effects. I assume here we want to enforce the token for the longest reasonable time, as it never hurts.

The second failure is if the dispatch request succeeded but the dispatched job failed with a transient failure (e.g. instance creation failed for transient error and should be retried). Here, api consumer want to retry as soon as the job failed.

Is the second failure case that's driving this design? I would have hoped that the restart block addressed this case without the API consumers retrying dispatching jobs manually.

Also, you can achieve the second failure handling semantics even if idempotent token is enforced for dead jobs. e.g. you can have consul-cluster/{resource uuid}/instance-create/{request uuid}, where {request uuid} is updated when the dispatch job failed and you want to retry creating the same resource.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, totally forgot about the restart block and your {request uuid} suggestion makes perfect sense so I will remove the check for job status and update the comment to say something about the job not being GC'd yet.

return fmt.Errorf("idempotent dispatch failed: another child job with this token is running or pending: %s", existingJob.ID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return an error here? How should clients cope with this error? It might be best to return info of the existing job info.

The rationale is that clients should ensure that tokens are unique (e.g. uuids, etc) and this condition should only occur if the caller is retrying a request that failed due to transient error (e.g. network error, request timeout). As such, it will be nicer to return the original dispatch request results. If we return an error here (which Nomad codes as HTTP 500 code sadly :( ), api consumers will need to special case idempotency related errors (vs keep retrying) and need to parse the error to find the job id and potentially make a follow up query to get result data. Returning the existing job ID in such a case seems to be more ergonomics.

Returning success code seems to match EC2 RunInstance idempotent request behavior. I'd be curious to know how other common APIs handle retried idempotent requests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am open to returning the existing job info as I do think that is what most APIs do for idempotent operations.
I do have 1 question about the reply in this case: do we need to fetch the current eval for the existing job if it's not periodic? It appears that non-periodic dispatches return the eval id etc

nomad/nomad/job_endpoint.go

Lines 1999 to 2001 in 4448cff

reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I'd skip eval fields in this PR for now.

The eval creation code needs to change so the job dispatch and eval creation is atomic - I missed this case in #8435 . I can follow up there and re-examine this path again. Thanks!

}
}
}

}

// Compress the payload
dispatchJob.Payload = snappy.Encode(nil, args.Payload)

Expand Down
73 changes: 65 additions & 8 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6134,13 +6134,19 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
Payload: make([]byte, DispatchPayloadSizeLimit+100),
}

type existingIdempotentChildJob struct {
isTerminal bool
}

type testCase struct {
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
idempotencyToken string
existingIdempotentJob *existingIdempotentChildJob
}
cases := []testCase{
{
Expand Down Expand Up @@ -6233,6 +6239,35 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
err: true,
errStr: "stopped",
},
{
name: "idempotency token, no existing child job",
parameterizedJob: d1,
dispatchReq: reqInputDataNoMeta,
err: false,
idempotencyToken: "foo",
existingIdempotentJob: nil,
},
{
name: "idempotency token, w/ existing non-terminal child job",
parameterizedJob: d1,
dispatchReq: reqInputDataNoMeta,
err: true,
errStr: "idempotent dispatch failed: another child job with this token is running or pending",
idempotencyToken: "foo",
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: false,
},
},
{
name: "idempotency token, w/ existing terminal job",
parameterizedJob: d1,
dispatchReq: reqInputDataNoMeta,
err: false,
idempotencyToken: "foo",
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: true,
},
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -6262,8 +6297,30 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
// Now try to dispatch
tc.dispatchReq.JobID = tc.parameterizedJob.ID
tc.dispatchReq.WriteRequest = structs.WriteRequest{
Region: "global",
Namespace: tc.parameterizedJob.Namespace,
Region: "global",
Namespace: tc.parameterizedJob.Namespace,
IdempotencyToken: tc.idempotencyToken,
}

// Dispatch with the same request so a child job w/ the idempotency key exists
if tc.existingIdempotentJob != nil {
var initialDispatchResp structs.JobDispatchResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialDispatchResp); err != nil {
t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err)
}

if tc.existingIdempotentJob.isTerminal {
eval, err := s1.State().EvalByID(nil, initialDispatchResp.EvalID)
if err != nil {
t.Fatalf("Unexpected error fetching eval %v", err)
}
eval = eval.Copy()
eval.Status = structs.EvalStatusComplete
err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialDispatchResp.Index+1, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("Unexpected error completing eval %v", err)
}
}
}

var dispatchResp structs.JobDispatchResponse
Expand Down
7 changes: 7 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ type WriteRequest struct {
// AuthToken is secret portion of the ACL token used for the request
AuthToken string

// IdempotencyToken can be used to ensure the write is idempotent.
IdempotencyToken string

InternalRpcInfo
}

Expand Down Expand Up @@ -4016,6 +4019,10 @@ type Job struct {
// parameterized job.
Dispatched bool

// DispatchIdempotencyToken is optionally used to ensure that a dispatched job does not have any
// non-terminal siblings which have the same token value.
DispatchIdempotencyToken string

// Payload is the payload supplied when the job was dispatched.
Payload []byte

Expand Down
3 changes: 3 additions & 0 deletions vendor/github.com/hashicorp/nomad/api/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.