Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track multiple job versions and introduce a stop state for jobs #2566

Merged
merged 12 commits into from
Apr 19, 2017
Prev Previous commit
Next Next commit
GC and some fixes
  • Loading branch information
dadgar committed Apr 16, 2017
commit 5e1e7afc62e5be3a9de6117d76de362f1bcd22a0
18 changes: 11 additions & 7 deletions command/status.go
Original file line number Diff line number Diff line change
@@ -147,13 +147,17 @@ func (c *StatusCommand) Run(args []string) int {
}

if periodic && !parameterized {
location, err := job.Periodic.GetLocation()
if err == nil {
now := time.Now().In(location)
next := job.Periodic.Next(now)
basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
fmt.Sprintf("%s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second))))
if *job.Stop {
basic = append(basic, fmt.Sprintf("Next Periodic Launch|none (job stopped)"))
} else {
location, err := job.Periodic.GetLocation()
if err == nil {
now := time.Now().In(location)
next := job.Periodic.Next(now)
basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
fmt.Sprintf("%s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second))))
}
}
}

18 changes: 17 additions & 1 deletion nomad/core_sched.go
Original file line number Diff line number Diff line change
@@ -149,6 +149,7 @@ OUTER:
for _, job := range gcJob {
req := structs.JobDeregisterRequest{
JobID: job,
Purge: true,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
@@ -243,9 +244,24 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return false, nil, err
}

// Can collect if:
// Job doesn't exist
// Job is Stopped and dead
// allowBatch and the job is dead
collect := false
if job == nil {
collect = true
} else if job.Status != structs.JobStatusDead {
collect = false
} else if job.Stop {
collect = true
} else if allowBatch {
collect = true
}

// We don't want to gc anything related to a job which is not dead
// If the batch job doesn't exist we can GC it regardless of allowBatch
if job != nil && (!allowBatch || job.Status != structs.JobStatusDead) {
if !collect {
return false, nil, nil
}
}
204 changes: 196 additions & 8 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
@@ -1006,6 +1006,103 @@ func TestCoreScheduler_JobGC_OneShot(t *testing.T) {
}
}

// TestCoreScheduler_JobGC_Stopped checks that a job flagged as stopped is
// removed by the job GC core job, together with its evaluations and its
// allocation.
func TestCoreScheduler_JobGC_Stopped(t *testing.T) {
	s1 := testServer(t, nil)
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
	s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)

	// Store a mock job with its Stop flag set.
	store := s1.fsm.State()
	stopped := mock.Job()
	//stopped.Status = structs.JobStatusDead
	stopped.Stop = true
	if err := store.UpsertJob(1000, stopped); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Store two evaluations for the job, both marked complete.
	evals := make([]*structs.Evaluation, 0, 2)
	for i := 0; i < 2; i++ {
		e := mock.Eval()
		e.JobID = stopped.ID
		e.Status = structs.EvalStatusComplete
		evals = append(evals, e)
	}
	if err := store.UpsertEvals(1001, evals); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Store one allocation tied to the first evaluation, with a desired
	// status of stop.
	stoppedAlloc := mock.Alloc()
	stoppedAlloc.JobID = stopped.ID
	stoppedAlloc.EvalID = evals[0].ID
	stoppedAlloc.DesiredStatus = structs.AllocDesiredStatusStop
	if err := store.UpsertAllocs(1002, []*structs.Allocation{stoppedAlloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Witness index 2000 at a timestamp JobGCThreshold in the past so the
	// time table has an entry for the GC run below.
	witnessed := time.Now().UTC().Add(-1 * s1.config.JobGCThreshold)
	s1.fsm.TimeTable().Witness(2000, witnessed)

	// Build a core scheduler on top of a fresh state snapshot.
	snap, err := store.Snapshot()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	core := NewCoreScheduler(s1, snap)

	// Run the job GC core evaluation.
	gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
	if err := core.Process(gc); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The job must no longer be present in the state store.
	ws := memdb.NewWatchSet()
	gotJob, err := store.JobByID(ws, stopped.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotJob != nil {
		t.Fatalf("bad: %v", gotJob)
	}

	// Neither evaluation may remain.
	for _, e := range evals {
		gotEval, err := store.EvalByID(ws, e.ID)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if gotEval != nil {
			t.Fatalf("bad: %v", gotEval)
		}
	}

	// The allocation must be gone as well.
	gotAlloc, err := store.AllocByID(ws, stoppedAlloc.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotAlloc != nil {
		t.Fatalf("bad: %v", gotAlloc)
	}
}

func TestCoreScheduler_JobGC_Force(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@@ -1066,8 +1163,8 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
}
}

// This test ensures parameterized and periodic jobs don't get GCd
func TestCoreScheduler_JobGC_NonGCable(t *testing.T) {
// This test ensures parameterized jobs only get gc'd when stopped
func TestCoreScheduler_JobGC_Parameterized(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
@@ -1088,9 +1185,77 @@ func TestCoreScheduler_JobGC_NonGCable(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Insert a periodic job.
job2 := mock.PeriodicJob()
if err := state.UpsertJob(1001, job2); err != nil {
// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should still exist
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}

// Mark the job as stopped and try again
job2 := job.Copy()
job2.Stop = true
err = state.UpsertJob(2000, job2)
if err != nil {
t.Fatalf("err: %v", err)
}

// Create a core scheduler
snap, err = state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core = NewCoreScheduler(s1, snap)

// Attempt the GC
gc = s1.coreJobEval(structs.CoreJobForceGC, 2002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should not exist
out, err = state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %+v", out)
}
}

// This test ensures periodic jobs don't get GCd until they are stopped
func TestCoreScheduler_JobGC_Periodic(t *testing.T) {

s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)

// Insert a periodic job.
state := s1.fsm.State()
job := mock.PeriodicJob()
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}

@@ -1118,12 +1283,35 @@ func TestCoreScheduler_JobGC_NonGCable(t *testing.T) {
t.Fatalf("bad: %v", out)
}

outE, err := state.JobByID(ws, job2.ID)
// Mark the job as stopped and try again
job2 := job.Copy()
job2.Stop = true
err = state.UpsertJob(2000, job2)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE == nil {
t.Fatalf("bad: %v", outE)

// Create a core scheduler
snap, err = state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core = NewCoreScheduler(s1, snap)

// Attempt the GC
gc = s1.coreJobEval(structs.CoreJobForceGC, 2002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should not exist
out, err = state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %+v", out)
}
}

4 changes: 4 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
@@ -846,6 +846,10 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
return fmt.Errorf("Specified job %q is not a parameterized job", args.JobID)
}

if parameterizedJob.Stop {
return fmt.Errorf("Specified job %q is stopped", args.JobID)
}

// Validate the arguments
if err := validateDispatchRequest(args, parameterizedJob); err != nil {
return err
Loading