Skip to content

Commit

Permalink
Merge pull request #3201 from hashicorp/b-periodic-restore
Browse files Browse the repository at this point in the history
Fix restoration of stopped periodic jobs
  • Loading branch information
dadgar authored Sep 13, 2017
2 parents dc9f3f1 + 0a078d3 commit 84c187f
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 57 deletions.
5 changes: 3 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// We always add the job to the periodic dispatcher because there is the
// possibility that the periodic spec was removed and then we should stop
// tracking it.
if err := n.periodicDispatcher.Add(req.Job); err != nil {
added, err := n.periodicDispatcher.Add(req.Job)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
return err
}
Expand All @@ -338,7 +339,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// is added to when it was suppose to launch, leader election occurs and the
// job was not launched. In this case, we use the insertion time to
// determine if a launch was missed.
if req.Job.IsPeriodic() {
if added {
prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)
Expand Down
11 changes: 10 additions & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,16 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}

s.periodicDispatcher.Add(job)
added, err := s.periodicDispatcher.Add(job)
if err != nil {
return err
}

// We did not add the job to the tracker, this can be for a variety of
// reasons, but it means that we do not need to force run it.
if !added {
continue
}

// If the periodic job has never been launched before, launch will hold
// the time the periodic job was added. Otherwise it has the last launch
Expand Down
26 changes: 11 additions & 15 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,18 +186,20 @@ func (p *PeriodicDispatch) Tracked() []*structs.Job {
}

// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the jobs periodic spec.
func (p *PeriodicDispatch) Add(job *structs.Job) error {
// an update to the jobs periodic spec. The method returns whether the job was
// added and any error that may have occured.
func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
p.l.Lock()
defer p.l.Unlock()

// Do nothing if not enabled
if !p.enabled {
return nil
return false, nil
}

// If we were tracking a job and it has been disabled or made non-periodic remove it.
disabled := !job.IsPeriodic() || !job.Periodic.Enabled
// If we were tracking a job and it has been disabled, made non-periodic,
// stopped or is parameterized, remove it
disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized()

tuple := structs.NamespacedID{
ID: job.ID,
Expand All @@ -210,26 +212,20 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
}

// If the job is disabled and we aren't tracking it, do nothing.
return nil
}

// Check if the job is also a parameterized job. If it is, then we do not want to
// treat it as a periodic job but only its dispatched children.
if job.IsParameterized() {
return nil
return false, nil
}

// Add or update the job.
p.tracked[tuple] = job
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
if tracked {
if err := p.heap.Update(job, next); err != nil {
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
return false, fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q (%s)", job.ID, job.Namespace)
} else {
if err := p.heap.Push(job, next); err != nil {
return fmt.Errorf("failed to add job %v: %v", job.ID, err)
return false, fmt.Errorf("failed to add job %v: %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q (%s)", job.ID, job.Namespace)
}
Expand All @@ -240,7 +236,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
default:
}

return nil
return true, nil
}

// Remove stops tracking the passed job. If the job is not tracked, it is a
Expand Down
102 changes: 63 additions & 39 deletions nomad/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func TestPeriodicDispatch_SetEnabled(t *testing.T) {
// Enable and track something
p.SetEnabled(true)
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
}

tracked := p.Tracked()
Expand All @@ -130,8 +130,10 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.Job()
if err := p.Add(job); err != nil {
if added, err := p.Add(job); err != nil {
t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err)
} else if added {
t.Fatalf("Add of non-periodic job happened, expect no-op")
}

tracked := p.Tracked()
Expand All @@ -145,8 +147,23 @@ func TestPeriodicDispatch_Add_Periodic_Parameterized(t *testing.T) {
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
if err := p.Add(job); err != nil {
t.Fatalf("Add of periodic parameterized job failed: %v; expect no-op", err)
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add of periodic parameterized job failed: %v %v", added, err)
}

tracked := p.Tracked()
if len(tracked) != 0 {
t.Fatalf("Add of periodic parameterized job should be no-op: %v", tracked)
}
}

func TestPeriodicDispatch_Add_Periodic_Stopped(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
job.Stop = true
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add of stopped periodic job failed: %v %v", added, err)
}

tracked := p.Tracked()
Expand All @@ -159,8 +176,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
}

tracked := p.Tracked()
Expand All @@ -170,8 +187,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {

// Update the job and add it again.
job.Periodic.Spec = "foo"
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

tracked = p.Tracked()
Expand All @@ -191,8 +208,15 @@ func TestPeriodicDispatch_Add_Remove_Namespaced(t *testing.T) {
job := mock.PeriodicJob()
job2 := mock.PeriodicJob()
job2.Namespace = "test"
assert.Nil(p.Add(job))
assert.Nil(p.Add(job2))

added, err := p.Add(job)
assert.Nil(err)
assert.True(added)

added, err = p.Add(job2)
assert.Nil(err)
assert.True(added)

assert.Len(p.Tracked(), 2)

assert.Nil(p.Remove(job2.Namespace, job2.ID))
Expand All @@ -204,8 +228,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

tracked := p.Tracked()
Expand All @@ -215,8 +239,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {

// Update the job to be non-periodic and add it again.
job.Periodic = nil
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add failed %v", added, err)
}

tracked = p.Tracked()
Expand All @@ -233,15 +257,15 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
job := testPeriodicJob(time.Now().Add(10 * time.Second))

// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

// Update it to be sooner and re-add.
expected := time.Now().Round(1 * time.Second).Add(1 * time.Second)
job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix())
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

// Check that nothing is created.
Expand Down Expand Up @@ -281,8 +305,8 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
p, _ := testPeriodicDispatcher()

job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

tracked := p.Tracked()
Expand All @@ -308,8 +332,8 @@ func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
job := testPeriodicJob(time.Now().Add(1 * time.Second))

// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

// Remove the job.
Expand Down Expand Up @@ -347,8 +371,8 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
job := testPeriodicJob(time.Now().Add(10 * time.Second))

// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

// ForceRun the job
Expand Down Expand Up @@ -379,8 +403,8 @@ func TestPeriodicDispatch_Run_DisallowOverlaps(t *testing.T) {
job.Periodic.ProhibitOverlap = true

// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -408,8 +432,8 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
job := testPeriodicJob(launch1, launch2)

// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -440,11 +464,11 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
job2 := testPeriodicJob(launch)

// Add them.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job2); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

if l := len(p.Tracked()); l != 2 {
Expand Down Expand Up @@ -480,11 +504,11 @@ func TestPeriodicDispatch_Run_SameID_Different_Namespace(t *testing.T) {
job2.Namespace = "test"

// Add them.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job2); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

if l := len(p.Tracked()); l != 2 {
Expand Down Expand Up @@ -560,8 +584,8 @@ func TestPeriodicDispatch_Complex(t *testing.T) {
shuffle(toDelete)

for _, job := range jobs {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}
}

Expand Down

0 comments on commit 84c187f

Please sign in to comment.