Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix restoration of stopped periodic jobs #3201

Merged
merged 1 commit into from
Sep 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// We always add the job to the periodic dispatcher because there is the
// possibility that the periodic spec was removed and then we should stop
// tracking it.
if err := n.periodicDispatcher.Add(req.Job); err != nil {
added, err := n.periodicDispatcher.Add(req.Job)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
return err
}
Expand All @@ -338,7 +339,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// is added to when it was supposed to launch, leader election occurs and the
// job was not launched. In this case, we use the insertion time to
// determine if a launch was missed.
if req.Job.IsPeriodic() {
if added {
prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)
Expand Down
11 changes: 10 additions & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,16 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}

s.periodicDispatcher.Add(job)
added, err := s.periodicDispatcher.Add(job)
if err != nil {
return err
}

// We did not add the job to the tracker. This can happen for a variety of
// reasons, but it means that we do not need to force run it.
if !added {
continue
}

// If the periodic job has never been launched before, launch will hold
// the time the periodic job was added. Otherwise it has the last launch
Expand Down
26 changes: 11 additions & 15 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,18 +186,20 @@ func (p *PeriodicDispatch) Tracked() []*structs.Job {
}

// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the jobs periodic spec.
func (p *PeriodicDispatch) Add(job *structs.Job) error {
// an update to the job's periodic spec. The method returns whether the job was
// added and any error that may have occurred.
func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
p.l.Lock()
defer p.l.Unlock()

// Do nothing if not enabled
if !p.enabled {
return nil
return false, nil
}

// If we were tracking a job and it has been disabled or made non-periodic remove it.
disabled := !job.IsPeriodic() || !job.Periodic.Enabled
// If we were tracking a job and it has been disabled, made non-periodic,
// stopped or is parameterized, remove it
disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized()

tuple := structs.NamespacedID{
ID: job.ID,
Expand All @@ -210,26 +212,20 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
}

// If the job is disabled and we aren't tracking it, do nothing.
return nil
}

// Check if the job is also a parameterized job. If it is, then we do not want to
// treat it as a periodic job but only its dispatched children.
if job.IsParameterized() {
return nil
return false, nil
}

// Add or update the job.
p.tracked[tuple] = job
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
if tracked {
if err := p.heap.Update(job, next); err != nil {
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
return false, fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q (%s)", job.ID, job.Namespace)
} else {
if err := p.heap.Push(job, next); err != nil {
return fmt.Errorf("failed to add job %v: %v", job.ID, err)
return false, fmt.Errorf("failed to add job %v: %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q (%s)", job.ID, job.Namespace)
}
Expand All @@ -240,7 +236,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
default:
}

return nil
return true, nil
}

// Remove stops tracking the passed job. If the job is not tracked, it is a
Expand Down
102 changes: 63 additions & 39 deletions nomad/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func TestPeriodicDispatch_SetEnabled(t *testing.T) {
// Enable and track something
p.SetEnabled(true)
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
}

tracked := p.Tracked()
Expand All @@ -130,8 +130,10 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.Job()
if err := p.Add(job); err != nil {
if added, err := p.Add(job); err != nil {
t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err)
} else if added {
t.Fatalf("Add of non-periodic job happened, expect no-op")
}

tracked := p.Tracked()
Expand All @@ -145,8 +147,23 @@ func TestPeriodicDispatch_Add_Periodic_Parameterized(t *testing.T) {
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
if err := p.Add(job); err != nil {
t.Fatalf("Add of periodic parameterized job failed: %v; expect no-op", err)
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add of periodic parameterized job failed: %v %v", added, err)
}

tracked := p.Tracked()
if len(tracked) != 0 {
t.Fatalf("Add of periodic parameterized job should be no-op: %v", tracked)
}
}

func TestPeriodicDispatch_Add_Periodic_Stopped(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
job.Stop = true
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add of stopped periodic job failed: %v %v", added, err)
}

tracked := p.Tracked()
Expand All @@ -159,8 +176,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
}

tracked := p.Tracked()
Expand All @@ -170,8 +187,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {

// Update the job and add it again.
job.Periodic.Spec = "foo"
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

tracked = p.Tracked()
Expand All @@ -191,8 +208,15 @@ func TestPeriodicDispatch_Add_Remove_Namespaced(t *testing.T) {
job := mock.PeriodicJob()
job2 := mock.PeriodicJob()
job2.Namespace = "test"
assert.Nil(p.Add(job))
assert.Nil(p.Add(job2))

added, err := p.Add(job)
assert.Nil(err)
assert.True(added)

added, err = p.Add(job2)
assert.Nil(err)
assert.True(added)

assert.Len(p.Tracked(), 2)

assert.Nil(p.Remove(job2.Namespace, job2.ID))
Expand All @@ -204,8 +228,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

tracked := p.Tracked()
Expand All @@ -215,8 +239,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {

// Update the job to be non-periodic and add it again.
job.Periodic = nil
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add failed %v", added, err)
}

tracked = p.Tracked()
Expand All @@ -233,15 +257,15 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
job := testPeriodicJob(time.Now().Add(10 * time.Second))

// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

// Update it to be sooner and re-add.
expected := time.Now().Round(1 * time.Second).Add(1 * time.Second)
job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix())
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

// Check that nothing is created.
Expand Down Expand Up @@ -281,8 +305,8 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
p, _ := testPeriodicDispatcher()

job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

tracked := p.Tracked()
Expand All @@ -308,8 +332,8 @@ func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
job := testPeriodicJob(time.Now().Add(1 * time.Second))

// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

// Remove the job.
Expand Down Expand Up @@ -347,8 +371,8 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
job := testPeriodicJob(time.Now().Add(10 * time.Second))

// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

// ForceRun the job
Expand Down Expand Up @@ -379,8 +403,8 @@ func TestPeriodicDispatch_Run_DisallowOverlaps(t *testing.T) {
job.Periodic.ProhibitOverlap = true

// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -408,8 +432,8 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
job := testPeriodicJob(launch1, launch2)

// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -440,11 +464,11 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
job2 := testPeriodicJob(launch)

// Add them.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job2); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

if l := len(p.Tracked()); l != 2 {
Expand Down Expand Up @@ -480,11 +504,11 @@ func TestPeriodicDispatch_Run_SameID_Different_Namespace(t *testing.T) {
job2.Namespace = "test"

// Add them.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job2); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}

if l := len(p.Tracked()); l != 2 {
Expand Down Expand Up @@ -560,8 +584,8 @@ func TestPeriodicDispatch_Complex(t *testing.T) {
shuffle(toDelete)

for _, job := range jobs {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v", added, err)
}
}

Expand Down