Skip to content

Commit

Permalink
Updated some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanu committed Jul 26, 2016
1 parent d1da78a commit 2892224
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 82 deletions.
1 change: 1 addition & 0 deletions nomad/plan_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func TestPlanApply_applyPlan(t *testing.T) {
allocEvict.Job = nil
alloc2 := mock.Alloc()
alloc2.Job = nil
s1.State().UpsertJobSummary(1500, mock.JobSummary(alloc2.JobID))
plan = &structs.PlanResult{
NodeUpdate: map[string][]*structs.Allocation{
node.ID: []*structs.Allocation{allocEvict},
Expand Down
145 changes: 70 additions & 75 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
if alloc.ClientStatus == structs.AllocClientStatusPending ||
alloc.ClientStatus == structs.AllocClientStatusRunning {
copyAlloc.ClientStatus = structs.AllocClientStatusLost
if err := s.updateSummaryWithAlloc(index, copyAlloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
if err := txn.Insert("allocs", copyAlloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
Expand Down Expand Up @@ -852,12 +855,11 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "allocs"})

if err := s.updateSummaryWithAlloc(index, allocs, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}

// Handle each of the updated allocations
for _, alloc := range allocs {
if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil {
return err
}
Expand Down Expand Up @@ -931,13 +933,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "allocs"})

if err := s.updateSummaryWithAlloc(index, allocs, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}

// Handle the allocations
jobs := make(map[string]string, 1)
for _, alloc := range allocs {
if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
Expand Down Expand Up @@ -1327,93 +1328,87 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,

// updateSummaryWithAlloc updates the job summary when allocations are updated
// or inserted
func (s *StateStore) updateSummaryWithAlloc(index uint64, allocs []*structs.Allocation,
func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation,
watcher watch.Items, txn *memdb.Txn) error {
if len(allocs) == 0 {
return nil
}

jobID := allocs[0].JobID
jobSummary, err := s.JobSummaryByID(jobID)
summaryRaw, err := txn.First("job_summary", "id", alloc.JobID)
if err != nil {
return fmt.Errorf("unable to look up job summary: %v", err)
return fmt.Errorf("unable to lookup job summary for job id %q: %v", err)
}
if jobSummary == nil {
return fmt.Errorf("job summary not found")
jobSummary, ok := summaryRaw.(structs.JobSummary)
if !ok {
return fmt.Errorf("job summary for job %q is not present", alloc.JobID)
}
currentJSModifyIndex := jobSummary.ModifyIndex

for _, alloc := range allocs {
// Look for existing alloc
existing, err := s.AllocByID(alloc.ID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}
currentJSModifyIndex := jobSummary.ModifyIndex
// Look for existing alloc
existing, err := s.AllocByID(alloc.ID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}

tgSummary, ok := jobSummary.Summary[alloc.TaskGroup]
if !ok {
return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup)
tgSummary, ok := jobSummary.Summary[alloc.TaskGroup]
if !ok {
return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup)
}
if existing == nil {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v",
alloc.ID, alloc.DesiredStatus)
}
if existing == nil {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v",
alloc.ID, alloc.DesiredStatus)
}
switch alloc.ClientStatus {
case structs.AllocClientStatusPending:
tgSummary.Starting += 1
if tgSummary.Queued > 0 {
tgSummary.Queued -= 1
}
jobSummary.ModifyIndex = index
case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed,
structs.AllocClientStatusComplete:
s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v",
alloc.ID, alloc.ClientStatus)
}
} else if existing.ClientStatus != alloc.ClientStatus {
// Incrementing the client of the bin of the current state
switch alloc.ClientStatus {
case structs.AllocClientStatusRunning:
tgSummary.Running += 1
case structs.AllocClientStatusFailed:
tgSummary.Failed += 1
case structs.AllocClientStatusPending:
tgSummary.Starting += 1
case structs.AllocClientStatusComplete:
tgSummary.Complete += 1
case structs.AllocClientStatusLost:
tgSummary.Lost += 1
}

// Decrementing the count of the bin of the last state
switch existing.ClientStatus {
case structs.AllocClientStatusRunning:
tgSummary.Running -= 1
case structs.AllocClientStatusPending:
tgSummary.Starting -= 1
case structs.AllocClientStatusLost:
tgSummary.Lost -= 1
case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
s.logger.Printf("[ERR] state_store: invalid old state of allocation with id:%v, and state: %v",
existing.ID, existing.ClientStatus)
switch alloc.ClientStatus {
case structs.AllocClientStatusPending:
tgSummary.Starting += 1
if tgSummary.Queued > 0 {
tgSummary.Queued -= 1
}
jobSummary.ModifyIndex = index
case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed,
structs.AllocClientStatusComplete:
s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v",
alloc.ID, alloc.ClientStatus)
}
} else if existing.ClientStatus != alloc.ClientStatus {
// Incrementing the client of the bin of the current state
switch alloc.ClientStatus {
case structs.AllocClientStatusRunning:
tgSummary.Running += 1
case structs.AllocClientStatusFailed:
tgSummary.Failed += 1
case structs.AllocClientStatusPending:
tgSummary.Starting += 1
case structs.AllocClientStatusComplete:
tgSummary.Complete += 1
case structs.AllocClientStatusLost:
tgSummary.Lost += 1
}

// Decrementing the count of the bin of the last state
switch existing.ClientStatus {
case structs.AllocClientStatusRunning:
tgSummary.Running -= 1
case structs.AllocClientStatusPending:
tgSummary.Starting -= 1
case structs.AllocClientStatusLost:
tgSummary.Lost -= 1
case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
s.logger.Printf("[ERR] state_store: invalid old state of allocation with id:%v, and state: %v",
existing.ID, existing.ClientStatus)
}
jobSummary.Summary[alloc.TaskGroup] = tgSummary
jobSummary.ModifyIndex = index
}
jobSummary.Summary[alloc.TaskGroup] = tgSummary

if currentJSModifyIndex < jobSummary.ModifyIndex {
watcher.Add(watch.Item{Table: "job_summary"})
watcher.Add(watch.Item{JobSummary: jobID})
watcher.Add(watch.Item{JobSummary: alloc.JobID})

// Update the indexes table for job summary
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

if err := txn.Insert("job_summary", *jobSummary); err != nil {
if err := txn.Insert("job_summary", jobSummary); err != nil {
return fmt.Errorf("updating job summary failed: %v", err)
}
}
Expand Down
39 changes: 32 additions & 7 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
alloc.NodeID = node.ID
alloc1.NodeID = node.ID
alloc2.NodeID = node.ID
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc1.ClientStatus = structs.AllocClientStatusFailed
alloc.ClientStatus = structs.AllocClientStatusPending
alloc1.ClientStatus = structs.AllocClientStatusPending
alloc2.ClientStatus = structs.AllocClientStatusPending
if err := state.UpsertJobSummary(990, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
Expand All @@ -153,12 +154,22 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
if err := state.UpsertJobSummary(990, mock.JobSummary(alloc2.JobID)); err != nil {
t.Fatal(err)
}
alloc2.ClientStatus = structs.AllocClientStatusPending

if err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc1, alloc2}); err != nil {
t.Fatalf("err: %v", err)
}
if err = state.UpdateNodeStatus(1003, node.ID, structs.NodeStatusDown); err != nil {

// Change the state of the allocs to running and failed
newAlloc := new(structs.Allocation)
*newAlloc = *alloc
newAlloc.ClientStatus = structs.AllocClientStatusRunning
newAlloc1 := new(structs.Allocation)
*newAlloc1 = *alloc1
newAlloc1.ClientStatus = structs.AllocClientStatusFailed
if err = state.UpdateAllocsFromClient(1003, []*structs.Allocation{newAlloc, newAlloc1}); err != nil {
t.Fatalf("err: %v", err)
}

if err = state.UpdateNodeStatus(1004, node.ID, structs.NodeStatusDown); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down Expand Up @@ -186,6 +197,20 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
t.Fatalf("expected alloc status: %v, actual: %v", structs.AllocClientStatusLost, alloc2Out.ClientStatus)
}

js1, _ := state.JobSummaryByID(alloc.JobID)
js2, _ := state.JobSummaryByID(alloc1.JobID)
js3, _ := state.JobSummaryByID(alloc2.JobID)

if js1.Summary["web"].Lost != 1 {
t.Fatalf("expected: %v, got: %v", 1, js1.Summary["web"].Lost)
}
if js2.Summary["web"].Failed != 1 {
t.Fatalf("expected: %v, got: %v", 1, js2.Summary["web"].Failed)
}
if js3.Summary["web"].Lost != 1 {
t.Fatalf("expected: %v, got: %v", 1, js3.Summary["web"].Lost)
}

notify.verify(t)
}

Expand Down Expand Up @@ -1718,8 +1743,8 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
t.Fatalf("err: %v", err)
}
tgSummary2 := summary2.Summary["web"]
if tgSummary2.Running != 0 {
t.Fatalf("expected running: %v, actual: %v", 0, tgSummary2.Failed)
if tgSummary2.Running != 1 {
t.Fatalf("expected running: %v, actual: %v", 1, tgSummary2.Running)
}

notify.verify(t)
Expand Down

0 comments on commit 2892224

Please sign in to comment.