diff --git a/api/jobs.go b/api/jobs.go index 1fed52c7ca3..536234c3d9c 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -397,6 +397,22 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string, return &resp, wm, nil } +func (j *Jobs) DispatchIdempotent(jobID string, meta map[string]string, + payload []byte, idempotencyToken string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) { + var resp JobDispatchResponse + req := &JobDispatchRequest{ + JobID: jobID, + Meta: meta, + Payload: payload, + IdempotencyToken: idempotencyToken, + } + wm, err := j.client.write("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + // Revert is used to revert the given job to the passed version. If // enforceVersion is set, the job is only reverted if the current version is at // the passed version. @@ -1262,9 +1278,10 @@ type DesiredUpdates struct { } type JobDispatchRequest struct { - JobID string - Payload []byte - Meta map[string]string + JobID string + Payload []byte + Meta map[string]string + IdempotencyToken string } type JobDispatchResponse struct { diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 29df8fb757e..02d98c5768b 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -32,11 +32,6 @@ const ( // DispatchPayloadSizeLimit is the maximum size of the uncompressed input // data payload. DispatchPayloadSizeLimit = 16 * 1024 - - // MetaDispatchIdempotencyKey is the meta key that when provided, is used - // to perform an idempotency check to ensure only 1 child of a parameterized job - // with the supplied key may be running (or pending) at a time. - MetaDispatchIdempotencyKey = "nomad_dispatch_idempotency_key" ) // ErrMultipleNamespaces is send when multiple namespaces are used in the OSS setup @@ -1904,6 +1899,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.Dispatched = true dispatchJob.Status = "" dispatchJob.StatusDescription = "" + dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken // Merge in the meta data for k, v := range args.Meta { @@ -1913,8 +1909,8 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.Meta[k] = v } - // Check to see if an idempotency key was provided on the meta - if idempotencyKey, ok := dispatchJob.Meta[MetaDispatchIdempotencyKey]; ok { + // Check to see if an idempotency token was specified on the request + if args.IdempotencyToken != "" { // Fetch all jobs that match the parameterized job ID prefix iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID) if err != nil { @@ -1934,12 +1930,12 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa continue } - // Idempotency keys match. Ensure existing job is not currently running. - if ik, ok := existingJob.Meta[MetaDispatchIdempotencyKey]; ok && ik == idempotencyKey { + // Idempotency tokens match. Ensure existing job is terminal. + if existingJob.DispatchIdempotencyToken == args.IdempotencyToken { // The existing job is either pending or running. - // Registering a new job would violate the idempotency key. + // Registering a new job would violate the idempotency token. if existingJob.Status != structs.JobStatusDead { - return fmt.Errorf("dispatch violates idempotency key of non-terminal child job: %s", existingJob.ID) + return fmt.Errorf("dispatch violates idempotency token of non-terminal child job: %s", existingJob.ID) } } } @@ -2039,8 +2035,7 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job) for k := range req.Meta { _, req := required[k] _, opt := optional[k] - // Always allow the idempotency key - if !req && !opt && k != MetaDispatchIdempotencyKey { + if !req && !opt { unpermitted[k] = struct{}{} } } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index b893941a546..7db2af5e039 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6133,10 +6133,8 @@ func TestJobEndpoint_Dispatch(t *testing.T) { reqInputDataTooLarge := &structs.JobDispatchRequest{ Payload: make([]byte, DispatchPayloadSizeLimit+100), } - reqIdempotentMeta := &structs.JobDispatchRequest{ - Meta: map[string]string{ - MetaDispatchIdempotencyKey: "foo", - }, + reqIdempotentToken := &structs.JobDispatchRequest{ + IdempotencyToken: "foo", } type existingIdempotentChildJob struct { @@ -6244,26 +6242,26 @@ func TestJobEndpoint_Dispatch(t *testing.T) { errStr: "stopped", }, { - name: "idempotent meta key, no existing child job", + name: "idempotency token, no existing child job", parameterizedJob: d1, - dispatchReq: reqIdempotentMeta, + dispatchReq: reqIdempotentToken, err: false, existingIdempotentJob: nil, }, { - name: "idempotent meta key, w/ existing non-terminal child job", + name: "idempotency token, w/ existing non-terminal child job", parameterizedJob: d1, - dispatchReq: reqIdempotentMeta, + dispatchReq: reqIdempotentToken, err: true, - errStr: "dispatch violates idempotency key of non-terminal child job", + errStr: "dispatch violates idempotency token of non-terminal child job", existingIdempotentJob: &existingIdempotentChildJob{ isTerminal: false, }, }, { - name: "idempotent meta key, w/ existing terminal job", + name: "idempotency token, w/ existing terminal job", parameterizedJob: d1, - dispatchReq: reqIdempotentMeta, + dispatchReq: reqIdempotentToken, err: false, existingIdempotentJob: &existingIdempotentChildJob{ isTerminal: true, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e7c0f49aa78..9d1677bc57c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -728,9 +728,10 @@ type JobScaleStatusRequest struct { // JobDispatchRequest is used to dispatch a job based on a parameterized job type JobDispatchRequest struct { - JobID string - Payload []byte - Meta map[string]string + JobID string + Payload []byte + Meta map[string]string + IdempotencyToken string WriteRequest } @@ -4016,6 +4017,10 @@ type Job struct { // parameterized job. Dispatched bool + // DispatchIdempotencyToken is optionally used to ensure that a dispatched job does not have any + // non-terminal siblings which have the same token value. + DispatchIdempotencyToken string + // Payload is the payload supplied when the job was dispatched. Payload []byte diff --git a/vendor/github.com/hashicorp/nomad/api/jobs.go b/vendor/github.com/hashicorp/nomad/api/jobs.go index 1fed52c7ca3..536234c3d9c 100644 --- a/vendor/github.com/hashicorp/nomad/api/jobs.go +++ b/vendor/github.com/hashicorp/nomad/api/jobs.go @@ -397,6 +397,22 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string, return &resp, wm, nil } +func (j *Jobs) DispatchIdempotent(jobID string, meta map[string]string, + payload []byte, idempotencyToken string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) { + var resp JobDispatchResponse + req := &JobDispatchRequest{ + JobID: jobID, + Meta: meta, + Payload: payload, + IdempotencyToken: idempotencyToken, + } + wm, err := j.client.write("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + // Revert is used to revert the given job to the passed version. If // enforceVersion is set, the job is only reverted if the current version is at // the passed version. @@ -1262,9 +1278,10 @@ type DesiredUpdates struct { } type JobDispatchRequest struct { - JobID string - Payload []byte - Meta map[string]string + JobID string + Payload []byte + Meta map[string]string + IdempotencyToken string } type JobDispatchResponse struct {