diff --git a/nomad/leader.go b/nomad/leader.go index 3c308a7863e..d54dfbbb286 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -71,8 +71,7 @@ RECONCILE: // Check if we need to handle initial leadership actions if !establishedLeader { if err := s.establishLeadership(stopCh); err != nil { - s.logger.Printf("[ERR] nomad: failed to establish leadership: %v", - err) + s.logger.Printf("[ERR] nomad: failed to establish leadership: %v", err) goto WAIT } establishedLeader = true @@ -149,7 +148,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Enable the periodic dispatcher, since we are now the leader. s.periodicDispatcher.SetEnabled(true) - s.periodicDispatcher.Start() // Restore the periodic dispatcher state if err := s.restorePeriodicDispatcher(); err != nil { @@ -288,6 +286,13 @@ func (s *Server) restorePeriodicDispatcher() error { now := time.Now() for i := iter.Next(); i != nil; i = iter.Next() { job := i.(*structs.Job) + + // We skip adding parameterized jobs because they themselves aren't + // tracked, only the dispatched children are. 
+ if job.IsParameterized() { + continue + } + s.periodicDispatcher.Add(job) // If the periodic job has never been launched before, launch will hold diff --git a/nomad/leader_test.go b/nomad/leader_test.go index eda24c2b631..4fc8cb06dae 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -328,10 +328,12 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) { t.Fatalf("Should have a leader") } - // Inject a periodic job and non-periodic job + // Inject a periodic job, a parameterized periodic job and a non-periodic job periodic := mock.PeriodicJob() nonPeriodic := mock.Job() - for _, job := range []*structs.Job{nonPeriodic, periodic} { + parameterizedPeriodic := mock.PeriodicJob() + parameterizedPeriodic.ParameterizedJob = &structs.ParameterizedJobConfig{} + for _, job := range []*structs.Job{nonPeriodic, periodic, parameterizedPeriodic} { req := structs.JobRegisterRequest{ Job: job, } @@ -359,12 +361,20 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) { t.Fatalf("should have leader") }) - // Check that the new leader is tracking the periodic job. + // Check that the new leader is tracking the periodic job only testutil.WaitForResult(func() (bool, error) { - _, tracked := leader.periodicDispatcher.tracked[periodic.ID] - return tracked, nil + if _, tracked := leader.periodicDispatcher.tracked[periodic.ID]; !tracked { + return false, fmt.Errorf("periodic job not tracked") + } + if _, tracked := leader.periodicDispatcher.tracked[nonPeriodic.ID]; tracked { + return false, fmt.Errorf("non periodic job tracked") + } + if _, tracked := leader.periodicDispatcher.tracked[parameterizedPeriodic.ID]; tracked { + return false, fmt.Errorf("parameterized periodic job tracked") + } + return true, nil }, func(err error) { - t.Fatalf("periodic job not tracked") + t.Fatalf(err.Error()) }) } @@ -398,7 +408,6 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) { // Restore the periodic dispatcher. 
s1.periodicDispatcher.SetEnabled(true) - s1.periodicDispatcher.Start() s1.restorePeriodicDispatcher() // Ensure the job is tracked. @@ -450,7 +459,6 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { // Restore the periodic dispatcher. s1.periodicDispatcher.SetEnabled(true) - s1.periodicDispatcher.Start() s1.restorePeriodicDispatcher() // Ensure the job is tracked. diff --git a/nomad/periodic.go b/nomad/periodic.go index d01f26aceca..11145e62ac7 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -2,6 +2,7 @@ package nomad import ( "container/heap" + "context" "fmt" "log" "strconv" @@ -19,14 +20,12 @@ import ( type PeriodicDispatch struct { dispatcher JobEvalDispatcher enabled bool - running bool tracked map[string]*structs.Job heap *periodicHeap updateCh chan struct{} - stopCh chan struct{} - waitCh chan struct{} + stopFn context.CancelFunc logger *log.Logger l sync.RWMutex } @@ -141,8 +140,6 @@ func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *Peri tracked: make(map[string]*structs.Job), heap: NewPeriodicHeap(), updateCh: make(chan struct{}, 1), - stopCh: make(chan struct{}), - waitCh: make(chan struct{}), logger: logger, } } @@ -152,24 +149,21 @@ func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *Peri // will stop any launched go routine and flush the dispatcher. func (p *PeriodicDispatch) SetEnabled(enabled bool) { p.l.Lock() + defer p.l.Unlock() + wasRunning := p.enabled p.enabled = enabled - p.l.Unlock() - if !enabled { - if p.running { - close(p.stopCh) - <-p.waitCh - p.running = false - } - p.Flush() - } -} -// Start begins the goroutine that creates derived jobs and evals. -func (p *PeriodicDispatch) Start() { - p.l.Lock() - p.running = true - p.l.Unlock() - go p.run() + // If we are transitioning from enabled to disabled, stop the daemon and + // flush. 
+ if !enabled && wasRunning { + p.stopFn() + p.flush() + } else if enabled && !wasRunning { + // If we are transitioning from disabled to enabled, run the daemon. + ctx, cancel := context.WithCancel(context.Background()) + p.stopFn = cancel + go p.run(ctx) + } } // Tracked returns the set of tracked job IDs. @@ -230,11 +224,9 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error { } // Signal an update. - if p.running { - select { - case p.updateCh <- struct{}{}: - default: - } + select { + case p.updateCh <- struct{}{}: + default: } return nil @@ -267,11 +259,9 @@ func (p *PeriodicDispatch) removeLocked(jobID string) error { } // Signal an update. - if p.running { - select { - case p.updateCh <- struct{}{}: - default: - } + select { + case p.updateCh <- struct{}{}: + default: } p.logger.Printf("[DEBUG] nomad.periodic: deregistered periodic job %q", jobID) @@ -303,13 +293,12 @@ func (p *PeriodicDispatch) ForceRun(jobID string) (*structs.Evaluation, error) { func (p *PeriodicDispatch) shouldRun() bool { p.l.RLock() defer p.l.RUnlock() - return p.enabled && p.running + return p.enabled } // run is a long-lived function that waits till a job's periodic spec is met and // then creates an evaluation to run the job. 
-func (p *PeriodicDispatch) run() { - defer close(p.waitCh) +func (p *PeriodicDispatch) run(ctx context.Context) { var launchCh <-chan time.Time for p.shouldRun() { job, launch := p.nextLaunch() @@ -322,7 +311,7 @@ func (p *PeriodicDispatch) run() { } select { - case <-p.stopCh: + case <-ctx.Done(): return case <-p.updateCh: continue @@ -453,15 +442,12 @@ func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) { return time.Unix(int64(launch), 0), nil } -// Flush clears the state of the PeriodicDispatcher -func (p *PeriodicDispatch) Flush() { - p.l.Lock() - defer p.l.Unlock() - p.stopCh = make(chan struct{}) +// flush clears the state of the PeriodicDispatcher. Callers must hold p.l. +func (p *PeriodicDispatch) flush() { p.updateCh = make(chan struct{}, 1) - p.waitCh = make(chan struct{}) p.tracked = make(map[string]*structs.Job) p.heap = NewPeriodicHeap() + p.stopFn = nil } // periodicHeap wraps a heap and gives operations other than Push/Pop. diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index 9d63148adc4..3aacabf5002 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -78,7 +78,6 @@ func testPeriodicDispatcher() (*PeriodicDispatch, *MockJobEvalDispatcher) { m := NewMockJobEvalDispatcher() d := NewPeriodicDispatch(logger, m) d.SetEnabled(true) - d.Start() return d, m } @@ -97,6 +96,31 @@ func testPeriodicJob(times ...time.Time) *structs.Job { return job } +// TestPeriodicDispatch_SetEnabled tests that setting enabled twice is a no-op. +// This tests the reported issue: https://github.com/hashicorp/nomad/issues/2829 +func TestPeriodicDispatch_SetEnabled(t *testing.T) { + t.Parallel() + p, _ := testPeriodicDispatcher() + + // SetEnabled has been called once but do it again. + p.SetEnabled(true) + + // Now disable and make sure everything is fine. 
+ p.SetEnabled(false) + + // Enable and track something + p.SetEnabled(true) + job := mock.PeriodicJob() + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + tracked := p.Tracked() + if len(tracked) != 1 { + t.Fatalf("Add didn't track the job: %v", tracked) + } +} + func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) { t.Parallel() p, _ := testPeriodicDispatcher()