Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconcile child summaries correctly #5205

Merged
merged 7 commits into from
Jan 19, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dev/cluster/client1.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ name = "client1"
# Enable the client
client {
enabled = true

options = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

"driver.raw_exec.enable" = "1"
}
server_join {
retry_join = ["127.0.0.1:4647", "127.0.0.1:5647", "127.0.0.1:6647"]
}
Expand Down
5 changes: 5 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,11 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
break
}
job := rawJob.(*structs.Job)

// Nothing to do for queued allocations if the job is a parent periodic/parameterized job
if job.IsParameterized() || job.IsPeriodic() {
continue
}
planner := &scheduler.Harness{
State: &snap.StateStore,
}
Expand Down
72 changes: 72 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2816,6 +2816,78 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
}
}

// COMPAT: Remove in 0.11
func TestFSM_ReconcileParentJobSummary(t *testing.T) {
// This test exercises code to handle https://github.com/hashicorp/nomad/issues/3886
t.Parallel()

require := require.New(t)
// Add some state
fsm := testFSM(t)
state := fsm.State()

// Add a node
node := mock.Node()
state.UpsertNode(800, node)

// Make a parameterized job
job1 := mock.BatchJob()
job1.ID = "test"
job1.ParameterizedJob = &structs.ParameterizedJobConfig{
Payload: "random",
}
job1.TaskGroups[0].Count = 1
state.UpsertJob(1000, job1)

// Make a child job
childJob := job1.Copy()
childJob.ID = job1.ID + "dispatch-23423423"
childJob.ParentID = job1.ID
childJob.Dispatched = true
childJob.Status = structs.JobStatusRunning

// Create an alloc for child job
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.Job = childJob
alloc.JobID = childJob.ID
alloc.ClientStatus = structs.AllocClientStatusRunning

state.UpsertJob(1010, childJob)
state.UpsertAllocs(1011, []*structs.Allocation{alloc})

// Make the summary incorrect in the state store
summary, err := state.JobSummaryByID(nil, job1.Namespace, job1.ID)
require.Nil(err)

summary.Children = nil
summary.Summary = make(map[string]structs.TaskGroupSummary)
summary.Summary["web"] = structs.TaskGroupSummary{
Queued: 1,
}

req := structs.GenericRequest{}
buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req)
require.Nil(err)

resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

ws := memdb.NewWatchSet()
out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID)
expected := structs.JobSummary{
JobID: job1.ID,
Namespace: job1.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
CreateIndex: 1000,
ModifyIndex: out1.ModifyIndex,
Children: &structs.JobChildrenSummary{
Running: 1,
},
}
require.Equal(&expected, out1)
}

func TestFSM_LeakedDeployments(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
76 changes: 76 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sort"
"time"

"reflect"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -3049,12 +3051,86 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
if err != nil {
return err
}
// COMPAT: Remove after 0.11
// Iterate over jobs to build a list of parent jobs and their children
parentMap := make(map[string][]*structs.Job)
for {
rawJob := iter.Next()
if rawJob == nil {
break
}
job := rawJob.(*structs.Job)
if job.ParentID != "" {
children := parentMap[job.ParentID]
children = append(children, job)
parentMap[job.ParentID] = children
}
}

// Get all the jobs again
iter, err = txn.Get("jobs", "id")
if err != nil {
return err
}

for {
rawJob := iter.Next()
if rawJob == nil {
break
}
job := rawJob.(*structs.Job)

if job.IsParameterized() || job.IsPeriodic() {
// COMPAT: Remove after 0.11

// The following block of code fixes incorrect child summaries due to a bug
// See https://github.com/hashicorp/nomad/issues/3886 for details
summaryIter, err := txn.Get("job_summary", "id", job.Namespace, job.ID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use txn.First?

if err != nil {
return err
}

rawSummary := summaryIter.Next()
if rawSummary != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if rawSummary == nil { continue}

oldSummary := rawSummary.(*structs.JobSummary)

// Create an empty summary
summary := &structs.JobSummary{
JobID: job.ID,
Namespace: job.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
Children: &structs.JobChildrenSummary{},
}

// Iterate over children of this job if any to fix summary counts
children := parentMap[job.ID]
for _, childJob := range children {
switch childJob.Status {
case structs.JobStatusPending:
summary.Children.Pending++
case structs.JobStatusDead:
summary.Children.Dead++
case structs.JobStatusRunning:
summary.Children.Running++
}
}

// Insert the job summary if its different
if !reflect.DeepEqual(summary, oldSummary) {
// Set the create index of the summary same as the job's create index
// and the modify index to the current index
summary.CreateIndex = job.CreateIndex
summary.ModifyIndex = index

if err := txn.Insert("job_summary", summary); err != nil {
return fmt.Errorf("error inserting job summary: %v", err)
}
}
}

// Done with handling a parent job, continue to next
continue
}

// Create a job summary for the job
summary := &structs.JobSummary{
Expand Down
89 changes: 89 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4354,6 +4354,95 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) {
}
}

func TestStateStore_ReconcileParentJobSummary(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)

// Add a node
node := mock.Node()
state.UpsertNode(80, node)

// Make a parameterized job
job1 := mock.BatchJob()
job1.ID = "test"
job1.ParameterizedJob = &structs.ParameterizedJobConfig{
Payload: "random",
}
job1.TaskGroups[0].Count = 1
state.UpsertJob(100, job1)

// Make a child job
childJob := job1.Copy()
childJob.ID = job1.ID + "dispatch-23423423"
childJob.ParentID = job1.ID
childJob.Dispatched = true
childJob.Status = structs.JobStatusRunning

// Make some allocs for child job
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.Job = childJob
alloc.JobID = childJob.ID
alloc.ClientStatus = structs.AllocClientStatusRunning

alloc2 := mock.Alloc()
alloc2.NodeID = node.ID
alloc2.Job = childJob
alloc2.JobID = childJob.ID
alloc2.ClientStatus = structs.AllocClientStatusFailed

require.Nil(state.UpsertJob(110, childJob))
require.Nil(state.UpsertAllocs(111, []*structs.Allocation{alloc, alloc2}))

// Make the summary incorrect in the state store
summary, err := state.JobSummaryByID(nil, job1.Namespace, job1.ID)
require.Nil(err)

summary.Children = nil
summary.Summary = make(map[string]structs.TaskGroupSummary)
summary.Summary["web"] = structs.TaskGroupSummary{
Queued: 1,
}

// Delete the child job summary
state.DeleteJobSummary(125, childJob.Namespace, childJob.ID)

state.ReconcileJobSummaries(120)

ws := memdb.NewWatchSet()

// Verify parent summary is corrected
summary, _ = state.JobSummaryByID(ws, alloc.Namespace, job1.ID)
expectedSummary := structs.JobSummary{
JobID: job1.ID,
Namespace: job1.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
Children: &structs.JobChildrenSummary{
Running: 1,
},
CreateIndex: 100,
ModifyIndex: 120,
}
require.Equal(&expectedSummary, summary)

// Verify child job summary is also correct
childSummary, _ := state.JobSummaryByID(ws, childJob.Namespace, childJob.ID)
expectedChildSummary := structs.JobSummary{
JobID: childJob.ID,
Namespace: childJob.Namespace,
Summary: map[string]structs.TaskGroupSummary{
"web": {
Running: 1,
Failed: 1,
},
},
CreateIndex: 110,
ModifyIndex: 120,
}
require.Equal(&expectedChildSummary, childSummary)
}

func TestStateStore_UpdateAlloc_JobNotPresent(t *testing.T) {
state := testStateStore(t)

Expand Down