From d3b0c93c94b32efc6a0bcf6467e732fb9e531294 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 27 Jun 2017 16:08:18 -0700 Subject: [PATCH] Validate job updates Incurs a local read-before-write but because validation is transitive there's no need to retry the read-validate-write on concurrent updates. --- nomad/job_endpoint.go | 61 ++++++++++++++++++++++++++++++-------- nomad/job_endpoint_test.go | 32 ++++++++++++++++++++ 2 files changed, 80 insertions(+), 13 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 8e64a7633bc..2d6703fb9ac 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -69,30 +69,39 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis reply.Warnings = warnings.Error() } + // Lookup the job + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + ws := memdb.NewWatchSet() + existingJob, err := snap.JobByID(ws, args.Job.ID) + if err != nil { + return err + } + + // If EnforceIndex set, check it before trying to apply if args.EnforceIndex { - // Lookup the job - snap, err := j.srv.fsm.State().Snapshot() - if err != nil { - return err - } - ws := memdb.NewWatchSet() - job, err := snap.JobByID(ws, args.Job.ID) - if err != nil { - return err - } jmi := args.JobModifyIndex - if job != nil { + if existingJob != nil { if jmi == 0 { return fmt.Errorf("%s 0: job already exists", RegisterEnforceIndexErrPrefix) - } else if jmi != job.JobModifyIndex { + } else if jmi != existingJob.JobModifyIndex { return fmt.Errorf("%s %d: job exists with conflicting job modify index: %d", - RegisterEnforceIndexErrPrefix, jmi, job.JobModifyIndex) + RegisterEnforceIndexErrPrefix, jmi, existingJob.JobModifyIndex) } } else if jmi != 0 { return fmt.Errorf("%s %d: job does not exist", RegisterEnforceIndexErrPrefix, jmi) } } + // Validate job transitions if its an update + if existingJob != nil { + if err := validateJobUpdate(existingJob, args.Job); err != nil { + return err + } + } + // Ensure that the job has permissions for the requested Vault tokens policies := args.Job.VaultPolicies() if len(policies) != 0 { @@ -890,6 +899,32 @@ func validateJob(job *structs.Job) (invalid, warnings error) { return validationErrors.ErrorOrNil(), warnings } +// validateJobUpdate ensures updates to a job are valid. +func validateJobUpdate(old, new *structs.Job) error { + // Type transitions are disallowed + if old.Type != new.Type { + return fmt.Errorf("cannot update job from type %q to %q", old.Type, new.Type) + } + + // Transitioning to/from periodic is disallowed + if old.IsPeriodic() && !new.IsPeriodic() { + return fmt.Errorf("cannot update non-periodic job to being periodic") + } + if new.IsPeriodic() && !old.IsPeriodic() { + return fmt.Errorf("cannot update periodic job to being non-periodic") + } + + // Transitioning to/from parameterized is disallowed + if old.IsParameterized() && !new.IsParameterized() { + return fmt.Errorf("cannot update non-parameterized job to being parameterized") + } + if new.IsParameterized() && !old.IsParameterized() { + return fmt.Errorf("cannot update parameterized job to being non-parameterized") + } + + return nil +} + // Dispatch a parameterized job. func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispatchResponse) error { if done, err := j.srv.forward("Job.Dispatch", args, args, reply); done { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index c5918e3744d..782b8b83704 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -2316,6 +2316,38 @@ func TestJobEndpoint_ValidateJob_UpdateWarning(t *testing.T) { } } +func TestJobEndpoint_ValidateJobUpdate(t *testing.T) { + old := mock.Job() + new := mock.Job() + + if err := validateJobUpdate(old, new); err != nil { + t.Errorf("expected update to be valid but got: %v", err) + } + + new.Type = "batch" + if err := validateJobUpdate(old, new); err == nil { + t.Errorf("expected err when setting new job to a different type") + } else { + t.Log(err) + } + + new = mock.Job() + new.Periodic = &structs.PeriodicConfig{Enabled: true} + if err := validateJobUpdate(old, new); err == nil { + t.Errorf("expected err when setting new job to periodic") + } else { + t.Log(err) + } + + new = mock.Job() + new.ParameterizedJob = &structs.ParameterizedJobConfig{} + if err := validateJobUpdate(old, new); err == nil { + t.Errorf("expected err when setting new job to parameterized") + } else { + t.Log(err) + } +} + func TestJobEndpoint_Dispatch(t *testing.T) { // No requirements