From c4be87ad1ca947614cd536eac19835e54eea97a4 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Wed, 23 Jun 2021 16:51:59 -0500 Subject: [PATCH] Enforce idempotency of dispatched jobs using special meta key --- nomad/job_endpoint.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 5a298fd9ac5..e5581fc003d 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -32,6 +32,11 @@ const ( // DispatchPayloadSizeLimit is the maximum size of the uncompressed input // data payload. DispatchPayloadSizeLimit = 16 * 1024 + + // MetaDispatchIdempotencyKey is the meta key that when provided, is used + // to perform an idempotency check to ensure only 1 child of a parameterized job + // with the supplied key may be running (or pending) at a time. + MetaDispatchIdempotencyKey = "nomad_dispatch_idempotency_key" ) // ErrMultipleNamespaces is send when multiple namespaces are used in the OSS setup @@ -1908,6 +1913,39 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.Meta[k] = v } + // Check to see if an idempotency key was provided on the meta + if idempotencyKey, ok := dispatchJob.Meta[MetaDispatchIdempotencyKey]; ok { + // Fetch all jobs that match the parameterized job ID prefix + iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID) + if err != nil { + return fmt.Errorf("failed to retrieve jobs for idempotency check") + } + + // Iterate + for { + raw := iter.Next() + if raw == nil { + break + } + + // Ensure the parent ID is an exact match + existingJob := raw.(*structs.Job) + if existingJob.ParentID != dispatchJob.ParentID { + continue + } + + // Idempotency keys match. Ensure existing job is not currently running. + if ik, ok := existingJob.Meta[MetaDispatchIdempotencyKey]; ok && ik == idempotencyKey { + // The existing job is either pending or running. + // Registering a new job would violate the idempotency key. + if existingJob.Status != structs.JobStatusDead { + return fmt.Errorf("dispatch violates idempotency key of non-terminal child job: %s", existingJob.ID) + } + } + } + + } + // Compress the payload dispatchJob.Payload = snappy.Encode(nil, args.Payload)