Skip to content

Commit

Permalink
Merge pull request #2959 from hashicorp/b-periodic-dispatcher
Browse files Browse the repository at this point in the history
Fix restoration of parameterized, periodic jobs
  • Loading branch information
dadgar authored Aug 7, 2017
2 parents a08c5df + 09e90da commit d23b925
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 53 deletions.
11 changes: 8 additions & 3 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ RECONCILE:
// Check if we need to handle initial leadership actions
if !establishedLeader {
if err := s.establishLeadership(stopCh); err != nil {
s.logger.Printf("[ERR] nomad: failed to establish leadership: %v",
err)
s.logger.Printf("[ERR] nomad: failed to establish leadership: %v", err)
goto WAIT
}
establishedLeader = true
Expand Down Expand Up @@ -149,7 +148,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {

// Enable the periodic dispatcher, since we are now the leader.
s.periodicDispatcher.SetEnabled(true)
s.periodicDispatcher.Start()

// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
Expand Down Expand Up @@ -288,6 +286,13 @@ func (s *Server) restorePeriodicDispatcher() error {
now := time.Now()
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)

// We skip adding parameterized jobs because they themselves aren't
// tracked, only the dispatched children are.
if job.IsParameterized() {
continue
}

s.periodicDispatcher.Add(job)

// If the periodic job has never been launched before, launch will hold
Expand Down
24 changes: 16 additions & 8 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,12 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
t.Fatalf("Should have a leader")
}

// Inject a periodic job and non-periodic job
// Inject a periodic job, a parameterized periodic job and a non-periodic job
periodic := mock.PeriodicJob()
nonPeriodic := mock.Job()
for _, job := range []*structs.Job{nonPeriodic, periodic} {
parameterizedPeriodic := mock.PeriodicJob()
parameterizedPeriodic.ParameterizedJob = &structs.ParameterizedJobConfig{}
for _, job := range []*structs.Job{nonPeriodic, periodic, parameterizedPeriodic} {
req := structs.JobRegisterRequest{
Job: job,
}
Expand Down Expand Up @@ -359,12 +361,20 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
t.Fatalf("should have leader")
})

// Check that the new leader is tracking the periodic job.
// Check that the new leader is tracking the periodic job only
testutil.WaitForResult(func() (bool, error) {
_, tracked := leader.periodicDispatcher.tracked[periodic.ID]
return tracked, nil
if _, tracked := leader.periodicDispatcher.tracked[periodic.ID]; !tracked {
return false, fmt.Errorf("periodic job not tracked")
}
if _, tracked := leader.periodicDispatcher.tracked[nonPeriodic.ID]; tracked {
return false, fmt.Errorf("non periodic job tracked")
}
if _, tracked := leader.periodicDispatcher.tracked[parameterizedPeriodic.ID]; tracked {
return false, fmt.Errorf("parameterized periodic job tracked")
}
return true, nil
}, func(err error) {
t.Fatalf("periodic job not tracked")
t.Fatalf(err.Error())
})
}

Expand Down Expand Up @@ -398,7 +408,6 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
Expand Down Expand Up @@ -450,7 +459,6 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
Expand Down
68 changes: 27 additions & 41 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"container/heap"
"context"
"fmt"
"log"
"strconv"
Expand All @@ -19,14 +20,12 @@ import (
type PeriodicDispatch struct {
dispatcher JobEvalDispatcher
enabled bool
running bool

tracked map[string]*structs.Job
heap *periodicHeap

updateCh chan struct{}
stopCh chan struct{}
waitCh chan struct{}
stopFn context.CancelFunc
logger *log.Logger
l sync.RWMutex
}
Expand Down Expand Up @@ -141,8 +140,6 @@ func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *Peri
tracked: make(map[string]*structs.Job),
heap: NewPeriodicHeap(),
updateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
waitCh: make(chan struct{}),
logger: logger,
}
}
Expand All @@ -152,24 +149,21 @@ func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *Peri
// will stop any launched go routine and flush the dispatcher.
func (p *PeriodicDispatch) SetEnabled(enabled bool) {
p.l.Lock()
defer p.l.Unlock()
wasRunning := p.enabled
p.enabled = enabled
p.l.Unlock()
if !enabled {
if p.running {
close(p.stopCh)
<-p.waitCh
p.running = false
}
p.Flush()
}
}

// Start begins the goroutine that creates derived jobs and evals.
func (p *PeriodicDispatch) Start() {
p.l.Lock()
p.running = true
p.l.Unlock()
go p.run()
// If we are transistioning from enabled to disabled, stop the daemon and
// flush.
if !enabled && wasRunning {
p.stopFn()
p.flush()
} else if enabled && !wasRunning {
// If we are transitioning from disabled to enabled, run the daemon.
ctx, cancel := context.WithCancel(context.Background())
p.stopFn = cancel
go p.run(ctx)
}
}

// Tracked returns the set of tracked job IDs.
Expand Down Expand Up @@ -230,11 +224,9 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
}

// Signal an update.
if p.running {
select {
case p.updateCh <- struct{}{}:
default:
}
select {
case p.updateCh <- struct{}{}:
default:
}

return nil
Expand Down Expand Up @@ -267,11 +259,9 @@ func (p *PeriodicDispatch) removeLocked(jobID string) error {
}

// Signal an update.
if p.running {
select {
case p.updateCh <- struct{}{}:
default:
}
select {
case p.updateCh <- struct{}{}:
default:
}

p.logger.Printf("[DEBUG] nomad.periodic: deregistered periodic job %q", jobID)
Expand Down Expand Up @@ -303,13 +293,12 @@ func (p *PeriodicDispatch) ForceRun(jobID string) (*structs.Evaluation, error) {
func (p *PeriodicDispatch) shouldRun() bool {
p.l.RLock()
defer p.l.RUnlock()
return p.enabled && p.running
return p.enabled
}

// run is a long-lived function that waits till a job's periodic spec is met and
// then creates an evaluation to run the job.
func (p *PeriodicDispatch) run() {
defer close(p.waitCh)
func (p *PeriodicDispatch) run(ctx context.Context) {
var launchCh <-chan time.Time
for p.shouldRun() {
job, launch := p.nextLaunch()
Expand All @@ -322,7 +311,7 @@ func (p *PeriodicDispatch) run() {
}

select {
case <-p.stopCh:
case <-ctx.Done():
return
case <-p.updateCh:
continue
Expand Down Expand Up @@ -453,15 +442,12 @@ func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
return time.Unix(int64(launch), 0), nil
}

// Flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) Flush() {
p.l.Lock()
defer p.l.Unlock()
p.stopCh = make(chan struct{})
// flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) flush() {
p.updateCh = make(chan struct{}, 1)
p.waitCh = make(chan struct{})
p.tracked = make(map[string]*structs.Job)
p.heap = NewPeriodicHeap()
p.stopFn = nil
}

// periodicHeap wraps a heap and gives operations other than Push/Pop.
Expand Down
26 changes: 25 additions & 1 deletion nomad/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func testPeriodicDispatcher() (*PeriodicDispatch, *MockJobEvalDispatcher) {
m := NewMockJobEvalDispatcher()
d := NewPeriodicDispatch(logger, m)
d.SetEnabled(true)
d.Start()
return d, m
}

Expand All @@ -97,6 +96,31 @@ func testPeriodicJob(times ...time.Time) *structs.Job {
return job
}

// TestPeriodicDispatch_SetEnabled test that setting enabled twice is a no-op.
// This tests the reported issue: https://github.com/hashicorp/nomad/issues/2829
func TestPeriodicDispatch_SetEnabled(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()

// SetEnabled has been called once but do it again.
p.SetEnabled(true)

// Now disable and make sure everything is fine.
p.SetEnabled(false)

// Enable and track something
p.SetEnabled(true)
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}

tracked := p.Tracked()
if len(tracked) != 1 {
t.Fatalf("Add didn't track the job: %v", tracked)
}
}

func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
Expand Down

0 comments on commit d23b925

Please sign in to comment.