diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 236028c4b9a..7c313e8d01a 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -532,6 +532,7 @@ func TestJobEndpoint_GetJob(t *testing.T) { } job.CreateIndex = resp.JobModifyIndex job.ModifyIndex = resp.JobModifyIndex + job.JobModifyIndex = resp.JobModifyIndex // Lookup the job get := &structs.JobSpecificRequest{ diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 3408e0f972e..38b4a49af58 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -124,9 +124,10 @@ func Job() *structs.Job { Meta: map[string]string{ "owner": "armon", }, - Status: structs.JobStatusPending, - CreateIndex: 42, - ModifyIndex: 99, + Status: structs.JobStatusPending, + CreateIndex: 42, + ModifyIndex: 99, + JobModifyIndex: 99, } job.InitFields() return job diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 651e152ddf2..553bcbf0777 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -295,9 +295,26 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { if existing != nil { job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index + job.JobModifyIndex = index + + // Compute the job status + var err error + job.Status, err = s.getJobStatus(txn, job, false) + if err != nil { + return fmt.Errorf("setting job status for %q failed: %v", job.ID, err) + } } else { job.CreateIndex = index job.ModifyIndex = index + job.JobModifyIndex = index + + // If we are inserting the job for the first time, we don't need to + // calculate the jobs status as it is known. + if job.IsPeriodic() { + job.Status = structs.JobStatusRunning + } else { + job.Status = structs.JobStatusPending + } } // Insert the job @@ -524,11 +541,19 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro watcher.Add(watch.Item{Table: "evals"}) // Do a nested upsert + jobs := make(map[string]string, len(evals)) for _, eval := range evals { watcher.Add(watch.Item{Eval: eval.ID}) if err := s.nestedUpsertEval(txn, index, eval); err != nil { return err } + + jobs[eval.JobID] = "" + } + + // Set the job's status + if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { + return fmt.Errorf("setting job status failed: %v", err) } txn.Defer(func() { s.watch.notify(watcher) }) @@ -571,6 +596,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e watcher.Add(watch.Item{Table: "evals"}) watcher.Add(watch.Item{Table: "allocs"}) + jobs := make(map[string]string, len(evals)) for _, eval := range evals { existing, err := txn.First("evals", "id", eval) if err != nil { @@ -583,6 +609,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e return fmt.Errorf("eval delete failed: %v", err) } watcher.Add(watch.Item{Eval: eval}) + jobs[existing.(*structs.Evaluation).JobID] = "" } for _, alloc := range allocs { @@ -611,6 +638,11 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e return fmt.Errorf("index update failed: %v", err) } + // Set the job's status + if err := s.setJobStatuses(index, watcher, txn, jobs, true); err != nil { + return fmt.Errorf("setting job status failed: %v", err) + } + txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -726,6 +758,16 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati return fmt.Errorf("index update failed: %v", err) } + // Set the job's status + forceStatus := "" + if !copyAlloc.TerminalStatus() { + forceStatus = structs.JobStatusRunning + } + jobs := map[string]string{alloc.JobID: forceStatus} + if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { + return fmt.Errorf("setting job status failed: %v", err) + } + txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -741,6 +783,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er watcher.Add(watch.Item{Table: "allocs"}) // Handle the allocations + jobs := make(map[string]string, 1) for _, alloc := range allocs { existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { @@ -761,6 +804,13 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("alloc insert failed: %v", err) } + // If the allocation is running, force the job to running status. + forceStatus := "" + if !alloc.TerminalStatus() { + forceStatus = structs.JobStatusRunning + } + jobs[alloc.JobID] = forceStatus + watcher.Add(watch.Item{Alloc: alloc.ID}) watcher.Add(watch.Item{AllocEval: alloc.EvalID}) watcher.Add(watch.Item{AllocJob: alloc.JobID}) @@ -772,6 +822,11 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("index update failed: %v", err) } + // Set the job's status + if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { + return fmt.Errorf("setting job status failed: %v", err) + } + txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -906,6 +961,117 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) { return iter, nil } +// setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID. +// It takes a map of job IDs to an optional forceStatus string. It returns an +// error if the job doesn't exist or setJobStatus fails. +func (s *StateStore) setJobStatuses(index uint64, watcher watch.Items, txn *memdb.Txn, + jobs map[string]string, evalDelete bool) error { + for job, forceStatus := range jobs { + existing, err := txn.First("jobs", "id", job) + if err != nil { + return fmt.Errorf("job lookup failed: %v", err) + } + + if existing == nil { + continue + } + + if err := s.setJobStatus(index, watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { + return err + } + } + + return nil +} + +// setJobStatus sets the status of the job by looking up associated evaluations +// and allocations. evalDelete should be set to true if setJobStatus is being +// called because an evaluation is being deleted (potentially because of garbage +// collection). If forceStatus is non-empty, the job's status will be set to the +// passed status. +func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.Txn, + job *structs.Job, evalDelete bool, forceStatus string) error { + + // Capture the current status so we can check if there is a change + oldStatus := job.Status + newStatus := forceStatus + + // If forceStatus is not set, compute the jobs status. + if forceStatus == "" { + var err error + newStatus, err = s.getJobStatus(txn, job, evalDelete) + if err != nil { + return err + } + } + + // Fast-path if nothing has changed. + if oldStatus == newStatus { + return nil + } + + // The job has changed, so add to watcher. + watcher.Add(watch.Item{Table: "jobs"}) + watcher.Add(watch.Item{Job: job.ID}) + + // Copy and update the existing job + updated := job.Copy() + updated.Status = newStatus + updated.ModifyIndex = index + + // Insert the job + if err := txn.Insert("jobs", updated); err != nil { + return fmt.Errorf("job insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + return nil +} + +func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) { + allocs, err := txn.Get("allocs", "job", job.ID) + if err != nil { + return "", err + } + + // If there is a non-terminal allocation, the job is running. + hasAlloc := false + for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() { + hasAlloc = true + if !alloc.(*structs.Allocation).TerminalStatus() { + return structs.JobStatusRunning, nil + } + } + + evals, err := txn.Get("evals", "job", job.ID) + if err != nil { + return "", err + } + + hasEval := false + for eval := evals.Next(); eval != nil; eval = evals.Next() { + hasEval = true + if !eval.(*structs.Evaluation).TerminalStatus() { + return structs.JobStatusPending, nil + } + } + + // The job is dead if all the allocations and evals are terminal or if there + // are no evals because of garbage collection. + if evalDelete || hasEval || hasAlloc { + return structs.JobStatusDead, nil + } + + // If there are no allocations or evaluations it is a new job. If the job is + // periodic, we mark it as running as it will never have an + // allocation/evaluation against it. + if job.IsPeriodic() { + return structs.JobStatusRunning, nil + } + return structs.JobStatusPending, nil +} + // StateSnapshot is used to provide a point-in-time snapshot type StateSnapshot struct { StateStore diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 5f10d7173ee..ff7b690f8a4 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1748,6 +1748,222 @@ func TestStateStore_RestoreAlloc(t *testing.T) { notify.verify(t) } +func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) { + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(true) + + // Create and insert a mock job. + job := mock.Job() + job.Status = "" + job.ModifyIndex = 0 + if err := txn.Insert("jobs", job); err != nil { + t.Fatalf("job insert failed: %v", err) + } + + exp := "foobar" + index := uint64(1000) + if err := state.setJobStatus(index, watcher, txn, job, false, exp); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + i, err := txn.First("jobs", "id", job.ID) + if err != nil { + t.Fatalf("job lookup failed: %v", err) + } + updated := i.(*structs.Job) + + if updated.Status != exp { + t.Fatalf("setJobStatus() set %v; expected %v", updated.Status, exp) + } + + if updated.ModifyIndex != index { + t.Fatalf("setJobStatus() set %d; expected %d", updated.ModifyIndex, index) + } +} + +func TestStateStore_SetJobStatus_NoOp(t *testing.T) { + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(true) + + // Create and insert a mock job that should be pending. + job := mock.Job() + job.Status = structs.JobStatusPending + job.ModifyIndex = 10 + if err := txn.Insert("jobs", job); err != nil { + t.Fatalf("job insert failed: %v", err) + } + + index := uint64(1000) + if err := state.setJobStatus(index, watcher, txn, job, false, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + i, err := txn.First("jobs", "id", job.ID) + if err != nil { + t.Fatalf("job lookup failed: %v", err) + } + updated := i.(*structs.Job) + + if updated.ModifyIndex == index { + t.Fatalf("setJobStatus() should have been a no-op") + } +} + +func TestStateStore_SetJobStatus(t *testing.T) { + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(true) + + // Create and insert a mock job that should be pending but has an incorrect + // status. + job := mock.Job() + job.Status = "foobar" + job.ModifyIndex = 10 + if err := txn.Insert("jobs", job); err != nil { + t.Fatalf("job insert failed: %v", err) + } + + index := uint64(1000) + if err := state.setJobStatus(index, watcher, txn, job, false, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + i, err := txn.First("jobs", "id", job.ID) + if err != nil { + t.Fatalf("job lookup failed: %v", err) + } + updated := i.(*structs.Job) + + if updated.Status != structs.JobStatusPending { + t.Fatalf("setJobStatus() set %v; expected %v", updated.Status, structs.JobStatusPending) + } + + if updated.ModifyIndex != index { + t.Fatalf("setJobStatus() set %d; expected %d", updated.ModifyIndex, index) + } +} + +func TestStateStore_GetJobStatus_NoEvalsOrAllocs(t *testing.T) { + job := mock.Job() + state := testStateStore(t) + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusPending { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusPending) + } +} + +func TestStateStore_GetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) { + job := mock.PeriodicJob() + state := testStateStore(t) + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusRunning { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) + } +} + +func TestStateStore_GetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) { + job := mock.Job() + state := testStateStore(t) + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, true) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusDead { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) + } +} + +func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + + // Create a mock alloc that is dead. + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.DesiredStatus = structs.AllocDesiredStatusFailed + if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // Create a mock eval that is complete + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusComplete + if err := state.UpsertEvals(1001, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusDead { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) + } +} + +func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + + // Create a mock alloc that is running. + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.DesiredStatus = structs.AllocDesiredStatusRun + if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, true) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusRunning { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) + } +} + +func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + + // Create a mock eval that is pending. + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusPending + if err := state.UpsertEvals(1000, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, true) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusPending { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusPending) + } +} + func TestStateWatch_watch(t *testing.T) { sw := newStateWatch() notify1 := make(chan struct{}, 1) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7970f43fe65..68b7e6e8d42 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -689,10 +689,9 @@ const ( ) const ( - JobStatusPending = "pending" // Pending means the job is waiting on scheduling - JobStatusRunning = "running" // Running means the entire job is running - JobStatusComplete = "complete" // Complete means there was a clean termination - JobStatusDead = "dead" // Dead means there was abnormal termination + JobStatusPending = "pending" // Pending means the job is waiting on scheduling + JobStatusRunning = "running" // Running means the job has non-terminal allocations + JobStatusDead = "dead" // Dead means all evaluation's and allocations are terminal ) const ( @@ -778,8 +777,9 @@ type Job struct { StatusDescription string // Raft Indexes - CreateIndex uint64 - ModifyIndex uint64 + CreateIndex uint64 + ModifyIndex uint64 + JobModifyIndex uint64 } // InitFields is used to initialize fields in the Job. This should be called diff --git a/scheduler/util.go b/scheduler/util.go index f448687c5ec..29fed93427f 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -92,10 +92,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, } // If the definition is updated we need to update - // XXX: This is an extremely conservative approach. We can check - // if the job definition has changed in a way that affects - // this allocation and potentially ignore it. - if job.ModifyIndex != exist.Job.ModifyIndex { + if job.JobModifyIndex != exist.Job.JobModifyIndex { result.update = append(result.update, allocTuple{ Name: name, TaskGroup: tg, diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 7b213da77b5..3ee813090f8 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -38,7 +38,7 @@ func TestDiffAllocs(t *testing.T) { // The "old" job has a previous modify index oldJob := new(structs.Job) *oldJob = *job - oldJob.ModifyIndex -= 1 + oldJob.JobModifyIndex -= 1 tainted := map[string]bool{ "dead": true, @@ -119,7 +119,7 @@ func TestDiffSystemAllocs(t *testing.T) { // The "old" job has a previous modify index oldJob := new(structs.Job) *oldJob = *job - oldJob.ModifyIndex -= 1 + oldJob.JobModifyIndex -= 1 tainted := map[string]bool{ "dead": true,