Skip to content

Commit

Permalink
Merge pull request #10806 from hashicorp/munda/idempotent-job-dispatch
Browse files Browse the repository at this point in the history
Enforce idempotency of dispatched jobs using token on dispatch request
  • Loading branch information
Mahmood Ali authored Jul 8, 2021
2 parents b397edf + d77329a commit 0f0bcdb
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.1.3 (Unreleased)

IMPROVEMENTS:
* dispatch jobs: Added optional idempotency token to `WriteOptions` which prevents Nomad from creating new dispatched jobs for retried requests. [[GH-10806](https://github.com/hashicorp/nomad/pull/10806)]

## 1.1.2 (June 22, 2021)

IMPROVEMENTS:
Expand Down
6 changes: 6 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type WriteOptions struct {
// ctx is an optional context pass through to the underlying HTTP
// request layer. Use Context() and WithContext() to manage this.
ctx context.Context

// IdempotencyToken can be used to ensure the write is idempotent.
IdempotencyToken string
}

// QueryMeta is used to return meta data about a query
Expand Down Expand Up @@ -593,6 +596,9 @@ func (r *request) setWriteOptions(q *WriteOptions) {
if q.AuthToken != "" {
r.token = q.AuthToken
}
if q.IdempotencyToken != "" {
r.params.Set("idempotency_token", q.IdempotencyToken)
}
r.ctx = q.Context()
}

Expand Down
10 changes: 7 additions & 3 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,10 @@ func TestSetWriteOptions(t *testing.T) {

r, _ := c.newRequest("GET", "/v1/jobs")
q := &WriteOptions{
Region: "foo",
Namespace: "bar",
AuthToken: "foobar",
Region: "foo",
Namespace: "bar",
AuthToken: "foobar",
IdempotencyToken: "idempotent",
}
r.setWriteOptions(q)

Expand All @@ -267,6 +268,9 @@ func TestSetWriteOptions(t *testing.T) {
if r.params.Get("namespace") != "bar" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("idempotency_token") != "idempotent" {
t.Fatalf("bad: %v", r.params)
}
if r.token != "foobar" {
t.Fatalf("bad: %v", r.token)
}
Expand Down
8 changes: 8 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,13 @@ func parseNamespace(req *http.Request, n *string) {
}
}

// parseIdempotencyToken is used to parse the ?idempotency_token parameter
func parseIdempotencyToken(req *http.Request, n *string) {
if idempotencyToken := req.URL.Query().Get("idempotency_token"); idempotencyToken != "" {
*n = idempotencyToken
}
}

// parseBool parses a query parameter to a boolean or returns (nil, nil) if the
// parameter is not present.
func parseBool(req *http.Request, field string) (*bool, error) {
Expand Down Expand Up @@ -721,6 +728,7 @@ func (s *HTTPServer) parseWriteRequest(req *http.Request, w *structs.WriteReques
parseNamespace(req, &w.Namespace)
s.parseToken(req, &w.AuthToken)
s.parseRegion(req, &w.Region)
parseIdempotencyToken(req, &w.IdempotencyToken)
}

// wrapUntrustedContent wraps handlers in a http.ResponseWriter that prevents
Expand Down
12 changes: 7 additions & 5 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"time"

"github.com/golang/snappy"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

api "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHTTP_JobsList(t *testing.T) {
Expand Down Expand Up @@ -1454,8 +1455,9 @@ func TestHTTP_JobDispatch(t *testing.T) {
respW := httptest.NewRecorder()
args2 := structs.JobDispatchRequest{
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
Region: "global",
Namespace: structs.DefaultNamespace,
IdempotencyToken: "foo",
},
}
buf := encodeReq(args2)
Expand Down
38 changes: 38 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1890,6 +1890,43 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
return err
}

// Avoid creating new dispatched jobs for retry requests, by using the idempotency token
if args.IdempotencyToken != "" {
// Fetch all jobs that match the parameterized job ID prefix
iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID)
if err != nil {
errMsg := "failed to retrieve jobs for idempotency check"
j.logger.Error(errMsg, "error", err)
return fmt.Errorf(errMsg)
}

// Iterate
for {
raw := iter.Next()
if raw == nil {
break
}

// Ensure the parent ID is an exact match
existingJob := raw.(*structs.Job)
if existingJob.ParentID != parameterizedJob.ID {
continue
}

// Idempotency tokens match
if existingJob.DispatchIdempotencyToken == args.IdempotencyToken {
// The existing job has not yet been garbage collected.
// Registering a new job would violate the idempotency token.
// Return the existing job.
reply.JobCreateIndex = existingJob.CreateIndex
reply.DispatchedJobID = existingJob.ID
reply.Index = existingJob.ModifyIndex

return nil
}
}
}

// Derive the child job and commit it via Raft - with initial status
dispatchJob := parameterizedJob.Copy()
dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now())
Expand All @@ -1899,6 +1936,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Dispatched = true
dispatchJob.Status = ""
dispatchJob.StatusDescription = ""
dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken

// Merge in the meta data
for k, v := range args.Meta {
Expand Down
85 changes: 77 additions & 8 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6134,13 +6134,19 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
Payload: make([]byte, DispatchPayloadSizeLimit+100),
}

type existingIdempotentChildJob struct {
isTerminal bool
}

type testCase struct {
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
idempotencyToken string
existingIdempotentJob *existingIdempotentChildJob
}
cases := []testCase{
{
Expand Down Expand Up @@ -6233,6 +6239,36 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
err: true,
errStr: "stopped",
},
{
name: "idempotency token, no existing child job",
parameterizedJob: d1,
dispatchReq: reqInputDataNoMeta,
err: false,
idempotencyToken: "foo",
existingIdempotentJob: nil,
},
{
name: "idempotency token, w/ existing non-terminal child job",
parameterizedJob: d1,
dispatchReq: reqInputDataNoMeta,
err: false,
idempotencyToken: "foo",
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: false,
},
noEval: true,
},
{
name: "idempotency token, w/ existing terminal job",
parameterizedJob: d1,
dispatchReq: reqInputDataNoMeta,
err: false,
idempotencyToken: "foo",
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: true,
},
noEval: true,
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -6262,8 +6298,30 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
// Now try to dispatch
tc.dispatchReq.JobID = tc.parameterizedJob.ID
tc.dispatchReq.WriteRequest = structs.WriteRequest{
Region: "global",
Namespace: tc.parameterizedJob.Namespace,
Region: "global",
Namespace: tc.parameterizedJob.Namespace,
IdempotencyToken: tc.idempotencyToken,
}

// Dispatch with the same request so a child job w/ the idempotency key exists
var initialIdempotentDispatchResp structs.JobDispatchResponse
if tc.existingIdempotentJob != nil {
if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialIdempotentDispatchResp); err != nil {
t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err)
}

if tc.existingIdempotentJob.isTerminal {
eval, err := s1.State().EvalByID(nil, initialIdempotentDispatchResp.EvalID)
if err != nil {
t.Fatalf("Unexpected error fetching eval %v", err)
}
eval = eval.Copy()
eval.Status = structs.EvalStatusComplete
err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialIdempotentDispatchResp.Index+1, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("Unexpected error completing eval %v", err)
}
}
}

var dispatchResp structs.JobDispatchResponse
Expand Down Expand Up @@ -6315,6 +6373,17 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
t.Fatal("parameter job config should exist")
}

// Check that the existing job is returned in the case of a supplied idempotency token
if tc.idempotencyToken != "" && tc.existingIdempotentJob != nil {
if dispatchResp.DispatchedJobID != initialIdempotentDispatchResp.DispatchedJobID {
t.Fatal("dispatched job id should match initial dispatch")
}

if dispatchResp.JobCreateIndex != initialIdempotentDispatchResp.JobCreateIndex {
t.Fatal("dispatched job create index should match initial dispatch")
}
}

if tc.noEval {
return
}
Expand Down
7 changes: 7 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ type WriteRequest struct {
// AuthToken is secret portion of the ACL token used for the request
AuthToken string

// IdempotencyToken can be used to ensure the write is idempotent.
IdempotencyToken string

InternalRpcInfo
}

Expand Down Expand Up @@ -4016,6 +4019,10 @@ type Job struct {
// parameterized job.
Dispatched bool

// DispatchIdempotencyToken is optionally used to ensure that a dispatched job does not have any
// non-terminal siblings which have the same token value.
DispatchIdempotencyToken string

// Payload is the payload supplied when the job was dispatched.
Payload []byte

Expand Down
6 changes: 6 additions & 0 deletions vendor/github.com/hashicorp/nomad/api/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0f0bcdb

Please sign in to comment.