Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce idempotency of dispatched jobs using token on dispatch request #10806

Merged
merged 13 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type WriteOptions struct {
// ctx is an optional context pass through to the underlying HTTP
// request layer. Use Context() and WithContext() to manage this.
ctx context.Context

// IdempotencyToken can be used to ensure the write is idempotent.
IdempotencyToken string
}

// QueryMeta is used to return meta data about a query
Expand Down
39 changes: 39 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1890,6 +1890,44 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
return err
}

// Ensure that we have only one dispatched version of this job running concurrently
// by comparing the idempotency token against any non-terminal versions.
alexmunda marked this conversation as resolved.
Show resolved Hide resolved
if args.IdempotencyToken != "" {
alexmunda marked this conversation as resolved.
Show resolved Hide resolved
// Fetch all jobs that match the parameterized job ID prefix
iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID)
if err != nil {
errMsg := "failed to retrieve jobs for idempotency check"
j.logger.Error(errMsg, "error", err)
return fmt.Errorf(errMsg)
}

// Iterate
for {
raw := iter.Next()
if raw == nil {
break
}

// Ensure the parent ID is an exact match
existingJob := raw.(*structs.Job)
if existingJob.ParentID != parameterizedJob.ID {
continue
}

// Idempotency tokens match
if existingJob.DispatchIdempotencyToken == args.IdempotencyToken {
// The existing job has not yet been garbage collected.
// Registering a new job would violate the idempotency token.
// Return the existing job.
reply.JobCreateIndex = existingJob.CreateIndex
reply.DispatchedJobID = existingJob.ID
reply.Index = existingJob.ModifyIndex
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@notnoop Should reply.Index be ModifyIndex or CreateIndex? I went with ModifyIndex so the caller can diff the create and modify indexes and tell that an existing job was returned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ModifyIndex is good as well. Thanks!


return nil
}
}
}

// Derive the child job and commit it via Raft - with initial status
dispatchJob := parameterizedJob.Copy()
dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now())
Expand All @@ -1899,6 +1937,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Dispatched = true
dispatchJob.Status = ""
dispatchJob.StatusDescription = ""
dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken

// Merge in the meta data
for k, v := range args.Meta {
Expand Down
85 changes: 77 additions & 8 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6134,13 +6134,19 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
Payload: make([]byte, DispatchPayloadSizeLimit+100),
}

type existingIdempotentChildJob struct {
isTerminal bool
}

type testCase struct {
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
idempotencyToken string
existingIdempotentJob *existingIdempotentChildJob
}
cases := []testCase{
{
Expand Down Expand Up @@ -6233,6 +6239,36 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
err: true,
errStr: "stopped",
},
{
name: "idempotency token, no existing child job",
parameterizedJob: d1,
dispatchReq: reqInputDataNoMeta,
err: false,
idempotencyToken: "foo",
existingIdempotentJob: nil,
},
{
name: "idempotency token, w/ existing non-terminal child job",
parameterizedJob: d1,
dispatchReq: reqInputDataNoMeta,
err: false,
idempotencyToken: "foo",
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: false,
},
noEval: true,
},
{
name: "idempotency token, w/ existing terminal job",
parameterizedJob: d1,
dispatchReq: reqInputDataNoMeta,
err: false,
idempotencyToken: "foo",
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: true,
},
noEval: true,
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -6262,8 +6298,30 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
// Now try to dispatch
tc.dispatchReq.JobID = tc.parameterizedJob.ID
tc.dispatchReq.WriteRequest = structs.WriteRequest{
Region: "global",
Namespace: tc.parameterizedJob.Namespace,
Region: "global",
Namespace: tc.parameterizedJob.Namespace,
IdempotencyToken: tc.idempotencyToken,
}

// Dispatch with the same request so a child job w/ the idempotency key exists
var initialIdempotentDispatchResp structs.JobDispatchResponse
if tc.existingIdempotentJob != nil {
if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialIdempotentDispatchResp); err != nil {
t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err)
}

if tc.existingIdempotentJob.isTerminal {
eval, err := s1.State().EvalByID(nil, initialIdempotentDispatchResp.EvalID)
if err != nil {
t.Fatalf("Unexpected error fetching eval %v", err)
}
eval = eval.Copy()
eval.Status = structs.EvalStatusComplete
err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialIdempotentDispatchResp.Index+1, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("Unexpected error completing eval %v", err)
}
}
}

var dispatchResp structs.JobDispatchResponse
Expand Down Expand Up @@ -6315,6 +6373,17 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
t.Fatal("parameter job config should exist")
}

// Check that the existing job is returned in the case of a supplied idempotency token
if tc.idempotencyToken != "" && tc.existingIdempotentJob != nil {
if dispatchResp.DispatchedJobID != initialIdempotentDispatchResp.DispatchedJobID {
t.Fatal("dispatched job id should match initial dispatch")
}

if dispatchResp.JobCreateIndex != initialIdempotentDispatchResp.JobCreateIndex {
t.Fatal("dispatched job create index should match initial dispatch")
}
}

if tc.noEval {
return
}
Expand Down
7 changes: 7 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ type WriteRequest struct {
// AuthToken is secret portion of the ACL token used for the request
AuthToken string

// IdempotencyToken can be used to ensure the write is idempotent.
IdempotencyToken string

InternalRpcInfo
}

Expand Down Expand Up @@ -4016,6 +4019,10 @@ type Job struct {
// parameterized job.
Dispatched bool

// DispatchIdempotencyToken is optionally used to ensure that a dispatched job does not have any
// non-terminal siblings which have the same token value.
DispatchIdempotencyToken string

// Payload is the payload supplied when the job was dispatched.
Payload []byte

Expand Down
3 changes: 3 additions & 0 deletions vendor/github.com/hashicorp/nomad/api/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.