diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 3e81f701e95..2584556b0b7 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -118,6 +118,7 @@ func TestPlanApply_applyPlan(t *testing.T) { allocEvict.Job = nil alloc2 := mock.Alloc() alloc2.Job = nil + s1.State().UpsertJobSummary(1500, mock.JobSummary(alloc2.JobID)) plan = &structs.PlanResult{ NodeUpdate: map[string][]*structs.Allocation{ node.ID: []*structs.Allocation{allocEvict}, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 779d54be1d1..521519300b7 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -231,6 +231,9 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error if alloc.ClientStatus == structs.AllocClientStatusPending || alloc.ClientStatus == structs.AllocClientStatusRunning { copyAlloc.ClientStatus = structs.AllocClientStatusLost + if err := s.updateSummaryWithAlloc(index, copyAlloc, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } if err := txn.Insert("allocs", copyAlloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } @@ -852,12 +855,11 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo watcher := watch.NewItems() watcher.Add(watch.Item{Table: "allocs"}) - if err := s.updateSummaryWithAlloc(index, allocs, watcher, txn); err != nil { - return fmt.Errorf("error updating job summary: %v", err) - } - // Handle each of the updated allocations for _, alloc := range allocs { + if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil { return err } @@ -931,13 +933,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er watcher := watch.NewItems() watcher.Add(watch.Item{Table: "allocs"}) - if err := 
s.updateSummaryWithAlloc(index, allocs, watcher, txn); err != nil { - return fmt.Errorf("error updating job summary: %v", err) - } - // Handle the allocations jobs := make(map[string]string, 1) for _, alloc := range allocs { + if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { return fmt.Errorf("alloc lookup failed: %v", err) @@ -1327,93 +1328,87 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, // updateSummaryWithAlloc updates the job summary when allocations are updated // or inserted -func (s *StateStore) updateSummaryWithAlloc(index uint64, allocs []*structs.Allocation, +func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation, watcher watch.Items, txn *memdb.Txn) error { - if len(allocs) == 0 { - return nil - } - - jobID := allocs[0].JobID - jobSummary, err := s.JobSummaryByID(jobID) + summaryRaw, err := txn.First("job_summary", "id", alloc.JobID) if err != nil { - return fmt.Errorf("unable to look up job summary: %v", err) + return fmt.Errorf("unable to lookup job summary for job id %q: %v", alloc.JobID, err) } - if jobSummary == nil { - return fmt.Errorf("job summary not found") + jobSummary, ok := summaryRaw.(structs.JobSummary) + if !ok { + return fmt.Errorf("job summary for job %q is not present", alloc.JobID) } - currentJSModifyIndex := jobSummary.ModifyIndex - for _, alloc := range allocs { - // Look for existing alloc - existing, err := s.AllocByID(alloc.ID) - if err != nil { - return fmt.Errorf("alloc lookup failed: %v", err) - } + currentJSModifyIndex := jobSummary.ModifyIndex + // Look for existing alloc + existing, err := s.AllocByID(alloc.ID) + if err != nil { + return fmt.Errorf("alloc lookup failed: %v", err) + } - tgSummary, ok := jobSummary.Summary[alloc.TaskGroup] - if !ok { - return fmt.Errorf("unable to find task group in the job summary: %v", 
alloc.TaskGroup) + tgSummary, ok := jobSummary.Summary[alloc.TaskGroup] + if !ok { + return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup) + } + if existing == nil { + switch alloc.DesiredStatus { + case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: + s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", + alloc.ID, alloc.DesiredStatus) } - if existing == nil { - switch alloc.DesiredStatus { - case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: - s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", - alloc.ID, alloc.DesiredStatus) - } - switch alloc.ClientStatus { - case structs.AllocClientStatusPending: - tgSummary.Starting += 1 - if tgSummary.Queued > 0 { - tgSummary.Queued -= 1 - } - jobSummary.ModifyIndex = index - case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, - structs.AllocClientStatusComplete: - s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", - alloc.ID, alloc.ClientStatus) - } - } else if existing.ClientStatus != alloc.ClientStatus { - // Incrementing the client of the bin of the current state - switch alloc.ClientStatus { - case structs.AllocClientStatusRunning: - tgSummary.Running += 1 - case structs.AllocClientStatusFailed: - tgSummary.Failed += 1 - case structs.AllocClientStatusPending: - tgSummary.Starting += 1 - case structs.AllocClientStatusComplete: - tgSummary.Complete += 1 - case structs.AllocClientStatusLost: - tgSummary.Lost += 1 - } - - // Decrementing the count of the bin of the last state - switch existing.ClientStatus { - case structs.AllocClientStatusRunning: - tgSummary.Running -= 1 - case structs.AllocClientStatusPending: - tgSummary.Starting -= 1 - case structs.AllocClientStatusLost: - tgSummary.Lost -= 1 - case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: - 
s.logger.Printf("[ERR] state_store: invalid old state of allocation with id:%v, and state: %v", - existing.ID, existing.ClientStatus) + switch alloc.ClientStatus { + case structs.AllocClientStatusPending: + tgSummary.Starting += 1 + if tgSummary.Queued > 0 { + tgSummary.Queued -= 1 } jobSummary.ModifyIndex = index + case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, + structs.AllocClientStatusComplete: + s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", + alloc.ID, alloc.ClientStatus) + } + } else if existing.ClientStatus != alloc.ClientStatus { + // Incrementing the count of the bin of the current state + switch alloc.ClientStatus { + case structs.AllocClientStatusRunning: + tgSummary.Running += 1 + case structs.AllocClientStatusFailed: + tgSummary.Failed += 1 + case structs.AllocClientStatusPending: + tgSummary.Starting += 1 + case structs.AllocClientStatusComplete: + tgSummary.Complete += 1 + case structs.AllocClientStatusLost: + tgSummary.Lost += 1 + } + + // Decrementing the count of the bin of the last state + switch existing.ClientStatus { + case structs.AllocClientStatusRunning: + tgSummary.Running -= 1 + case structs.AllocClientStatusPending: + tgSummary.Starting -= 1 + case structs.AllocClientStatusLost: + tgSummary.Lost -= 1 + case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: + s.logger.Printf("[ERR] state_store: invalid old state of allocation with id:%v, and state: %v", + existing.ID, existing.ClientStatus) } - jobSummary.Summary[alloc.TaskGroup] = tgSummary + jobSummary.ModifyIndex = index } + jobSummary.Summary[alloc.TaskGroup] = tgSummary if currentJSModifyIndex < jobSummary.ModifyIndex { watcher.Add(watch.Item{Table: "job_summary"}) - watcher.Add(watch.Item{JobSummary: jobID}) + watcher.Add(watch.Item{JobSummary: alloc.JobID}) // Update the indexes table for job summary if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { 
return fmt.Errorf("index update failed: %v", err) } - if err := txn.Insert("job_summary", *jobSummary); err != nil { + if err := txn.Insert("job_summary", jobSummary); err != nil { return fmt.Errorf("updating job summary failed: %v", err) } } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index fe996383d9f..893045a7da4 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -142,8 +142,9 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { alloc.NodeID = node.ID alloc1.NodeID = node.ID alloc2.NodeID = node.ID - alloc.ClientStatus = structs.AllocClientStatusRunning - alloc1.ClientStatus = structs.AllocClientStatusFailed + alloc.ClientStatus = structs.AllocClientStatusPending + alloc1.ClientStatus = structs.AllocClientStatusPending + alloc2.ClientStatus = structs.AllocClientStatusPending if err := state.UpsertJobSummary(990, mock.JobSummary(alloc.JobID)); err != nil { t.Fatal(err) } @@ -153,12 +154,22 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { if err := state.UpsertJobSummary(990, mock.JobSummary(alloc2.JobID)); err != nil { t.Fatal(err) } - alloc2.ClientStatus = structs.AllocClientStatusPending - if err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc1, alloc2}); err != nil { t.Fatalf("err: %v", err) } - if err = state.UpdateNodeStatus(1003, node.ID, structs.NodeStatusDown); err != nil { + + // Change the state of the allocs to running and failed + newAlloc := new(structs.Allocation) + *newAlloc = *alloc + newAlloc.ClientStatus = structs.AllocClientStatusRunning + newAlloc1 := new(structs.Allocation) + *newAlloc1 = *alloc1 + newAlloc1.ClientStatus = structs.AllocClientStatusFailed + if err = state.UpdateAllocsFromClient(1003, []*structs.Allocation{newAlloc, newAlloc1}); err != nil { + t.Fatalf("err: %v", err) + } + + if err = state.UpdateNodeStatus(1004, node.ID, structs.NodeStatusDown); err != nil { t.Fatalf("err: %v", err) } @@ -186,6 +197,20 @@ func 
TestStateStore_UpdateNodeStatus_Node(t *testing.T) { t.Fatalf("expected alloc status: %v, actual: %v", structs.AllocClientStatusLost, alloc2Out.ClientStatus) } + js1, _ := state.JobSummaryByID(alloc.JobID) + js2, _ := state.JobSummaryByID(alloc1.JobID) + js3, _ := state.JobSummaryByID(alloc2.JobID) + + if js1.Summary["web"].Lost != 1 { + t.Fatalf("expected: %v, got: %v", 1, js1.Summary["web"].Lost) + } + if js2.Summary["web"].Failed != 1 { + t.Fatalf("expected: %v, got: %v", 1, js2.Summary["web"].Failed) + } + if js3.Summary["web"].Lost != 1 { + t.Fatalf("expected: %v, got: %v", 1, js3.Summary["web"].Lost) + } + notify.verify(t) } @@ -1718,8 +1743,8 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { t.Fatalf("err: %v", err) } tgSummary2 := summary2.Summary["web"] - if tgSummary2.Running != 0 { - t.Fatalf("expected running: %v, actual: %v", 0, tgSummary2.Failed) + if tgSummary2.Running != 1 { + t.Fatalf("expected running: %v, actual: %v", 1, tgSummary2.Running) } notify.verify(t)