From c4be87ad1ca947614cd536eac19835e54eea97a4 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Wed, 23 Jun 2021 16:51:59 -0500 Subject: [PATCH 01/13] Enforce idempotency of dispatched jobs using special meta key --- nomad/job_endpoint.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 5a298fd9ac5..e5581fc003d 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -32,6 +32,11 @@ const ( // DispatchPayloadSizeLimit is the maximum size of the uncompressed input // data payload. DispatchPayloadSizeLimit = 16 * 1024 + + // MetaDispatchIdempotencyKey is the meta key that when provided, is used + // to perform an idempotency check to ensure only 1 child of a parameterized job + // with the supplied key may be running (or pending) at a time. + MetaDispatchIdempotencyKey = "nomad_dispatch_idempotency_key" ) // ErrMultipleNamespaces is send when multiple namespaces are used in the OSS setup @@ -1908,6 +1913,39 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.Meta[k] = v } + // Check to see if an idempotency key was provided on the meta + if idempotencyKey, ok := dispatchJob.Meta[MetaDispatchIdempotencyKey]; ok { + // Fetch all jobs that match the parameterized job ID prefix + iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID) + if err != nil { + return fmt.Errorf("failed to retrieve jobs for idempotency check") + } + + // Iterate + for { + raw := iter.Next() + if raw == nil { + break + } + + // Ensure the parent ID is an exact match + existingJob := raw.(*structs.Job) + if existingJob.ParentID != dispatchJob.ParentID { + continue + } + + // Idempotency keys match. Ensure existing job is not currently running. + if ik, ok := existingJob.Meta[MetaDispatchIdempotencyKey]; ok && ik == idempotencyKey { + // The existing job is either pending or running. + // Registering a new job would violate the idempotency key. + if existingJob.Status != structs.JobStatusDead { + return fmt.Errorf("dispatch violates idempotency key of non-terminal child job: %s", existingJob.ID) + } + } + } + + } + // Compress the payload dispatchJob.Payload = snappy.Encode(nil, args.Payload) From b5d21a919191a5c6a325fa5851fe9cb59c853a48 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Tue, 29 Jun 2021 10:02:30 -0500 Subject: [PATCH 02/13] Always allow idempotency key meta. Tests for idempotent dispatch --- nomad/job_endpoint.go | 3 +- nomad/job_endpoint_test.go | 69 ++++++++++++++++++++++++++++++++++---- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index e5581fc003d..29df8fb757e 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -2039,7 +2039,8 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job) for k := range req.Meta { _, req := required[k] _, opt := optional[k] - if !req && !opt { + // Always allow the idempotency key + if !req && !opt && k != MetaDispatchIdempotencyKey { unpermitted[k] = struct{}{} } } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 889dacba121..b893941a546 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6133,14 +6133,24 @@ func TestJobEndpoint_Dispatch(t *testing.T) { reqInputDataTooLarge := &structs.JobDispatchRequest{ Payload: make([]byte, DispatchPayloadSizeLimit+100), } + reqIdempotentMeta := &structs.JobDispatchRequest{ + Meta: map[string]string{ + MetaDispatchIdempotencyKey: "foo", + }, + } + + type existingIdempotentChildJob struct { + isTerminal bool + } type testCase struct { - name string - parameterizedJob *structs.Job - dispatchReq *structs.JobDispatchRequest - noEval bool - err bool - errStr string + name string + parameterizedJob *structs.Job + dispatchReq *structs.JobDispatchRequest + noEval bool + err bool + errStr string + existingIdempotentJob *existingIdempotentChildJob } cases := []testCase{ { @@ -6233,6 +6243,32 @@ func TestJobEndpoint_Dispatch(t *testing.T) { err: true, errStr: "stopped", }, + { + name: "idempotent meta key, no existing child job", + parameterizedJob: d1, + dispatchReq: reqIdempotentMeta, + err: false, + existingIdempotentJob: nil, + }, + { + name: "idempotent meta key, w/ existing non-terminal child job", + parameterizedJob: d1, + dispatchReq: reqIdempotentMeta, + err: true, + errStr: "dispatch violates idempotency key of non-terminal child job", + existingIdempotentJob: &existingIdempotentChildJob{ + isTerminal: false, + }, + }, + { + name: "idempotent meta key, w/ existing terminal job", + parameterizedJob: d1, + dispatchReq: reqIdempotentMeta, + err: false, + existingIdempotentJob: &existingIdempotentChildJob{ + isTerminal: true, + }, + }, } for _, tc := range cases { @@ -6266,6 +6302,27 @@ func TestJobEndpoint_Dispatch(t *testing.T) { Namespace: tc.parameterizedJob.Namespace, } + // Dispatch with the same request so a child job w/ the idempotency key exists + if tc.existingIdempotentJob != nil { + var initialDispatchResp structs.JobDispatchResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialDispatchResp); err != nil { + t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err) + } + + if tc.existingIdempotentJob.isTerminal { + eval, err := s1.State().EvalByID(nil, initialDispatchResp.EvalID) + if err != nil { + t.Fatalf("Unexpected error fetching eval %v", err) + } + eval = eval.Copy() + eval.Status = structs.EvalStatusComplete + err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialDispatchResp.Index+1, []*structs.Evaluation{eval}) + if err != nil { + t.Fatalf("Unexpected error completing eval %v", err) + } + } + } + var dispatchResp structs.JobDispatchResponse dispatchErr := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &dispatchResp) From f607c35421e430e9543e45cf2b50b7ed7ccbbedf Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Tue, 29 Jun 2021 15:52:12 -0500 Subject: [PATCH 03/13] Add idempotency token to dispatch request instead of special meta key --- api/jobs.go | 23 ++++++++++++++++--- nomad/job_endpoint.go | 21 +++++++---------- nomad/job_endpoint_test.go | 20 ++++++++-------- nomad/structs/structs.go | 11 ++++++--- vendor/github.com/hashicorp/nomad/api/jobs.go | 23 ++++++++++++++++--- 5 files changed, 65 insertions(+), 33 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 1fed52c7ca3..536234c3d9c 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -397,6 +397,22 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string, return &resp, wm, nil } +func (j *Jobs) DispatchIdempotent(jobID string, meta map[string]string, + payload []byte, idempotencyToken string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) { + var resp JobDispatchResponse + req := &JobDispatchRequest{ + JobID: jobID, + Meta: meta, + Payload: payload, + IdempotencyToken: idempotencyToken, + } + wm, err := j.client.write("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + // Revert is used to revert the given job to the passed version. If // enforceVersion is set, the job is only reverted if the current version is at // the passed version. @@ -1262,9 +1278,10 @@ type DesiredUpdates struct { } type JobDispatchRequest struct { - JobID string - Payload []byte - Meta map[string]string + JobID string + Payload []byte + Meta map[string]string + IdempotencyToken string } type JobDispatchResponse struct { diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 29df8fb757e..02d98c5768b 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -32,11 +32,6 @@ const ( // DispatchPayloadSizeLimit is the maximum size of the uncompressed input // data payload. DispatchPayloadSizeLimit = 16 * 1024 - - // MetaDispatchIdempotencyKey is the meta key that when provided, is used - // to perform an idempotency check to ensure only 1 child of a parameterized job - // with the supplied key may be running (or pending) at a time. - MetaDispatchIdempotencyKey = "nomad_dispatch_idempotency_key" ) // ErrMultipleNamespaces is send when multiple namespaces are used in the OSS setup @@ -1904,6 +1899,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.Dispatched = true dispatchJob.Status = "" dispatchJob.StatusDescription = "" + dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken // Merge in the meta data for k, v := range args.Meta { @@ -1913,8 +1909,8 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.Meta[k] = v } - // Check to see if an idempotency key was provided on the meta - if idempotencyKey, ok := dispatchJob.Meta[MetaDispatchIdempotencyKey]; ok { + // Check to see if an idempotency token was specified on the request + if args.IdempotencyToken != "" { // Fetch all jobs that match the parameterized job ID prefix iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID) if err != nil { @@ -1934,12 +1930,12 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa continue } - // Idempotency keys match. Ensure existing job is not currently running. - if ik, ok := existingJob.Meta[MetaDispatchIdempotencyKey]; ok && ik == idempotencyKey { + // Idempotency tokens match. Ensure existing job is terminal. + if existingJob.DispatchIdempotencyToken == args.IdempotencyToken { // The existing job is either pending or running. - // Registering a new job would violate the idempotency key. + // Registering a new job would violate the idempotency token. if existingJob.Status != structs.JobStatusDead { - return fmt.Errorf("dispatch violates idempotency key of non-terminal child job: %s", existingJob.ID) + return fmt.Errorf("dispatch violates idempotency token of non-terminal child job: %s", existingJob.ID) } } } @@ -2039,8 +2035,7 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job) for k := range req.Meta { _, req := required[k] _, opt := optional[k] - // Always allow the idempotency key - if !req && !opt && k != MetaDispatchIdempotencyKey { + if !req && !opt { unpermitted[k] = struct{}{} } } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index b893941a546..7db2af5e039 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6133,10 +6133,8 @@ func TestJobEndpoint_Dispatch(t *testing.T) { reqInputDataTooLarge := &structs.JobDispatchRequest{ Payload: make([]byte, DispatchPayloadSizeLimit+100), } - reqIdempotentMeta := &structs.JobDispatchRequest{ - Meta: map[string]string{ - MetaDispatchIdempotencyKey: "foo", - }, + reqIdempotentToken := &structs.JobDispatchRequest{ + IdempotencyToken: "foo", } type existingIdempotentChildJob struct { @@ -6244,26 +6242,26 @@ func TestJobEndpoint_Dispatch(t *testing.T) { errStr: "stopped", }, { - name: "idempotent meta key, no existing child job", + name: "idempotency token, no existing child job", parameterizedJob: d1, - dispatchReq: reqIdempotentMeta, + dispatchReq: reqIdempotentToken, err: false, existingIdempotentJob: nil, }, { - name: "idempotent meta key, w/ existing non-terminal child job", + name: "idempotency token, w/ existing non-terminal child job", parameterizedJob: d1, - dispatchReq: reqIdempotentMeta, + dispatchReq: reqIdempotentToken, err: true, - errStr: "dispatch violates idempotency key of non-terminal child job", + errStr: "dispatch violates idempotency token of non-terminal child job", existingIdempotentJob: &existingIdempotentChildJob{ isTerminal: false, }, }, { - name: "idempotent meta key, w/ existing terminal job", + name: "idempotency token, w/ existing terminal job", parameterizedJob: d1, - dispatchReq: reqIdempotentMeta, + dispatchReq: reqIdempotentToken, err: false, existingIdempotentJob: &existingIdempotentChildJob{ isTerminal: true, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e7c0f49aa78..9d1677bc57c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -728,9 +728,10 @@ type JobScaleStatusRequest struct { // JobDispatchRequest is used to dispatch a job based on a parameterized job type JobDispatchRequest struct { - JobID string - Payload []byte - Meta map[string]string + JobID string + Payload []byte + Meta map[string]string + IdempotencyToken string WriteRequest } @@ -4016,6 +4017,10 @@ type Job struct { // parameterized job. Dispatched bool + // DispatchIdempotencyToken is optionally used to ensure that a dispatched job does not have any + // non-terminal siblings which have the same token value. + DispatchIdempotencyToken string + // Payload is the payload supplied when the job was dispatched. Payload []byte diff --git a/vendor/github.com/hashicorp/nomad/api/jobs.go b/vendor/github.com/hashicorp/nomad/api/jobs.go index 1fed52c7ca3..536234c3d9c 100644 --- a/vendor/github.com/hashicorp/nomad/api/jobs.go +++ b/vendor/github.com/hashicorp/nomad/api/jobs.go @@ -397,6 +397,22 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string, return &resp, wm, nil } +func (j *Jobs) DispatchIdempotent(jobID string, meta map[string]string, + payload []byte, idempotencyToken string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) { + var resp JobDispatchResponse + req := &JobDispatchRequest{ + JobID: jobID, + Meta: meta, + Payload: payload, + IdempotencyToken: idempotencyToken, + } + wm, err := j.client.write("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + // Revert is used to revert the given job to the passed version. If // enforceVersion is set, the job is only reverted if the current version is at // the passed version. @@ -1262,9 +1278,10 @@ type DesiredUpdates struct { } type JobDispatchRequest struct { - JobID string - Payload []byte - Meta map[string]string + JobID string + Payload []byte + Meta map[string]string + IdempotencyToken string } type JobDispatchResponse struct { From 850204870f0bef12b251c504ebf354375238ed12 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Wed, 30 Jun 2021 12:26:33 -0500 Subject: [PATCH 04/13] Make idempotency error user friendly Co-authored-by: Tim Gross --- nomad/job_endpoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 02d98c5768b..e35c9e1a85f 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1935,7 +1935,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa // The existing job is either pending or running. // Registering a new job would violate the idempotency token. if existingJob.Status != structs.JobStatusDead { - return fmt.Errorf("dispatch violates idempotency token of non-terminal child job: %s", existingJob.ID) + return fmt.Errorf("idempotent dispatch failed: another child job with this token is running or pending: %s", existingJob.ID) } } } From d83ff8b8c99118ae5301cd4e9139f2f74e0dab16 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Wed, 30 Jun 2021 12:30:44 -0500 Subject: [PATCH 05/13] Update comment about idempotency check --- nomad/job_endpoint.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index e35c9e1a85f..a018fe528c2 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1909,7 +1909,8 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.Meta[k] = v } - // Check to see if an idempotency token was specified on the request + // Ensure that we have only one dispatched version of this job running concurrently + // by comparing the idempotency token against any non-terminal versions. if args.IdempotencyToken != "" { // Fetch all jobs that match the parameterized job ID prefix iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID) From 63d8e92e901b8b59c314bdfba9ef6938ed33bebf Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Wed, 30 Jun 2021 14:23:37 -0500 Subject: [PATCH 06/13] Move idempotency token to write options. Remove DispatchIdempotent --- api/api.go | 3 +++ api/jobs.go | 23 +++---------------- nomad/structs/structs.go | 10 ++++---- vendor/github.com/hashicorp/nomad/api/api.go | 3 +++ vendor/github.com/hashicorp/nomad/api/jobs.go | 23 +++---------------- 5 files changed, 18 insertions(+), 44 deletions(-) diff --git a/api/api.go b/api/api.go index 7ce7d9a1368..7693bde43a2 100644 --- a/api/api.go +++ b/api/api.go @@ -93,6 +93,9 @@ type WriteOptions struct { // ctx is an optional context pass through to the underlying HTTP // request layer. Use Context() and WithContext() to manage this. ctx context.Context + + // IdempotencyToken can be used to ensure the write is idempotent. + IdempotencyToken string } // QueryMeta is used to return meta data about a query diff --git a/api/jobs.go b/api/jobs.go index 536234c3d9c..1fed52c7ca3 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -397,22 +397,6 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string, return &resp, wm, nil } -func (j *Jobs) DispatchIdempotent(jobID string, meta map[string]string, - payload []byte, idempotencyToken string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) { - var resp JobDispatchResponse - req := &JobDispatchRequest{ - JobID: jobID, - Meta: meta, - Payload: payload, - IdempotencyToken: idempotencyToken, - } - wm, err := j.client.write("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q) - if err != nil { - return nil, nil, err - } - return &resp, wm, nil -} - // Revert is used to revert the given job to the passed version. If // enforceVersion is set, the job is only reverted if the current version is at // the passed version. @@ -1278,10 +1262,9 @@ type DesiredUpdates struct { } type JobDispatchRequest struct { - JobID string - Payload []byte - Meta map[string]string - IdempotencyToken string + JobID string + Payload []byte + Meta map[string]string } type JobDispatchResponse struct { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9d1677bc57c..bf73585809d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -385,6 +385,9 @@ type WriteRequest struct { // AuthToken is secret portion of the ACL token used for the request AuthToken string + // IdempotencyToken can be used to ensure the write is idempotent. + IdempotencyToken string + InternalRpcInfo } @@ -728,10 +731,9 @@ type JobScaleStatusRequest struct { // JobDispatchRequest is used to dispatch a job based on a parameterized job type JobDispatchRequest struct { - JobID string - Payload []byte - Meta map[string]string - IdempotencyToken string + JobID string + Payload []byte + Meta map[string]string WriteRequest } diff --git a/vendor/github.com/hashicorp/nomad/api/api.go b/vendor/github.com/hashicorp/nomad/api/api.go index 7ce7d9a1368..7693bde43a2 100644 --- a/vendor/github.com/hashicorp/nomad/api/api.go +++ b/vendor/github.com/hashicorp/nomad/api/api.go @@ -93,6 +93,9 @@ type WriteOptions struct { // ctx is an optional context pass through to the underlying HTTP // request layer. Use Context() and WithContext() to manage this. ctx context.Context + + // IdempotencyToken can be used to ensure the write is idempotent. + IdempotencyToken string } // QueryMeta is used to return meta data about a query diff --git a/vendor/github.com/hashicorp/nomad/api/jobs.go b/vendor/github.com/hashicorp/nomad/api/jobs.go index 536234c3d9c..1fed52c7ca3 100644 --- a/vendor/github.com/hashicorp/nomad/api/jobs.go +++ b/vendor/github.com/hashicorp/nomad/api/jobs.go @@ -397,22 +397,6 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string, return &resp, wm, nil } -func (j *Jobs) DispatchIdempotent(jobID string, meta map[string]string, - payload []byte, idempotencyToken string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) { - var resp JobDispatchResponse - req := &JobDispatchRequest{ - JobID: jobID, - Meta: meta, - Payload: payload, - IdempotencyToken: idempotencyToken, - } - wm, err := j.client.write("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q) - if err != nil { - return nil, nil, err - } - return &resp, wm, nil -} - // Revert is used to revert the given job to the passed version. If // enforceVersion is set, the job is only reverted if the current version is at // the passed version. @@ -1278,10 +1262,9 @@ type DesiredUpdates struct { } type JobDispatchRequest struct { - JobID string - Payload []byte - Meta map[string]string - IdempotencyToken string + JobID string + Payload []byte + Meta map[string]string } type JobDispatchResponse struct { From aa3512d11c93d3e175e2de960d5fcb2cfe24d1bc Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Thu, 1 Jul 2021 08:48:57 -0500 Subject: [PATCH 07/13] Update tests after moving idempotency token to WriteOptions --- nomad/job_endpoint_test.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 7db2af5e039..5171f9dd9c4 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6133,9 +6133,6 @@ func TestJobEndpoint_Dispatch(t *testing.T) { reqInputDataTooLarge := &structs.JobDispatchRequest{ Payload: make([]byte, DispatchPayloadSizeLimit+100), } - reqIdempotentToken := &structs.JobDispatchRequest{ - IdempotencyToken: "foo", - } type existingIdempotentChildJob struct { isTerminal bool @@ -6148,6 +6145,7 @@ func TestJobEndpoint_Dispatch(t *testing.T) { noEval bool err bool errStr string + idempotencyToken string existingIdempotentJob *existingIdempotentChildJob } cases := []testCase{ @@ -6244,16 +6242,18 @@ func TestJobEndpoint_Dispatch(t *testing.T) { { name: "idempotency token, no existing child job", parameterizedJob: d1, - dispatchReq: reqIdempotentToken, + dispatchReq: reqInputDataNoMeta, err: false, + idempotencyToken: "foo", existingIdempotentJob: nil, }, { name: "idempotency token, w/ existing non-terminal child job", parameterizedJob: d1, - dispatchReq: reqIdempotentToken, + dispatchReq: reqInputDataNoMeta, err: true, - errStr: "dispatch violates idempotency token of non-terminal child job", + errStr: "idempotent dispatch failed: another child job with this token is running or pending", + idempotencyToken: "foo", existingIdempotentJob: &existingIdempotentChildJob{ isTerminal: false, }, @@ -6261,8 +6261,9 @@ func TestJobEndpoint_Dispatch(t *testing.T) { { name: "idempotency token, w/ existing terminal job", parameterizedJob: d1, - dispatchReq: reqIdempotentToken, + dispatchReq: reqInputDataNoMeta, err: false, + idempotencyToken: "foo", existingIdempotentJob: &existingIdempotentChildJob{ isTerminal: true, }, @@ -6296,8 +6297,9 @@ func TestJobEndpoint_Dispatch(t *testing.T) { // Now try to dispatch tc.dispatchReq.JobID = tc.parameterizedJob.ID tc.dispatchReq.WriteRequest = structs.WriteRequest{ - Region: "global", - Namespace: tc.parameterizedJob.Namespace, + Region: "global", + Namespace: tc.parameterizedJob.Namespace, + IdempotencyToken: tc.idempotencyToken, } // Dispatch with the same request so a child job w/ the idempotency key exists From 4448cff8a3a8b9c68b52b6cf083fb91b2b32c224 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Fri, 2 Jul 2021 10:58:42 -0500 Subject: [PATCH 08/13] Move idempotency check closer to validate. Log error. --- nomad/job_endpoint.go | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index a018fe528c2..52698551660 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1890,32 +1890,15 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa return err } - // Derive the child job and commit it via Raft - with initial status - dispatchJob := parameterizedJob.Copy() - dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now()) - dispatchJob.ParentID = parameterizedJob.ID - dispatchJob.Name = dispatchJob.ID - dispatchJob.SetSubmitTime() - dispatchJob.Dispatched = true - dispatchJob.Status = "" - dispatchJob.StatusDescription = "" - dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken - - // Merge in the meta data - for k, v := range args.Meta { - if dispatchJob.Meta == nil { - dispatchJob.Meta = make(map[string]string, len(args.Meta)) - } - dispatchJob.Meta[k] = v - } - // Ensure that we have only one dispatched version of this job running concurrently // by comparing the idempotency token against any non-terminal versions. if args.IdempotencyToken != "" { // Fetch all jobs that match the parameterized job ID prefix iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID) if err != nil { - return fmt.Errorf("failed to retrieve jobs for idempotency check") + errMsg := "failed to retrieve jobs for idempotency check" + j.logger.Error(errMsg, "error", err) + return fmt.Errorf(errMsg) } // Iterate @@ -1927,7 +1910,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa // Ensure the parent ID is an exact match existingJob := raw.(*structs.Job) - if existingJob.ParentID != dispatchJob.ParentID { + if existingJob.ParentID != parameterizedJob.ID { continue } @@ -1940,7 +1923,25 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa } } } + } + + // Derive the child job and commit it via Raft - with initial status + dispatchJob := parameterizedJob.Copy() + dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now()) + dispatchJob.ParentID = parameterizedJob.ID + dispatchJob.Name = dispatchJob.ID + dispatchJob.SetSubmitTime() + dispatchJob.Dispatched = true + dispatchJob.Status = "" + dispatchJob.StatusDescription = "" + dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken + // Merge in the meta data + for k, v := range args.Meta { + if dispatchJob.Meta == nil { + dispatchJob.Meta = make(map[string]string, len(args.Meta)) + } + dispatchJob.Meta[k] = v } // Compress the payload From 6a1a200925c69eddd70de2c1fca3fb989539b3d0 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Fri, 2 Jul 2021 14:08:46 -0500 Subject: [PATCH 09/13] Match idempotency key on all child jobs and return existing job when idempotency keys match. --- nomad/job_endpoint.go | 13 ++++++++----- nomad/job_endpoint_test.go | 24 ++++++++++++++++++------ 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 52698551660..d456311e81e 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1914,13 +1914,16 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa continue } - // Idempotency tokens match. Ensure existing job is terminal. + // Idempotency tokens match if existingJob.DispatchIdempotencyToken == args.IdempotencyToken { - // The existing job is either pending or running. + // The existing job has not yet been garbage collected. // Registering a new job would violate the idempotency token. - if existingJob.Status != structs.JobStatusDead { - return fmt.Errorf("idempotent dispatch failed: another child job with this token is running or pending: %s", existingJob.ID) - } + // Return the existing job. + reply.JobCreateIndex = existingJob.CreateIndex + reply.DispatchedJobID = existingJob.ID + reply.Index = existingJob.ModifyIndex + + return nil } } } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 5171f9dd9c4..235882fe537 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6251,12 +6251,12 @@ func TestJobEndpoint_Dispatch(t *testing.T) { name: "idempotency token, w/ existing non-terminal child job", parameterizedJob: d1, dispatchReq: reqInputDataNoMeta, - err: true, - errStr: "idempotent dispatch failed: another child job with this token is running or pending", + err: false, idempotencyToken: "foo", existingIdempotentJob: &existingIdempotentChildJob{ isTerminal: false, }, + noEval: true, }, { name: "idempotency token, w/ existing terminal job", @@ -6267,6 +6267,7 @@ func TestJobEndpoint_Dispatch(t *testing.T) { existingIdempotentJob: &existingIdempotentChildJob{ isTerminal: true, }, + noEval: true, }, } @@ -6303,20 +6304,20 @@ func TestJobEndpoint_Dispatch(t *testing.T) { } // Dispatch with the same request so a child job w/ the idempotency key exists + var initialIdempotentDispatchResp structs.JobDispatchResponse if tc.existingIdempotentJob != nil { - var initialDispatchResp structs.JobDispatchResponse - if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialDispatchResp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialIdempotentDispatchResp); err != nil { t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err) } if tc.existingIdempotentJob.isTerminal { - eval, err := s1.State().EvalByID(nil, initialDispatchResp.EvalID) + eval, err := s1.State().EvalByID(nil, initialIdempotentDispatchResp.EvalID) if err != nil { t.Fatalf("Unexpected error fetching eval %v", err) } eval = eval.Copy() eval.Status = structs.EvalStatusComplete - err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialDispatchResp.Index+1, []*structs.Evaluation{eval}) + err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialIdempotentDispatchResp.Index+1, []*structs.Evaluation{eval}) if err != nil { t.Fatalf("Unexpected error completing eval %v", err) } @@ -6372,6 +6373,17 @@ func TestJobEndpoint_Dispatch(t *testing.T) { t.Fatal("parameter job config should exist") } + // Check that the existing job is returned in the case of a supplied idempotency token + if tc.idempotencyToken != "" && tc.existingIdempotentJob != nil { + if dispatchResp.DispatchedJobID != initialIdempotentDispatchResp.DispatchedJobID { + t.Fatal("dispatched job id should match initial dispatch") + } + + if dispatchResp.JobCreateIndex != initialIdempotentDispatchResp.JobCreateIndex { + t.Fatal("dispatched job create index should match initial dispatch") + } + } + if tc.noEval { return } From 3a8febe5bf2c7ae6e97d517e831f84af79685101 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Wed, 7 Jul 2021 15:54:56 -0500 Subject: [PATCH 10/13] Update idempotency comment to reflect all jobs Co-authored-by: Mahmood Ali --- nomad/job_endpoint.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index d456311e81e..531bc362b87 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1890,8 +1890,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa return err } - // Ensure that we have only one dispatched version of this job running concurrently - // by comparing the idempotency token against any non-terminal versions. + // Avoid creating new dispatched jobs for retry requests, by using the idempotency token if args.IdempotencyToken != "" { // Fetch all jobs that match the parameterized job ID prefix iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID) From 2bd2f586f406e2b30ade68f680778ed4b1b23aa6 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Wed, 7 Jul 2021 16:26:35 -0500 Subject: [PATCH 11/13] Set/parse idempotency_token query param --- api/api.go | 3 +++ api/api_test.go | 10 +++++++--- command/agent/http.go | 8 ++++++++ command/agent/job_endpoint_test.go | 12 +++++++----- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/api/api.go b/api/api.go index 7693bde43a2..d1f985dbefe 100644 --- a/api/api.go +++ b/api/api.go @@ -596,6 +596,9 @@ func (r *request) setWriteOptions(q *WriteOptions) { if q.AuthToken != "" { r.token = q.AuthToken } + if q.IdempotencyToken != "" { + r.params.Set("idempotency_token", q.IdempotencyToken) + } r.ctx = q.Context() } diff --git a/api/api_test.go b/api/api_test.go index dda4a571a13..a845f641054 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -255,9 +255,10 @@ func TestSetWriteOptions(t *testing.T) { r, _ := c.newRequest("GET", "/v1/jobs") q := &WriteOptions{ - Region: "foo", - Namespace: "bar", - AuthToken: "foobar", + Region: "foo", + Namespace: "bar", + AuthToken: "foobar", + IdempotencyToken: "idempotent", } r.setWriteOptions(q) @@ -267,6 +268,9 @@ func TestSetWriteOptions(t *testing.T) { if r.params.Get("namespace") != "bar" { t.Fatalf("bad: %v", r.params) } + if r.params.Get("idempotency_token") != "idempotent" { + t.Fatalf("bad: %v", r.params) + } if r.token != "foobar" { t.Fatalf("bad: %v", r.token) } diff --git a/command/agent/http.go b/command/agent/http.go index 1f4e48da433..4a93ae74f76 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -666,6 +666,13 @@ func parseNamespace(req *http.Request, n *string) { } } +// parseIdempotencyToken is used to parse the ?idempotency_token parameter +func parseIdempotencyToken(req *http.Request, n *string) { + if idempotencyToken := req.URL.Query().Get("idempotency_token"); idempotencyToken != "" { + *n = idempotencyToken + } +} + // parseBool parses a query parameter to a boolean or returns (nil, nil) if the // parameter is not present. func parseBool(req *http.Request, field string) (*bool, error) { @@ -721,6 +728,7 @@ func (s *HTTPServer) parseWriteRequest(req *http.Request, w *structs.WriteReques parseNamespace(req, &w.Namespace) s.parseToken(req, &w.AuthToken) s.parseRegion(req, &w.Region) + parseIdempotencyToken(req, &w.IdempotencyToken) } // wrapUntrustedContent wraps handlers in a http.ResponseWriter that prevents diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index bd5668a53f4..420e4ac2ac8 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -9,13 +9,14 @@ import ( "time" "github.com/golang/snappy" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + api "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestHTTP_JobsList(t *testing.T) { @@ -1454,8 +1455,9 @@ func TestHTTP_JobDispatch(t *testing.T) { respW := httptest.NewRecorder() args2 := structs.JobDispatchRequest{ WriteRequest: structs.WriteRequest{ - Region: "global", - Namespace: structs.DefaultNamespace, + Region: "global", + Namespace: structs.DefaultNamespace, + IdempotencyToken: "foo", }, } buf := encodeReq(args2) From 16d43ae6a5a595c398a9db184758daa43bd7ced5 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Wed, 7 Jul 2021 19:48:59 -0500 Subject: [PATCH 12/13] Changelog entry for dispatch idempotency token --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 74eadb76f7b..dd2a4182305 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.1.3 (Unreleased) +IMPROVEMENTS: +* dispatch jobs: Added optional idempotency token to `WriteOptions` which prevents Nomad from creating new dispatched jobs for retried requests. [[GH-10806](https://github.com/hashicorp/nomad/pull/10806)] + ## 1.1.2 (June 22, 2021) IMPROVEMENTS: From d77329a6d25e4e99c2b3e4c285880361d59b72b6 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Wed, 7 Jul 2021 19:53:46 -0500 Subject: [PATCH 13/13] Sync vendored api --- vendor/github.com/hashicorp/nomad/api/api.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vendor/github.com/hashicorp/nomad/api/api.go b/vendor/github.com/hashicorp/nomad/api/api.go index 7693bde43a2..d1f985dbefe 100644 --- a/vendor/github.com/hashicorp/nomad/api/api.go +++ b/vendor/github.com/hashicorp/nomad/api/api.go @@ -596,6 +596,9 @@ func (r *request) setWriteOptions(q *WriteOptions) { if q.AuthToken != "" { r.token = q.AuthToken } + if q.IdempotencyToken != "" { + r.params.Set("idempotency_token", q.IdempotencyToken) + } r.ctx = q.Context() }