diff --git a/nomad/fsm.go b/nomad/fsm.go index 46919aa5c2b..2ff734b565b 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -325,7 +325,8 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { // We always add the job to the periodic dispatcher because there is the // possibility that the periodic spec was removed and then we should stop // tracking it. - if err := n.periodicDispatcher.Add(req.Job); err != nil { + added, err := n.periodicDispatcher.Add(req.Job) + if err != nil { n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err) return err } @@ -338,7 +339,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { // is added to when it was suppose to launch, leader election occurs and the // job was not launched. In this case, we use the insertion time to // determine if a launch was missed. - if req.Job.IsPeriodic() { + if added { prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Namespace, req.Job.ID) if err != nil { n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err) diff --git a/nomad/leader.go b/nomad/leader.go index 814c167ea3f..f09825d2378 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -314,7 +314,16 @@ func (s *Server) restorePeriodicDispatcher() error { continue } - s.periodicDispatcher.Add(job) + added, err := s.periodicDispatcher.Add(job) + if err != nil { + return err + } + + // We did not add the job to the tracker, this can be for a variety of + // reasons, but it means that we do not need to force run it. + if !added { + continue + } // If the periodic job has never been launched before, launch will hold // the time the periodic job was added. Otherwise it has the last launch diff --git a/nomad/periodic.go b/nomad/periodic.go index 94ab1e768c1..69c6f7050f4 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -186,18 +186,20 @@ func (p *PeriodicDispatch) Tracked() []*structs.Job { } // Add begins tracking of a periodic job. If it is already tracked, it acts as -// an update to the jobs periodic spec. -func (p *PeriodicDispatch) Add(job *structs.Job) error { +// an update to the jobs periodic spec. The method returns whether the job was +// added and any error that may have occured. +func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) { p.l.Lock() defer p.l.Unlock() // Do nothing if not enabled if !p.enabled { - return nil + return false, nil } - // If we were tracking a job and it has been disabled or made non-periodic remove it. - disabled := !job.IsPeriodic() || !job.Periodic.Enabled + // If we were tracking a job and it has been disabled, made non-periodic, + // stopped or is parameterized, remove it + disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized() tuple := structs.NamespacedID{ ID: job.ID, @@ -210,13 +212,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error { } // If the job is disabled and we aren't tracking it, do nothing. - return nil - } - - // Check if the job is also a parameterized job. If it is, then we do not want to - // treat it as a periodic job but only its dispatched children. - if job.IsParameterized() { - return nil + return false, nil } // Add or update the job. @@ -224,12 +220,12 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error { next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation())) if tracked { if err := p.heap.Update(job, next); err != nil { - return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err) + return false, fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err) } p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q (%s)", job.ID, job.Namespace) } else { if err := p.heap.Push(job, next); err != nil { - return fmt.Errorf("failed to add job %v: %v", job.ID, err) + return false, fmt.Errorf("failed to add job %v: %v", job.ID, err) } p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q (%s)", job.ID, job.Namespace) } @@ -240,7 +236,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error { default: } - return nil + return true, nil } // Remove stops tracking the passed job. If the job is not tracked, it is a diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index dec56c0601f..1d038430da9 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -116,8 +116,8 @@ func TestPeriodicDispatch_SetEnabled(t *testing.T) { // Enable and track something p.SetEnabled(true) job := mock.PeriodicJob() - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v %v", added, err) } tracked := p.Tracked() @@ -130,8 +130,10 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) { t.Parallel() p, _ := testPeriodicDispatcher() job := mock.Job() - if err := p.Add(job); err != nil { + if added, err := p.Add(job); err != nil { t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err) + } else if added { + t.Fatalf("Add of non-periodic job happened, expect no-op") } tracked := p.Tracked() @@ -145,8 +147,23 @@ func TestPeriodicDispatch_Add_Periodic_Parameterized(t *testing.T) { p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() job.ParameterizedJob = &structs.ParameterizedJobConfig{} - if err := p.Add(job); err != nil { - t.Fatalf("Add of periodic parameterized job failed: %v; expect no-op", err) + if added, err := p.Add(job); err != nil || added { + t.Fatalf("Add of periodic parameterized job failed: %v %v", added, err) + } + + tracked := p.Tracked() + if len(tracked) != 0 { + t.Fatalf("Add of periodic parameterized job should be no-op: %v", tracked) + } +} + +func TestPeriodicDispatch_Add_Periodic_Stopped(t *testing.T) { + t.Parallel() + p, _ := testPeriodicDispatcher() + job := mock.PeriodicJob() + job.Stop = true + if added, err := p.Add(job); err != nil || added { + t.Fatalf("Add of stopped periodic job failed: %v %v", added, err) } tracked := p.Tracked() @@ -159,8 +176,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) { t.Parallel() p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v %v", added, err) } tracked := p.Tracked() @@ -170,8 +187,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) { // Update the job and add it again. job.Periodic.Spec = "foo" - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } tracked = p.Tracked() @@ -191,8 +208,15 @@ func TestPeriodicDispatch_Add_Remove_Namespaced(t *testing.T) { job := mock.PeriodicJob() job2 := mock.PeriodicJob() job2.Namespace = "test" - assert.Nil(p.Add(job)) - assert.Nil(p.Add(job2)) + + added, err := p.Add(job) + assert.Nil(err) + assert.True(added) + + added, err = p.Add(job2) + assert.Nil(err) + assert.True(added) + assert.Len(p.Tracked(), 2) assert.Nil(p.Remove(job2.Namespace, job2.ID)) @@ -204,8 +228,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) { t.Parallel() p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } tracked := p.Tracked() @@ -215,8 +239,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) { // Update the job to be non-periodic and add it again. job.Periodic = nil - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || added { + t.Fatalf("Add failed %v", added, err) } tracked = p.Tracked() @@ -233,15 +257,15 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) { job := testPeriodicJob(time.Now().Add(10 * time.Second)) // Add it. - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } // Update it to be sooner and re-add. expected := time.Now().Round(1 * time.Second).Add(1 * time.Second) job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix()) - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } // Check that nothing is created. @@ -281,8 +305,8 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) { p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } tracked := p.Tracked() @@ -308,8 +332,8 @@ func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) { job := testPeriodicJob(time.Now().Add(1 * time.Second)) // Add it. - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } // Remove the job. @@ -347,8 +371,8 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) { job := testPeriodicJob(time.Now().Add(10 * time.Second)) // Add it. - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } // ForceRun the job @@ -379,8 +403,8 @@ func TestPeriodicDispatch_Run_DisallowOverlaps(t *testing.T) { job.Periodic.ProhibitOverlap = true // Add it. - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } time.Sleep(3 * time.Second) @@ -408,8 +432,8 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) { job := testPeriodicJob(launch1, launch2) // Add it. - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } time.Sleep(3 * time.Second) @@ -440,11 +464,11 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) { job2 := testPeriodicJob(launch) // Add them. - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } - if err := p.Add(job2); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job2); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } if l := len(p.Tracked()); l != 2 { @@ -480,11 +504,11 @@ func TestPeriodicDispatch_Run_SameID_Different_Namespace(t *testing.T) { job2.Namespace = "test" // Add them. - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } - if err := p.Add(job2); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job2); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } if l := len(p.Tracked()); l != 2 { @@ -560,8 +584,8 @@ func TestPeriodicDispatch_Complex(t *testing.T) { shuffle(toDelete) for _, job := range jobs { - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) + if added, err := p.Add(job); err != nil || !added { + t.Fatalf("Add failed %v", added, err) } }