Skip to content

Commit

Permalink
Set job status in state store
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Jan 12, 2016
1 parent 815761b commit 894b3e3
Show file tree
Hide file tree
Showing 2 changed files with 293 additions and 0 deletions.
138 changes: 138 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,24 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
}

// Setup the indexes correctly
forceStatus := ""
if existing != nil {
job.CreateIndex = existing.(*structs.Job).CreateIndex
job.ModifyIndex = index
} else {
job.CreateIndex = index
job.ModifyIndex = index

if job.IsPeriodic() {
forceStatus = structs.JobStatusRunning
} else {
forceStatus = structs.JobStatusPending
}
}

// Set the job's status
if err := s.setJobStatus(watcher, txn, job, false, forceStatus); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

// Insert the job
Expand Down Expand Up @@ -524,11 +536,19 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
watcher.Add(watch.Item{Table: "evals"})

// Do a nested upsert
jobs := make(map[string]string, len(evals))
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
return err
}

jobs[eval.JobID] = ""
}

// Set the job's status
if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
Expand Down Expand Up @@ -571,6 +591,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
watcher.Add(watch.Item{Table: "evals"})
watcher.Add(watch.Item{Table: "allocs"})

jobs := make(map[string]string, len(evals))
for _, eval := range evals {
existing, err := txn.First("evals", "id", eval)
if err != nil {
Expand All @@ -583,6 +604,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
return fmt.Errorf("eval delete failed: %v", err)
}
watcher.Add(watch.Item{Eval: eval})
jobs[existing.(*structs.Evaluation).JobID] = ""
}

for _, alloc := range allocs {
Expand All @@ -601,6 +623,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
watcher.Add(watch.Item{AllocEval: realAlloc.EvalID})
watcher.Add(watch.Item{AllocJob: realAlloc.JobID})
watcher.Add(watch.Item{AllocNode: realAlloc.NodeID})
jobs[realAlloc.JobID] = ""
}

// Update the indexes
Expand All @@ -611,6 +634,11 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
return fmt.Errorf("index update failed: %v", err)
}

// Set the job's status
if err := s.setJobStatuses(watcher, txn, jobs, true); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
Expand Down Expand Up @@ -726,6 +754,16 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
return fmt.Errorf("index update failed: %v", err)
}

// Set the job's status
forceStatus := ""
if !copyAlloc.TerminalStatus() {
forceStatus = structs.JobStatusRunning
}
jobs := map[string]string{alloc.JobID: forceStatus}
if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
Expand All @@ -741,6 +779,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
watcher.Add(watch.Item{Table: "allocs"})

// Handle the allocations
jobs := make(map[string]string, len(allocs))
for _, alloc := range allocs {
existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil {
Expand All @@ -761,6 +800,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("alloc insert failed: %v", err)
}

forceStatus := ""
if !alloc.TerminalStatus() {
forceStatus = structs.JobStatusRunning
}
jobs[alloc.JobID] = forceStatus

watcher.Add(watch.Item{Alloc: alloc.ID})
watcher.Add(watch.Item{AllocEval: alloc.EvalID})
watcher.Add(watch.Item{AllocJob: alloc.JobID})
Expand All @@ -772,6 +817,11 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("index update failed: %v", err)
}

// Set the job's status
if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
Expand Down Expand Up @@ -906,6 +956,94 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
return iter, nil
}

// setJobStatuses applies setJobStatus to every job named in jobs. The map is
// keyed by job ID; each value is the forceStatus to hand to setJobStatus for
// that job (the empty string means "derive the status").
//
// Job IDs that are not present in the "jobs" table are skipped silently. An
// error is returned only when the lookup itself fails or when setJobStatus
// reports an error; processing stops at the first such error.
func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn,
	jobs map[string]string, evalDelete bool) error {
	for jobID, status := range jobs {
		raw, lookupErr := txn.First("jobs", "id", jobID)
		switch {
		case lookupErr != nil:
			return fmt.Errorf("job lookup failed: %v", lookupErr)
		case raw == nil:
			// No job is stored under this ID, so there is nothing to update.
			continue
		}

		stored := raw.(*structs.Job)
		if err := s.setJobStatus(watcher, txn, stored, evalDelete, status); err != nil {
			return err
		}
	}

	return nil
}

// setJobStatus assigns job.Status in place. It registers watch items for the
// "jobs" table and for the job itself, then picks the status as follows:
//
//  1. A non-empty forceStatus is used verbatim and nothing else is consulted.
//  2. If any allocation indexed under the job's ID is non-terminal, the job
//     is running.
//  3. Otherwise, if any evaluation indexed under the job's ID is non-terminal,
//     the job is pending.
//  4. Otherwise, if evalDelete is set, or at least one allocation or
//     evaluation was seen (all of them terminal), the job is dead.
//  5. Otherwise the job has no allocations or evaluations at all: periodic
//     jobs are marked running and every other job is marked pending.
//
// evalDelete should be true when the call is made because evaluations are
// being deleted (for example by garbage collection). Only the passed job
// value is modified; this function does not write the job back through txn.
func (s *StateStore) setJobStatus(watcher watch.Items, txn *memdb.Txn,
	job *structs.Job, evalDelete bool, forceStatus string) error {

	watcher.Add(watch.Item{Table: "jobs"})
	watcher.Add(watch.Item{Job: job.ID})

	// A forced status short-circuits every lookup below.
	if forceStatus != "" {
		job.Status = forceStatus
		return nil
	}

	allocIter, err := txn.Get("allocs", "job", job.ID)
	if err != nil {
		return err
	}

	// One live allocation is enough to call the job running.
	sawAlloc := false
	for {
		raw := allocIter.Next()
		if raw == nil {
			break
		}
		sawAlloc = true
		if a := raw.(*structs.Allocation); !a.TerminalStatus() {
			job.Status = structs.JobStatusRunning
			return nil
		}
	}

	evalIter, err := txn.Get("evals", "job", job.ID)
	if err != nil {
		return err
	}

	// With no live allocation, an outstanding evaluation means pending.
	sawEval := false
	for {
		raw := evalIter.Next()
		if raw == nil {
			break
		}
		sawEval = true
		if e := raw.(*structs.Evaluation); !e.TerminalStatus() {
			job.Status = structs.JobStatusPending
			return nil
		}
	}

	switch {
	case evalDelete || sawEval || sawAlloc:
		// Everything that exists is terminal, or the evaluations are being
		// removed (e.g. garbage collected): the job is dead.
		job.Status = structs.JobStatusDead
	case job.IsPeriodic():
		// A brand-new periodic job never receives an allocation or
		// evaluation of its own, so it is treated as running.
		job.Status = structs.JobStatusRunning
	default:
		// A brand-new job that nothing has been scheduled for yet.
		job.Status = structs.JobStatusPending
	}

	return nil
}

// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore
Expand Down
155 changes: 155 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,161 @@ func TestStateStore_RestoreAlloc(t *testing.T) {
notify.verify(t)
}

// TestStateStore_SetJobStatus_ForceStatus verifies that a non-empty
// forceStatus (here the arbitrary string "foobar") is written to the job's
// Status unchanged.
func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) {
	// Start from a mock job whose status has been cleared.
	j := mock.Job()
	j.Status = ""

	store := testStateStore(t)
	items := watch.NewItems()
	readTxn := store.db.Txn(false)

	const want = "foobar"
	err := store.setJobStatus(items, readTxn, j, false, want)
	if err != nil {
		t.Fatalf("setJobStatus() failed: %v", err)
	}

	if got := j.Status; got != want {
		t.Fatalf("setJobStatus() set %v; expected %v", got, want)
	}
}

// TestStateStore_SetJobStatus_NoEvalsOrAllocs verifies that the job returned
// by mock.Job(), with no allocations or evaluations upserted for it and no
// forced status, ends up pending.
func TestStateStore_SetJobStatus_NoEvalsOrAllocs(t *testing.T) {
	// Start from a mock job whose status has been cleared.
	j := mock.Job()
	j.Status = ""

	store := testStateStore(t)
	items := watch.NewItems()
	readTxn := store.db.Txn(false)

	err := store.setJobStatus(items, readTxn, j, false, "")
	if err != nil {
		t.Fatalf("setJobStatus() failed: %v", err)
	}

	if want := structs.JobStatusPending; j.Status != want {
		t.Fatalf("setJobStatus() set %v; expected %v", j.Status, want)
	}
}

// TestStateStore_SetJobStatus_NoEvalsOrAllocs_Periodic verifies that the job
// returned by mock.PeriodicJob(), with no allocations or evaluations upserted
// for it and no forced status, ends up running.
func TestStateStore_SetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) {
	// Start from a mock periodic job whose status has been cleared.
	j := mock.PeriodicJob()
	j.Status = ""

	store := testStateStore(t)
	items := watch.NewItems()
	readTxn := store.db.Txn(false)

	err := store.setJobStatus(items, readTxn, j, false, "")
	if err != nil {
		t.Fatalf("setJobStatus() failed: %v", err)
	}

	if want := structs.JobStatusRunning; j.Status != want {
		t.Fatalf("setJobStatus() set %v; expected %v", j.Status, want)
	}
}

// TestStateStore_SetJobStatus_NoEvalsOrAllocs_EvalDelete verifies that when
// evalDelete is true and no allocations or evaluations have been upserted for
// the job, the job ends up dead.
func TestStateStore_SetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) {
	// Start from a mock job whose status has been cleared.
	j := mock.Job()
	j.Status = ""

	store := testStateStore(t)
	items := watch.NewItems()
	readTxn := store.db.Txn(false)

	// evalDelete is the only thing that differs from the plain
	// NoEvalsOrAllocs case.
	err := store.setJobStatus(items, readTxn, j, true, "")
	if err != nil {
		t.Fatalf("setJobStatus() failed: %v", err)
	}

	if want := structs.JobStatusDead; j.Status != want {
		t.Fatalf("setJobStatus() set %v; expected %v", j.Status, want)
	}
}

// TestStateStore_SetJobStatus_DeadEvalsAndAllocs verifies that a job whose
// only allocation has DesiredStatus failed and whose only evaluation has
// Status complete ends up dead.
func TestStateStore_SetJobStatus_DeadEvalsAndAllocs(t *testing.T) {
	store := testStateStore(t)

	// Start from a mock job whose status has been cleared.
	j := mock.Job()
	j.Status = ""

	// Store an allocation for the job that is marked as failed.
	deadAlloc := mock.Alloc()
	deadAlloc.JobID = j.ID
	deadAlloc.DesiredStatus = structs.AllocDesiredStatusFailed
	err := store.UpsertAllocs(1000, []*structs.Allocation{deadAlloc})
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Store an evaluation for the job that has already completed.
	doneEval := mock.Eval()
	doneEval.JobID = j.ID
	doneEval.Status = structs.EvalStatusComplete
	err = store.UpsertEvals(1001, []*structs.Evaluation{doneEval})
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	items := watch.NewItems()
	readTxn := store.db.Txn(false)
	err = store.setJobStatus(items, readTxn, j, false, "")
	if err != nil {
		t.Fatalf("setJobStatus() failed: %v", err)
	}

	if want := structs.JobStatusDead; j.Status != want {
		t.Fatalf("setJobStatus() set %v; expected %v", j.Status, want)
	}
}

// TestStateStore_SetJobStatus_RunningAlloc verifies that a job with an
// allocation whose DesiredStatus is run ends up running, even though
// setJobStatus is called with evalDelete set to true.
func TestStateStore_SetJobStatus_RunningAlloc(t *testing.T) {
	store := testStateStore(t)

	// Start from a mock job whose status has been cleared.
	j := mock.Job()
	j.Status = ""

	// Store an allocation for the job that is desired to run.
	liveAlloc := mock.Alloc()
	liveAlloc.JobID = j.ID
	liveAlloc.DesiredStatus = structs.AllocDesiredStatusRun
	err := store.UpsertAllocs(1000, []*structs.Allocation{liveAlloc})
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	items := watch.NewItems()
	readTxn := store.db.Txn(false)
	err = store.setJobStatus(items, readTxn, j, true, "")
	if err != nil {
		t.Fatalf("setJobStatus() failed: %v", err)
	}

	if want := structs.JobStatusRunning; j.Status != want {
		t.Fatalf("setJobStatus() set %v; expected %v", j.Status, want)
	}
}

// TestStateStore_SetJobStatus_PendingEval verifies that a job with an
// evaluation whose Status is pending ends up pending, even though
// setJobStatus is called with evalDelete set to true.
func TestStateStore_SetJobStatus_PendingEval(t *testing.T) {
	store := testStateStore(t)

	// Start from a mock job whose status has been cleared.
	j := mock.Job()
	j.Status = ""

	// Store an evaluation for the job that is still pending.
	openEval := mock.Eval()
	openEval.JobID = j.ID
	openEval.Status = structs.EvalStatusPending
	err := store.UpsertEvals(1000, []*structs.Evaluation{openEval})
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	items := watch.NewItems()
	readTxn := store.db.Txn(false)
	err = store.setJobStatus(items, readTxn, j, true, "")
	if err != nil {
		t.Fatalf("setJobStatus() failed: %v", err)
	}

	if want := structs.JobStatusPending; j.Status != want {
		t.Fatalf("setJobStatus() set %v; expected %v", j.Status, want)
	}
}

func TestStateWatch_watch(t *testing.T) {
sw := newStateWatch()
notify1 := make(chan struct{}, 1)
Expand Down

0 comments on commit 894b3e3

Please sign in to comment.