Skip to content

Commit

Permalink
Merge pull request #5205 from hashicorp/b-queuedallocs-bugfix
Browse files Browse the repository at this point in the history
Reconcile child summaries correctly
  • Loading branch information
Preetha authored Jan 19, 2019
2 parents 9f76193 + b403c9b commit dbb2e26
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 1 deletion.
1 change: 0 additions & 1 deletion dev/cluster/client1.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ name = "client1"
# Enable the client
client {
enabled = true

server_join {
retry_join = ["127.0.0.1:4647", "127.0.0.1:5647", "127.0.0.1:6647"]
}
Expand Down
5 changes: 5 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,11 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
break
}
job := rawJob.(*structs.Job)

// Nothing to do for queued allocations if the job is a parent periodic/parameterized job
if job.IsParameterized() || job.IsPeriodic() {
continue
}
planner := &scheduler.Harness{
State: &snap.StateStore,
}
Expand Down
72 changes: 72 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2816,6 +2816,78 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
}
}

// COMPAT: Remove in 0.11
func TestFSM_ReconcileParentJobSummary(t *testing.T) {
// This test exercises code to handle https://github.com/hashicorp/nomad/issues/3886
t.Parallel()

require := require.New(t)
// Add some state
fsm := testFSM(t)
state := fsm.State()

// Add a node
node := mock.Node()
state.UpsertNode(800, node)

// Make a parameterized job
job1 := mock.BatchJob()
job1.ID = "test"
job1.ParameterizedJob = &structs.ParameterizedJobConfig{
Payload: "random",
}
job1.TaskGroups[0].Count = 1
state.UpsertJob(1000, job1)

// Make a child job
childJob := job1.Copy()
childJob.ID = job1.ID + "dispatch-23423423"
childJob.ParentID = job1.ID
childJob.Dispatched = true
childJob.Status = structs.JobStatusRunning

// Create an alloc for child job
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.Job = childJob
alloc.JobID = childJob.ID
alloc.ClientStatus = structs.AllocClientStatusRunning

state.UpsertJob(1010, childJob)
state.UpsertAllocs(1011, []*structs.Allocation{alloc})

// Make the summary incorrect in the state store
summary, err := state.JobSummaryByID(nil, job1.Namespace, job1.ID)
require.Nil(err)

summary.Children = nil
summary.Summary = make(map[string]structs.TaskGroupSummary)
summary.Summary["web"] = structs.TaskGroupSummary{
Queued: 1,
}

req := structs.GenericRequest{}
buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req)
require.Nil(err)

resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

ws := memdb.NewWatchSet()
out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID)
expected := structs.JobSummary{
JobID: job1.ID,
Namespace: job1.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
CreateIndex: 1000,
ModifyIndex: out1.ModifyIndex,
Children: &structs.JobChildrenSummary{
Running: 1,
},
}
require.Equal(&expected, out1)
}

func TestFSM_LeakedDeployments(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
76 changes: 76 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sort"
"time"

"reflect"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -3049,12 +3051,86 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
if err != nil {
return err
}
// COMPAT: Remove after 0.11
// Iterate over jobs to build a list of parent jobs and their children
parentMap := make(map[string][]*structs.Job)
for {
rawJob := iter.Next()
if rawJob == nil {
break
}
job := rawJob.(*structs.Job)
if job.ParentID != "" {
children := parentMap[job.ParentID]
children = append(children, job)
parentMap[job.ParentID] = children
}
}

// Get all the jobs again
iter, err = txn.Get("jobs", "id")
if err != nil {
return err
}

for {
rawJob := iter.Next()
if rawJob == nil {
break
}
job := rawJob.(*structs.Job)

if job.IsParameterized() || job.IsPeriodic() {
// COMPAT: Remove after 0.11

// The following block of code fixes incorrect child summaries due to a bug
// See https://github.com/hashicorp/nomad/issues/3886 for details
rawSummary, err := txn.First("job_summary", "id", job.Namespace, job.ID)
if err != nil {
return err
}
if rawSummary == nil {
continue
}

oldSummary := rawSummary.(*structs.JobSummary)

// Create an empty summary
summary := &structs.JobSummary{
JobID: job.ID,
Namespace: job.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
Children: &structs.JobChildrenSummary{},
}

// Iterate over children of this job if any to fix summary counts
children := parentMap[job.ID]
for _, childJob := range children {
switch childJob.Status {
case structs.JobStatusPending:
summary.Children.Pending++
case structs.JobStatusDead:
summary.Children.Dead++
case structs.JobStatusRunning:
summary.Children.Running++
}
}

// Insert the job summary if its different
if !reflect.DeepEqual(summary, oldSummary) {
// Set the create index of the summary same as the job's create index
// and the modify index to the current index
summary.CreateIndex = job.CreateIndex
summary.ModifyIndex = index

if err := txn.Insert("job_summary", summary); err != nil {
return fmt.Errorf("error inserting job summary: %v", err)
}
}

// Done with handling a parent job, continue to next
continue
}

// Create a job summary for the job
summary := &structs.JobSummary{
Expand Down
89 changes: 89 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4354,6 +4354,95 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) {
}
}

func TestStateStore_ReconcileParentJobSummary(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)

// Add a node
node := mock.Node()
state.UpsertNode(80, node)

// Make a parameterized job
job1 := mock.BatchJob()
job1.ID = "test"
job1.ParameterizedJob = &structs.ParameterizedJobConfig{
Payload: "random",
}
job1.TaskGroups[0].Count = 1
state.UpsertJob(100, job1)

// Make a child job
childJob := job1.Copy()
childJob.ID = job1.ID + "dispatch-23423423"
childJob.ParentID = job1.ID
childJob.Dispatched = true
childJob.Status = structs.JobStatusRunning

// Make some allocs for child job
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.Job = childJob
alloc.JobID = childJob.ID
alloc.ClientStatus = structs.AllocClientStatusRunning

alloc2 := mock.Alloc()
alloc2.NodeID = node.ID
alloc2.Job = childJob
alloc2.JobID = childJob.ID
alloc2.ClientStatus = structs.AllocClientStatusFailed

require.Nil(state.UpsertJob(110, childJob))
require.Nil(state.UpsertAllocs(111, []*structs.Allocation{alloc, alloc2}))

// Make the summary incorrect in the state store
summary, err := state.JobSummaryByID(nil, job1.Namespace, job1.ID)
require.Nil(err)

summary.Children = nil
summary.Summary = make(map[string]structs.TaskGroupSummary)
summary.Summary["web"] = structs.TaskGroupSummary{
Queued: 1,
}

// Delete the child job summary
state.DeleteJobSummary(125, childJob.Namespace, childJob.ID)

state.ReconcileJobSummaries(120)

ws := memdb.NewWatchSet()

// Verify parent summary is corrected
summary, _ = state.JobSummaryByID(ws, alloc.Namespace, job1.ID)
expectedSummary := structs.JobSummary{
JobID: job1.ID,
Namespace: job1.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
Children: &structs.JobChildrenSummary{
Running: 1,
},
CreateIndex: 100,
ModifyIndex: 120,
}
require.Equal(&expectedSummary, summary)

// Verify child job summary is also correct
childSummary, _ := state.JobSummaryByID(ws, childJob.Namespace, childJob.ID)
expectedChildSummary := structs.JobSummary{
JobID: childJob.ID,
Namespace: childJob.Namespace,
Summary: map[string]structs.TaskGroupSummary{
"web": {
Running: 1,
Failed: 1,
},
},
CreateIndex: 110,
ModifyIndex: 120,
}
require.Equal(&expectedChildSummary, childSummary)
}

func TestStateStore_UpdateAlloc_JobNotPresent(t *testing.T) {
state := testStateStore(t)

Expand Down

0 comments on commit dbb2e26

Please sign in to comment.