diff --git a/dev/cluster/client1.hcl b/dev/cluster/client1.hcl index 93db926b75e..7008dba243f 100644 --- a/dev/cluster/client1.hcl +++ b/dev/cluster/client1.hcl @@ -10,7 +10,6 @@ name = "client1" # Enable the client client { enabled = true - server_join { retry_join = ["127.0.0.1:4647", "127.0.0.1:5647", "127.0.0.1:6647"] } diff --git a/nomad/fsm.go b/nomad/fsm.go index 6c7ed2e91d0..c78b1fb51b5 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1401,6 +1401,11 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { break } job := rawJob.(*structs.Job) + + // Nothing to do for queued allocations if the job is a parent periodic/parameterized job + if job.IsParameterized() || job.IsPeriodic() { + continue + } planner := &scheduler.Harness{ State: &snap.StateStore, } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 28e69ae71ac..0c3a846a85f 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2816,6 +2816,78 @@ func TestFSM_ReconcileSummaries(t *testing.T) { } } +// COMPAT: Remove in 0.11 +func TestFSM_ReconcileParentJobSummary(t *testing.T) { + // This test exercises code to handle https://github.com/hashicorp/nomad/issues/3886 + t.Parallel() + + require := require.New(t) + // Add some state + fsm := testFSM(t) + state := fsm.State() + + // Add a node + node := mock.Node() + state.UpsertNode(800, node) + + // Make a parameterized job + job1 := mock.BatchJob() + job1.ID = "test" + job1.ParameterizedJob = &structs.ParameterizedJobConfig{ + Payload: "random", + } + job1.TaskGroups[0].Count = 1 + state.UpsertJob(1000, job1) + + // Make a child job + childJob := job1.Copy() + childJob.ID = job1.ID + "dispatch-23423423" + childJob.ParentID = job1.ID + childJob.Dispatched = true + childJob.Status = structs.JobStatusRunning + + // Create an alloc for child job + alloc := mock.Alloc() + alloc.NodeID = node.ID + alloc.Job = childJob + alloc.JobID = childJob.ID + alloc.ClientStatus = structs.AllocClientStatusRunning + + state.UpsertJob(1010, childJob) + state.UpsertAllocs(1011, []*structs.Allocation{alloc}) + + // Make the summary incorrect in the state store + summary, err := state.JobSummaryByID(nil, job1.Namespace, job1.ID) + require.Nil(err) + + summary.Children = nil + summary.Summary = make(map[string]structs.TaskGroupSummary) + summary.Summary["web"] = structs.TaskGroupSummary{ + Queued: 1, + } + + req := structs.GenericRequest{} + buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req) + require.Nil(err) + + resp := fsm.Apply(makeLog(buf)) + require.Nil(resp) + + ws := memdb.NewWatchSet() + out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID) + expected := structs.JobSummary{ + JobID: job1.ID, + Namespace: job1.Namespace, + Summary: make(map[string]structs.TaskGroupSummary), + CreateIndex: 1000, + ModifyIndex: out1.ModifyIndex, + Children: &structs.JobChildrenSummary{ + Running: 1, + }, + } + require.Equal(&expected, out1) +} + func TestFSM_LeakedDeployments(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 646d9e42a19..49626547bcd 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -6,6 +6,8 @@ import ( "sort" "time" + "reflect" + log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" multierror "github.com/hashicorp/go-multierror" @@ -3049,12 +3051,86 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { if err != nil { return err } + // COMPAT: Remove after 0.11 + // Iterate over jobs to build a list of parent jobs and their children + parentMap := make(map[string][]*structs.Job) for { rawJob := iter.Next() if rawJob == nil { break } job := rawJob.(*structs.Job) + if job.ParentID != "" { + children := parentMap[job.ParentID] + children = append(children, job) + parentMap[job.ParentID] = children + } + } + + // Get all the jobs again + iter, err = txn.Get("jobs", "id") + if err != nil { + return err + } + + for { + rawJob := iter.Next() + if rawJob == nil { + break + } + job := rawJob.(*structs.Job) + + if job.IsParameterized() || job.IsPeriodic() { + // COMPAT: Remove after 0.11 + + // The following block of code fixes incorrect child summaries due to a bug + // See https://github.com/hashicorp/nomad/issues/3886 for details + rawSummary, err := txn.First("job_summary", "id", job.Namespace, job.ID) + if err != nil { + return err + } + if rawSummary == nil { + continue + } + + oldSummary := rawSummary.(*structs.JobSummary) + + // Create an empty summary + summary := &structs.JobSummary{ + JobID: job.ID, + Namespace: job.Namespace, + Summary: make(map[string]structs.TaskGroupSummary), + Children: &structs.JobChildrenSummary{}, + } + + // Iterate over children of this job if any to fix summary counts + children := parentMap[job.ID] + for _, childJob := range children { + switch childJob.Status { + case structs.JobStatusPending: + summary.Children.Pending++ + case structs.JobStatusDead: + summary.Children.Dead++ + case structs.JobStatusRunning: + summary.Children.Running++ + } + } + + // Insert the job summary if its different + if !reflect.DeepEqual(summary, oldSummary) { + // Set the create index of the summary same as the job's create index + // and the modify index to the current index + summary.CreateIndex = job.CreateIndex + summary.ModifyIndex = index + + if err := txn.Insert("job_summary", summary); err != nil { + return fmt.Errorf("error inserting job summary: %v", err) + } + } + + // Done with handling a parent job, continue to next + continue + } // Create a job summary for the job summary := &structs.JobSummary{ diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 48316e8c072..3cf3b977606 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -4354,6 +4354,95 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) { } } +func TestStateStore_ReconcileParentJobSummary(t *testing.T) { + t.Parallel() + require := require.New(t) + state := testStateStore(t) + + // Add a node + node := mock.Node() + state.UpsertNode(80, node) + + // Make a parameterized job + job1 := mock.BatchJob() + job1.ID = "test" + job1.ParameterizedJob = &structs.ParameterizedJobConfig{ + Payload: "random", + } + job1.TaskGroups[0].Count = 1 + state.UpsertJob(100, job1) + + // Make a child job + childJob := job1.Copy() + childJob.ID = job1.ID + "dispatch-23423423" + childJob.ParentID = job1.ID + childJob.Dispatched = true + childJob.Status = structs.JobStatusRunning + + // Make some allocs for child job + alloc := mock.Alloc() + alloc.NodeID = node.ID + alloc.Job = childJob + alloc.JobID = childJob.ID + alloc.ClientStatus = structs.AllocClientStatusRunning + + alloc2 := mock.Alloc() + alloc2.NodeID = node.ID + alloc2.Job = childJob + alloc2.JobID = childJob.ID + alloc2.ClientStatus = structs.AllocClientStatusFailed + + require.Nil(state.UpsertJob(110, childJob)) + require.Nil(state.UpsertAllocs(111, []*structs.Allocation{alloc, alloc2})) + + // Make the summary incorrect in the state store + summary, err := state.JobSummaryByID(nil, job1.Namespace, job1.ID) + require.Nil(err) + + summary.Children = nil + summary.Summary = make(map[string]structs.TaskGroupSummary) + summary.Summary["web"] = structs.TaskGroupSummary{ + Queued: 1, + } + + // Delete the child job summary + state.DeleteJobSummary(125, childJob.Namespace, childJob.ID) + + state.ReconcileJobSummaries(120) + + ws := memdb.NewWatchSet() + + // Verify parent summary is corrected + summary, _ = state.JobSummaryByID(ws, alloc.Namespace, job1.ID) + expectedSummary := structs.JobSummary{ + JobID: job1.ID, + Namespace: job1.Namespace, + Summary: make(map[string]structs.TaskGroupSummary), + Children: &structs.JobChildrenSummary{ + Running: 1, + }, + CreateIndex: 100, + ModifyIndex: 120, + } + require.Equal(&expectedSummary, summary) + + // Verify child job summary is also correct + childSummary, _ := state.JobSummaryByID(ws, childJob.Namespace, childJob.ID) + expectedChildSummary := structs.JobSummary{ + JobID: childJob.ID, + Namespace: childJob.Namespace, + Summary: map[string]structs.TaskGroupSummary{ + "web": { + Running: 1, + Failed: 1, + }, + }, + CreateIndex: 110, + ModifyIndex: 120, + } + require.Equal(&expectedChildSummary, childSummary) +} + func TestStateStore_UpdateAlloc_JobNotPresent(t *testing.T) { state := testStateStore(t)