diff --git a/CHANGELOG.md b/CHANGELOG.md index 74eadb76f7b..dd2a4182305 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.1.3 (Unreleased) +IMPROVEMENTS: +* dispatch jobs: Added optional idempotency token to `WriteOptions` which prevents Nomad from creating new dispatched jobs for retried requests. [[GH-10806](https://github.com/hashicorp/nomad/pull/10806)] + ## 1.1.2 (June 22, 2021) IMPROVEMENTS: diff --git a/api/api.go b/api/api.go index 7ce7d9a1368..d1f985dbefe 100644 --- a/api/api.go +++ b/api/api.go @@ -93,6 +93,9 @@ type WriteOptions struct { // ctx is an optional context pass through to the underlying HTTP // request layer. Use Context() and WithContext() to manage this. ctx context.Context + + // IdempotencyToken can be used to ensure the write is idempotent. + IdempotencyToken string } // QueryMeta is used to return meta data about a query @@ -593,6 +596,9 @@ func (r *request) setWriteOptions(q *WriteOptions) { if q.AuthToken != "" { r.token = q.AuthToken } + if q.IdempotencyToken != "" { + r.params.Set("idempotency_token", q.IdempotencyToken) + } r.ctx = q.Context() } diff --git a/api/api_test.go b/api/api_test.go index dda4a571a13..a845f641054 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -255,9 +255,10 @@ func TestSetWriteOptions(t *testing.T) { r, _ := c.newRequest("GET", "/v1/jobs") q := &WriteOptions{ - Region: "foo", - Namespace: "bar", - AuthToken: "foobar", + Region: "foo", + Namespace: "bar", + AuthToken: "foobar", + IdempotencyToken: "idempotent", } r.setWriteOptions(q) @@ -267,6 +268,9 @@ func TestSetWriteOptions(t *testing.T) { if r.params.Get("namespace") != "bar" { t.Fatalf("bad: %v", r.params) } + if r.params.Get("idempotency_token") != "idempotent" { + t.Fatalf("bad: %v", r.params) + } if r.token != "foobar" { t.Fatalf("bad: %v", r.token) } diff --git a/command/agent/http.go b/command/agent/http.go index 1f4e48da433..4a93ae74f76 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -666,6 +666,13 @@ func 
parseNamespace(req *http.Request, n *string) { } } +// parseIdempotencyToken copies the ?idempotency_token query parameter into *n when it is non-empty; otherwise *n is left unchanged +func parseIdempotencyToken(req *http.Request, n *string) { + if idempotencyToken := req.URL.Query().Get("idempotency_token"); idempotencyToken != "" { + *n = idempotencyToken + } +} + // parseBool parses a query parameter to a boolean or returns (nil, nil) if the // parameter is not present. func parseBool(req *http.Request, field string) (*bool, error) { @@ -721,6 +728,7 @@ func (s *HTTPServer) parseWriteRequest(req *http.Request, w *structs.WriteReques parseNamespace(req, &w.Namespace) s.parseToken(req, &w.AuthToken) s.parseRegion(req, &w.Region) + parseIdempotencyToken(req, &w.IdempotencyToken) } // wrapUntrustedContent wraps handlers in a http.ResponseWriter that prevents diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index bd5668a53f4..420e4ac2ac8 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -9,13 +9,14 @@ import ( "time" "github.com/golang/snappy" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + api "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestHTTP_JobsList(t *testing.T) { @@ -1454,8 +1455,9 @@ func TestHTTP_JobDispatch(t *testing.T) { respW := httptest.NewRecorder() args2 := structs.JobDispatchRequest{ WriteRequest: structs.WriteRequest{ - Region: "global", - Namespace: structs.DefaultNamespace, + Region: "global", + Namespace: structs.DefaultNamespace, + IdempotencyToken: "foo", }, } buf := encodeReq(args2) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 5a298fd9ac5..531bc362b87 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1890,6
+1890,43 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa return err } + // Avoid creating new dispatched jobs for retry requests, by using the idempotency token + if args.IdempotencyToken != "" { + // Fetch all jobs that match the parameterized job ID prefix + iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID) + if err != nil { + errMsg := "failed to retrieve jobs for idempotency check" + j.logger.Error(errMsg, "error", err) + return fmt.Errorf(errMsg) + } + + // Iterate + for { + raw := iter.Next() + if raw == nil { + break + } + + // Ensure the parent ID is an exact match + existingJob := raw.(*structs.Job) + if existingJob.ParentID != parameterizedJob.ID { + continue + } + + // Idempotency tokens match + if existingJob.DispatchIdempotencyToken == args.IdempotencyToken { + // The existing job has not yet been garbage collected. + // Registering a new job would violate the idempotency token. + // Return the existing job. 
+ reply.JobCreateIndex = existingJob.CreateIndex + reply.DispatchedJobID = existingJob.ID + reply.Index = existingJob.ModifyIndex + + return nil + } + } + } + // Derive the child job and commit it via Raft - with initial status dispatchJob := parameterizedJob.Copy() dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now()) @@ -1899,6 +1936,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.Dispatched = true dispatchJob.Status = "" dispatchJob.StatusDescription = "" + dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken // Merge in the meta data for k, v := range args.Meta { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 889dacba121..235882fe537 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6134,13 +6134,19 @@ func TestJobEndpoint_Dispatch(t *testing.T) { Payload: make([]byte, DispatchPayloadSizeLimit+100), } + type existingIdempotentChildJob struct { + isTerminal bool + } + type testCase struct { - name string - parameterizedJob *structs.Job - dispatchReq *structs.JobDispatchRequest - noEval bool - err bool - errStr string + name string + parameterizedJob *structs.Job + dispatchReq *structs.JobDispatchRequest + noEval bool + err bool + errStr string + idempotencyToken string + existingIdempotentJob *existingIdempotentChildJob } cases := []testCase{ { @@ -6233,6 +6239,36 @@ func TestJobEndpoint_Dispatch(t *testing.T) { err: true, errStr: "stopped", }, + { + name: "idempotency token, no existing child job", + parameterizedJob: d1, + dispatchReq: reqInputDataNoMeta, + err: false, + idempotencyToken: "foo", + existingIdempotentJob: nil, + }, + { + name: "idempotency token, w/ existing non-terminal child job", + parameterizedJob: d1, + dispatchReq: reqInputDataNoMeta, + err: false, + idempotencyToken: "foo", + existingIdempotentJob: &existingIdempotentChildJob{ + isTerminal: false, + }, + noEval: true, + }, + { + name: "idempotency token, w/ 
existing terminal job", + parameterizedJob: d1, + dispatchReq: reqInputDataNoMeta, + err: false, + idempotencyToken: "foo", + existingIdempotentJob: &existingIdempotentChildJob{ + isTerminal: true, + }, + noEval: true, + }, } for _, tc := range cases { @@ -6262,8 +6298,30 @@ func TestJobEndpoint_Dispatch(t *testing.T) { // Now try to dispatch tc.dispatchReq.JobID = tc.parameterizedJob.ID tc.dispatchReq.WriteRequest = structs.WriteRequest{ - Region: "global", - Namespace: tc.parameterizedJob.Namespace, + Region: "global", + Namespace: tc.parameterizedJob.Namespace, + IdempotencyToken: tc.idempotencyToken, + } + + // Dispatch with the same request so a child job w/ the idempotency key exists + var initialIdempotentDispatchResp structs.JobDispatchResponse + if tc.existingIdempotentJob != nil { + if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialIdempotentDispatchResp); err != nil { + t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err) + } + + if tc.existingIdempotentJob.isTerminal { + eval, err := s1.State().EvalByID(nil, initialIdempotentDispatchResp.EvalID) + if err != nil { + t.Fatalf("Unexpected error fetching eval %v", err) + } + eval = eval.Copy() + eval.Status = structs.EvalStatusComplete + err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialIdempotentDispatchResp.Index+1, []*structs.Evaluation{eval}) + if err != nil { + t.Fatalf("Unexpected error completing eval %v", err) + } + } } var dispatchResp structs.JobDispatchResponse @@ -6315,6 +6373,17 @@ func TestJobEndpoint_Dispatch(t *testing.T) { t.Fatal("parameter job config should exist") } + // Check that the existing job is returned in the case of a supplied idempotency token + if tc.idempotencyToken != "" && tc.existingIdempotentJob != nil { + if dispatchResp.DispatchedJobID != initialIdempotentDispatchResp.DispatchedJobID { + t.Fatal("dispatched job id should match initial dispatch") + } + + if dispatchResp.JobCreateIndex != 
initialIdempotentDispatchResp.JobCreateIndex { + t.Fatal("dispatched job create index should match initial dispatch") + } + } + if tc.noEval { return } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e7c0f49aa78..bf73585809d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -385,6 +385,9 @@ type WriteRequest struct { // AuthToken is secret portion of the ACL token used for the request AuthToken string + // IdempotencyToken can be used to ensure the write is idempotent. + IdempotencyToken string + InternalRpcInfo } @@ -4016,6 +4019,10 @@ type Job struct { // parameterized job. Dispatched bool + // DispatchIdempotencyToken is optionally used to ensure that a dispatched job does not have any + // siblings, terminal or not, which have the same token value and are not yet garbage collected. + DispatchIdempotencyToken string + // Payload is the payload supplied when the job was dispatched. Payload []byte diff --git a/vendor/github.com/hashicorp/nomad/api/api.go b/vendor/github.com/hashicorp/nomad/api/api.go index 7ce7d9a1368..d1f985dbefe 100644 --- a/vendor/github.com/hashicorp/nomad/api/api.go +++ b/vendor/github.com/hashicorp/nomad/api/api.go @@ -93,6 +93,9 @@ type WriteOptions struct { // ctx is an optional context pass through to the underlying HTTP // request layer. Use Context() and WithContext() to manage this. ctx context.Context + + // IdempotencyToken can be used to ensure the write is idempotent. + IdempotencyToken string } // QueryMeta is used to return meta data about a query @@ -593,6 +596,9 @@ func (r *request) setWriteOptions(q *WriteOptions) { if q.AuthToken != "" { r.token = q.AuthToken } + if q.IdempotencyToken != "" { + r.params.Set("idempotency_token", q.IdempotencyToken) + } r.ctx = q.Context() }