Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix restoration of parameterized, periodic jobs #2959

Merged
merged 2 commits into from
Aug 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ RECONCILE:
// Check if we need to handle initial leadership actions
if !establishedLeader {
if err := s.establishLeadership(stopCh); err != nil {
s.logger.Printf("[ERR] nomad: failed to establish leadership: %v",
err)
s.logger.Printf("[ERR] nomad: failed to establish leadership: %v", err)
goto WAIT
}
establishedLeader = true
Expand Down Expand Up @@ -149,7 +148,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {

// Enable the periodic dispatcher, since we are now the leader.
s.periodicDispatcher.SetEnabled(true)
s.periodicDispatcher.Start()

// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
Expand Down Expand Up @@ -288,6 +286,13 @@ func (s *Server) restorePeriodicDispatcher() error {
now := time.Now()
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)

// We skip adding parameterized jobs because they themselves aren't
// tracked, only the dispatched children are.
if job.IsParameterized() {
continue
}

s.periodicDispatcher.Add(job)

// If the periodic job has never been launched before, launch will hold
Expand Down
24 changes: 16 additions & 8 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,12 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
t.Fatalf("Should have a leader")
}

// Inject a periodic job and non-periodic job
// Inject a periodic job, a parameterized periodic job and a non-periodic job
periodic := mock.PeriodicJob()
nonPeriodic := mock.Job()
for _, job := range []*structs.Job{nonPeriodic, periodic} {
parameterizedPeriodic := mock.PeriodicJob()
parameterizedPeriodic.ParameterizedJob = &structs.ParameterizedJobConfig{}
for _, job := range []*structs.Job{nonPeriodic, periodic, parameterizedPeriodic} {
req := structs.JobRegisterRequest{
Job: job,
}
Expand Down Expand Up @@ -359,12 +361,20 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
t.Fatalf("should have leader")
})

// Check that the new leader is tracking the periodic job.
// Check that the new leader is tracking the periodic job only
testutil.WaitForResult(func() (bool, error) {
_, tracked := leader.periodicDispatcher.tracked[periodic.ID]
return tracked, nil
if _, tracked := leader.periodicDispatcher.tracked[periodic.ID]; !tracked {
return false, fmt.Errorf("periodic job not tracked")
}
if _, tracked := leader.periodicDispatcher.tracked[nonPeriodic.ID]; tracked {
return false, fmt.Errorf("non periodic job tracked")
}
if _, tracked := leader.periodicDispatcher.tracked[parameterizedPeriodic.ID]; tracked {
return false, fmt.Errorf("parameterized periodic job tracked")
}
return true, nil
}, func(err error) {
t.Fatalf("periodic job not tracked")
t.Fatalf(err.Error())
})
}

Expand Down Expand Up @@ -398,7 +408,6 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
Expand Down Expand Up @@ -450,7 +459,6 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
Expand Down
68 changes: 27 additions & 41 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"container/heap"
"context"
"fmt"
"log"
"strconv"
Expand All @@ -19,14 +20,12 @@ import (
type PeriodicDispatch struct {
dispatcher JobEvalDispatcher
enabled bool
running bool

tracked map[string]*structs.Job
heap *periodicHeap

updateCh chan struct{}
stopCh chan struct{}
waitCh chan struct{}
stopFn context.CancelFunc
logger *log.Logger
l sync.RWMutex
}
Expand Down Expand Up @@ -141,8 +140,6 @@ func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *Peri
tracked: make(map[string]*structs.Job),
heap: NewPeriodicHeap(),
updateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
waitCh: make(chan struct{}),
logger: logger,
}
}
Expand All @@ -152,24 +149,21 @@ func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *Peri
// will stop any launched go routine and flush the dispatcher.
func (p *PeriodicDispatch) SetEnabled(enabled bool) {
p.l.Lock()
defer p.l.Unlock()
wasRunning := p.enabled
p.enabled = enabled
p.l.Unlock()
if !enabled {
if p.running {
close(p.stopCh)
<-p.waitCh
p.running = false
}
p.Flush()
}
}

// Start begins the goroutine that creates derived jobs and evals.
func (p *PeriodicDispatch) Start() {
p.l.Lock()
p.running = true
p.l.Unlock()
go p.run()
// If we are transistioning from enabled to disabled, stop the daemon and
// flush.
if !enabled && wasRunning {
p.stopFn()
p.flush()
} else if enabled && !wasRunning {
// If we are transitioning from disabled to enabled, run the daemon.
ctx, cancel := context.WithCancel(context.Background())
p.stopFn = cancel
go p.run(ctx)
}
}

// Tracked returns the set of tracked job IDs.
Expand Down Expand Up @@ -230,11 +224,9 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
}

// Signal an update.
if p.running {
select {
case p.updateCh <- struct{}{}:
default:
}
select {
case p.updateCh <- struct{}{}:
default:
}

return nil
Expand Down Expand Up @@ -267,11 +259,9 @@ func (p *PeriodicDispatch) removeLocked(jobID string) error {
}

// Signal an update.
if p.running {
select {
case p.updateCh <- struct{}{}:
default:
}
select {
case p.updateCh <- struct{}{}:
default:
}

p.logger.Printf("[DEBUG] nomad.periodic: deregistered periodic job %q", jobID)
Expand Down Expand Up @@ -303,13 +293,12 @@ func (p *PeriodicDispatch) ForceRun(jobID string) (*structs.Evaluation, error) {
func (p *PeriodicDispatch) shouldRun() bool {
p.l.RLock()
defer p.l.RUnlock()
return p.enabled && p.running
return p.enabled
}

// run is a long-lived function that waits till a job's periodic spec is met and
// then creates an evaluation to run the job.
func (p *PeriodicDispatch) run() {
defer close(p.waitCh)
func (p *PeriodicDispatch) run(ctx context.Context) {
var launchCh <-chan time.Time
for p.shouldRun() {
job, launch := p.nextLaunch()
Expand All @@ -322,7 +311,7 @@ func (p *PeriodicDispatch) run() {
}

select {
case <-p.stopCh:
case <-ctx.Done():
return
case <-p.updateCh:
continue
Expand Down Expand Up @@ -453,15 +442,12 @@ func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
return time.Unix(int64(launch), 0), nil
}

// Flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) Flush() {
p.l.Lock()
defer p.l.Unlock()
p.stopCh = make(chan struct{})
// flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) flush() {
p.updateCh = make(chan struct{}, 1)
p.waitCh = make(chan struct{})
p.tracked = make(map[string]*structs.Job)
p.heap = NewPeriodicHeap()
p.stopFn = nil
}

// periodicHeap wraps a heap and gives operations other than Push/Pop.
Expand Down
26 changes: 25 additions & 1 deletion nomad/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func testPeriodicDispatcher() (*PeriodicDispatch, *MockJobEvalDispatcher) {
m := NewMockJobEvalDispatcher()
d := NewPeriodicDispatch(logger, m)
d.SetEnabled(true)
d.Start()
return d, m
}

Expand All @@ -97,6 +96,31 @@ func testPeriodicJob(times ...time.Time) *structs.Job {
return job
}

// TestPeriodicDispatch_SetEnabled test that setting enabled twice is a no-op.
// This tests the reported issue: https://github.com/hashicorp/nomad/issues/2829
func TestPeriodicDispatch_SetEnabled(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()

// SetEnabled has been called once but do it again.
p.SetEnabled(true)

// Now disable and make sure everything is fine.
p.SetEnabled(false)

// Enable and track something
p.SetEnabled(true)
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}

tracked := p.Tracked()
if len(tracked) != 1 {
t.Fatalf("Add didn't track the job: %v", tracked)
}
}

func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
Expand Down