From 875bf476879c5d6380239de5a539c18aac16e848 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 11 Jan 2016 17:34:25 -0800 Subject: [PATCH] Address comments --- nomad/state/state_store.go | 94 +++++++++++------ nomad/state/state_store_test.go | 177 +++++++++++++++++++++----------- 2 files changed, 179 insertions(+), 92 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index c0848559660..338fab7fea5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -292,26 +292,29 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { } // Setup the indexes correctly - forceStatus := "" if existing != nil { job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index + + // Compute the job status + var err error + job.Status, err = s.getJobStatus(txn, job, false) + if err != nil { + return fmt.Errorf("setting job status for %q failed: %v", job.ID, err) + } } else { job.CreateIndex = index job.ModifyIndex = index + // If we are inserting the job for the first time, we don't need to + // calculate the jobs status as it is known. if job.IsPeriodic() { - forceStatus = structs.JobStatusRunning + job.Status = structs.JobStatusRunning } else { - forceStatus = structs.JobStatusPending + job.Status = structs.JobStatusPending } } - // Set the job's status - if err := s.setJobStatus(watcher, txn, job, false, forceStatus); err != nil { - return fmt.Errorf("setting job status failed: %v", err) - } - // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -547,7 +550,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro } // Set the job's status - if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { return fmt.Errorf("setting job status failed: %v", err) } @@ -623,7 +626,6 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e watcher.Add(watch.Item{AllocEval: realAlloc.EvalID}) watcher.Add(watch.Item{AllocJob: realAlloc.JobID}) watcher.Add(watch.Item{AllocNode: realAlloc.NodeID}) - jobs[realAlloc.JobID] = "" } // Update the indexes @@ -635,7 +637,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e } // Set the job's status - if err := s.setJobStatuses(watcher, txn, jobs, true); err != nil { + if err := s.setJobStatuses(index, watcher, txn, jobs, true); err != nil { return fmt.Errorf("setting job status failed: %v", err) } @@ -760,7 +762,7 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati forceStatus = structs.JobStatusRunning } jobs := map[string]string{alloc.JobID: forceStatus} - if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { return fmt.Errorf("setting job status failed: %v", err) } @@ -779,7 +781,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er watcher.Add(watch.Item{Table: "allocs"}) // Handle the allocations - jobs := make(map[string]string, len(allocs)) + jobs := make(map[string]string, 1) for _, alloc := range allocs { existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { @@ -800,6 +802,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("alloc insert failed: %v", err) } + // If the allocation is running, force the job to running status. forceStatus := "" if !alloc.TerminalStatus() { forceStatus = structs.JobStatusRunning @@ -818,7 +821,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er } // Set the job's status - if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { return fmt.Errorf("setting job status failed: %v", err) } @@ -959,7 +962,7 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) { // setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID. // It takes a map of job IDs to an optional forceStatus string. It returns an // error if the job doesn't exist or setJobStatus fails. -func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn, +func (s *StateStore) setJobStatuses(index uint64, watcher watch.Items, txn *memdb.Txn, jobs map[string]string, evalDelete bool) error { for job, forceStatus := range jobs { existing, err := txn.First("jobs", "id", job) @@ -971,7 +974,7 @@ func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn, continue } - if err := s.setJobStatus(watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { + if err := s.setJobStatus(index, watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { return err } } @@ -984,21 +987,50 @@ func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn, // called because an evaluation is being deleted (potentially because of garbage // collection). If forceStatus is non-empty, the job's status will be set to the // passed status. -func (s *StateStore) setJobStatus(watcher watch.Items, txn *memdb.Txn, +func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.Txn, job *structs.Job, evalDelete bool, forceStatus string) error { + // Capture the current status so we can check if there is a change + oldStatus := job.Status + newStatus := forceStatus + + // If forceStatus is not set, compute the jobs status. + if forceStatus == "" { + var err error + newStatus, err = s.getJobStatus(txn, job, evalDelete) + if err != nil { + return err + } + } + + // Fast-path if nothing has changed. + if oldStatus == newStatus { + return nil + } + + // The job has changed, so add to watcher. watcher.Add(watch.Item{Table: "jobs"}) watcher.Add(watch.Item{Job: job.ID}) - // If forceStatus is set, immediately set the job's status - if forceStatus != "" { - job.Status = forceStatus - return nil + // Copy and update the existing job + updated := job.Copy() + updated.Status = newStatus + updated.ModifyIndex = index + + // Insert the job + if err := txn.Insert("jobs", updated); err != nil { + return fmt.Errorf("job insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) } + return nil +} +func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) { allocs, err := txn.Get("allocs", "job", job.ID) if err != nil { - return err + return "", err } // If there is a non-terminal allocation, the job is running. @@ -1006,42 +1038,36 @@ func (s *StateStore) setJobStatus(watcher watch.Items, txn *memdb.Txn, for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() { hasAlloc = true if !alloc.(*structs.Allocation).TerminalStatus() { - job.Status = structs.JobStatusRunning - return nil + return structs.JobStatusRunning, nil } } evals, err := txn.Get("evals", "job", job.ID) if err != nil { - return err + return "", err } hasEval := false for eval := evals.Next(); eval != nil; eval = evals.Next() { hasEval = true if !eval.(*structs.Evaluation).TerminalStatus() { - job.Status = structs.JobStatusPending - return nil + return structs.JobStatusPending, nil } } // The job is dead if all the allocations and evals are terminal or if there // are no evals because of garbage collection. if evalDelete || hasEval || hasAlloc { - job.Status = structs.JobStatusDead - return nil + return structs.JobStatusDead, nil } // If there are no allocations or evaluations it is a new job. If the job is // periodic, we mark it as running as it will never have an // allocation/evaluation against it. if job.IsPeriodic() { - job.Status = structs.JobStatusRunning - } else { - job.Status = structs.JobStatusPending + return structs.JobStatusRunning, nil } - - return nil + return structs.JobStatusPending, nil } // StateSnapshot is used to provide a point-in-time snapshot diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1aec05395ef..ff7b690f8a4 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1749,80 +1749,147 @@ func TestStateStore_RestoreAlloc(t *testing.T) { } func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) { - // Create a mock job. + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(true) + + // Create and insert a mock job. job := mock.Job() job.Status = "" + job.ModifyIndex = 0 + if err := txn.Insert("jobs", job); err != nil { + t.Fatalf("job insert failed: %v", err) + } - state := testStateStore(t) - watcher := watch.NewItems() - txn := state.db.Txn(false) exp := "foobar" - if err := state.setJobStatus(watcher, txn, job, false, exp); err != nil { + index := uint64(1000) + if err := state.setJobStatus(index, watcher, txn, job, false, exp); err != nil { t.Fatalf("setJobStatus() failed: %v", err) } - if job.Status != exp { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, exp) + i, err := txn.First("jobs", "id", job.ID) + if err != nil { + t.Fatalf("job lookup failed: %v", err) } -} + updated := i.(*structs.Job) -func TestStateStore_SetJobStatus_NoEvalsOrAllocs(t *testing.T) { - // Create a mock job. - job := mock.Job() - job.Status = "" + if updated.Status != exp { + t.Fatalf("setJobStatus() set %v; expected %v", updated.Status, exp) + } + if updated.ModifyIndex != index { + t.Fatalf("setJobStatus() set %d; expected %d", updated.ModifyIndex, index) + } +} + +func TestStateStore_SetJobStatus_NoOp(t *testing.T) { state := testStateStore(t) watcher := watch.NewItems() - txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { + txn := state.db.Txn(true) + + // Create and insert a mock job that should be pending. + job := mock.Job() + job.Status = structs.JobStatusPending + job.ModifyIndex = 10 + if err := txn.Insert("jobs", job); err != nil { + t.Fatalf("job insert failed: %v", err) + } + + index := uint64(1000) + if err := state.setJobStatus(index, watcher, txn, job, false, ""); err != nil { t.Fatalf("setJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusPending { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusPending) + i, err := txn.First("jobs", "id", job.ID) + if err != nil { + t.Fatalf("job lookup failed: %v", err) } -} + updated := i.(*structs.Job) -func TestStateStore_SetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) { - // Create a mock job. - job := mock.PeriodicJob() - job.Status = "" + if updated.ModifyIndex == index { + t.Fatalf("setJobStatus() should have been a no-op") + } +} +func TestStateStore_SetJobStatus(t *testing.T) { state := testStateStore(t) watcher := watch.NewItems() - txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { + txn := state.db.Txn(true) + + // Create and insert a mock job that should be pending but has an incorrect + // status. + job := mock.Job() + job.Status = "foobar" + job.ModifyIndex = 10 + if err := txn.Insert("jobs", job); err != nil { + t.Fatalf("job insert failed: %v", err) + } + + index := uint64(1000) + if err := state.setJobStatus(index, watcher, txn, job, false, ""); err != nil { t.Fatalf("setJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusRunning { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusRunning) + i, err := txn.First("jobs", "id", job.ID) + if err != nil { + t.Fatalf("job lookup failed: %v", err) + } + updated := i.(*structs.Job) + + if updated.Status != structs.JobStatusPending { + t.Fatalf("setJobStatus() set %v; expected %v", updated.Status, structs.JobStatusPending) + } + + if updated.ModifyIndex != index { + t.Fatalf("setJobStatus() set %d; expected %d", updated.ModifyIndex, index) } } -func TestStateStore_SetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) { - // Create a mock job. +func TestStateStore_GetJobStatus_NoEvalsOrAllocs(t *testing.T) { job := mock.Job() - job.Status = "" + state := testStateStore(t) + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusPending { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusPending) + } +} +func TestStateStore_GetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) { + job := mock.PeriodicJob() state := testStateStore(t) - watcher := watch.NewItems() txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { - t.Fatalf("setJobStatus() failed: %v", err) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusDead { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusDead) + if status != structs.JobStatusRunning { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) } } -func TestStateStore_SetJobStatus_DeadEvalsAndAllocs(t *testing.T) { +func TestStateStore_GetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) { + job := mock.Job() state := testStateStore(t) + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, true) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } - // Create a mock job. + if status != structs.JobStatusDead { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) + } +} + +func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) { + state := testStateStore(t) job := mock.Job() - job.Status = "" // Create a mock alloc that is dead. alloc := mock.Alloc() @@ -1840,23 +1907,20 @@ func TestStateStore_SetJobStatus_DeadEvalsAndAllocs(t *testing.T) { t.Fatalf("err: %v", err) } - watcher := watch.NewItems() txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { - t.Fatalf("setJobStatus() failed: %v", err) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusDead { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusDead) + if status != structs.JobStatusDead { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) } } -func TestStateStore_SetJobStatus_RunningAlloc(t *testing.T) { +func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) { state := testStateStore(t) - - // Create a mock job. job := mock.Job() - job.Status = "" // Create a mock alloc that is running. alloc := mock.Alloc() @@ -1866,23 +1930,20 @@ func TestStateStore_SetJobStatus_RunningAlloc(t *testing.T) { t.Fatalf("err: %v", err) } - watcher := watch.NewItems() txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { - t.Fatalf("setJobStatus() failed: %v", err) + status, err := state.getJobStatus(txn, job, true) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusRunning { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusRunning) + if status != structs.JobStatusRunning { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) } } func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { state := testStateStore(t) - - // Create a mock job. job := mock.Job() - job.Status = "" // Create a mock eval that is pending. eval := mock.Eval() @@ -1892,14 +1953,14 @@ func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { t.Fatalf("err: %v", err) } - watcher := watch.NewItems() txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { - t.Fatalf("setJobStatus() failed: %v", err) + status, err := state.getJobStatus(txn, job, true) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusPending { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusPending) + if status != structs.JobStatusPending { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusPending) } }