From 7f4f55bad554201e42d6022ee542f06febd8e0ee Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 17 Jan 2019 10:18:19 -0600 Subject: [PATCH 1/7] wip cluster with raw exec enabled --- dev/cluster/client1.hcl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dev/cluster/client1.hcl b/dev/cluster/client1.hcl index 93db926b75e..60597aa69aa 100644 --- a/dev/cluster/client1.hcl +++ b/dev/cluster/client1.hcl @@ -10,7 +10,9 @@ name = "client1" # Enable the client client { enabled = true - + options = { + "driver.raw_exec.enable" = "1" + } server_join { retry_join = ["127.0.0.1:4647", "127.0.0.1:5647", "127.0.0.1:6647"] } From b03833307b2c6ee6531ae83678767eaca4a45617 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 17 Jan 2019 11:30:43 -0600 Subject: [PATCH 2/7] Fix bug in reconcile summaries that affects periodic/parameterized jobs This fixes incorrect parent job summaries by recomputing them in the ReconcileJobSummaries method in the state store --- nomad/fsm.go | 5 +++ nomad/fsm_test.go | 76 ++++++++++++++++++++++++++++++++++++++ nomad/state/state_store.go | 65 ++++++++++++++++++++++++++++++++ 3 files changed, 146 insertions(+) diff --git a/nomad/fsm.go b/nomad/fsm.go index 6c7ed2e91d0..c25e99fdcf0 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1401,6 +1401,11 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { break } job := rawJob.(*structs.Job) + + // Nothing to do for queued allocations if the job is a parent periodic/parameterized job + if job.ParentID == "" && (job.Periodic != nil || job.ParameterizedJob != nil) { + continue + } planner := &scheduler.Harness{ State: &snap.StateStore, } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 28e69ae71ac..bb1a5d91c95 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2816,6 +2816,82 @@ func TestFSM_ReconcileSummaries(t *testing.T) { } } +// COMPAT: Remove in 0.11 +func TestFSM_ReconcileParentJobSummaries(t *testing.T) { + // This test exercises code to handle https://github.com/hashicorp/nomad/issues/3886 + t.Parallel() + + require := require.New(t) + // Add some state + fsm := testFSM(t) + state := fsm.State() + + // Add a node + node := mock.Node() + state.UpsertNode(800, node) + + // Make a parameterized job + job1 := mock.BatchJob() + job1.ID = "test" + job1.ParameterizedJob = &structs.ParameterizedJobConfig{ + Payload: "random", + } + job1.TaskGroups[0].Count = 1 + state.UpsertJob(1000, job1) + + // Make an alloc for its child job + childJob := job1.Copy() + childJob.ID = job1.ID + "dispatch-23423423" + childJob.ParentID = job1.ID + childJob.Status = structs.JobStatusRunning + + alloc := mock.Alloc() + alloc.NodeID = node.ID + alloc.Job = childJob + alloc.JobID = childJob.ID + + state.UpsertJob(1010, childJob) + alloc.ClientStatus = structs.AllocClientStatusRunning + state.UpsertAllocs(1011, []*structs.Allocation{alloc}) + + // Make the summary incorrect in the state store + summary, err := state.JobSummaryByID(nil, job1.Namespace, job1.ID) + require.Nil(err) + + summary.Children = nil + summary.Summary = make(map[string]structs.TaskGroupSummary) + summary.Summary["web"] = structs.TaskGroupSummary{ + Queued: 1, + } + + // state.DeleteJobSummary(1030, job1.Namespace, job1.ID) + + req := structs.GenericRequest{} + buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + ws := memdb.NewWatchSet() + out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID) + expected := structs.JobSummary{ + JobID: job1.ID, + Namespace: job1.Namespace, + Summary: make(map[string]structs.TaskGroupSummary), + CreateIndex: 1000, + ModifyIndex: out1.ModifyIndex, + Children: &structs.JobChildrenSummary{ + Running: 1, + }, + } + require.Equal(&expected, out1) +} + func TestFSM_LeakedDeployments(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 646d9e42a19..30072e768e6 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -11,6 +11,7 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" + "reflect" ) // Txn is a transaction against a state store. @@ -3056,6 +3057,70 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { } job := rawJob.(*structs.Job) + if (job.ParameterizedJob != nil || job.Periodic != nil) && job.ParentID == "" { + + // COMPAT: Remove after 0.11 + + // The following block of code fixes incorrect child summaries due to a bug + // See https://github.com/hashicorp/nomad/issues/3886 for details + summaryIter, err := txn.Get("job_summary", "id", job.Namespace, job.ID) + if err != nil { + return err + } + + rawSummary := summaryIter.Next() + if rawSummary != nil { + oldSummary := rawSummary.(*structs.JobSummary) + + // Create an empty summary + summary := &structs.JobSummary{ + JobID: job.ID, + Namespace: job.Namespace, + Summary: make(map[string]structs.TaskGroupSummary), + Children: &structs.JobChildrenSummary{}, + } + + // Calculate children summary by iterating over all jobs + // and finding the ones that have this job as its parent + jobIter, err := txn.Get("jobs", "id") + if err != nil { + return err + } + for { + rawJob := jobIter.Next() + if rawJob == nil { + break + } + childJob := rawJob.(*structs.Job) + if childJob.ParentID == job.ID { + switch childJob.Status { + case structs.JobStatusPending: + summary.Children.Pending++ + case structs.JobStatusDead: + summary.Children.Dead++ + case structs.JobStatusRunning: + summary.Children.Running++ + } + } + } + + // Insert the job summary if its different + if !reflect.DeepEqual(summary, oldSummary) { + // Set the create index of the summary same as the job's create index + // and the modify index to the current index + summary.CreateIndex = job.CreateIndex + summary.ModifyIndex = index + + if err := txn.Insert("job_summary", summary); err != nil { + return fmt.Errorf("error inserting job summary: %v", err) + } + } + } + + // Done with handling a parent job, continue to next + continue + } + // Create a job summary for the job summary := &structs.JobSummary{ JobID: job.ID, From 945181205b41e876196b16196217b34cd9d778a1 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 17 Jan 2019 12:15:42 -0600 Subject: [PATCH 3/7] Use IsParameterized/isPeriodic methods --- nomad/fsm.go | 2 +- nomad/state/state_store.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index c25e99fdcf0..c78b1fb51b5 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1403,7 +1403,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { job := rawJob.(*structs.Job) // Nothing to do for queued allocations if the job is a parent periodic/parameterized job - if job.ParentID == "" && (job.Periodic != nil || job.ParameterizedJob != nil) { + if job.IsParameterized() || job.IsPeriodic() { continue } planner := &scheduler.Harness{ diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 30072e768e6..79ceb91d22c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3057,8 +3057,7 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { } job := rawJob.(*structs.Job) - if (job.ParameterizedJob != nil || job.Periodic != nil) && job.ParentID == "" { - + if job.IsParameterized() || job.IsPeriodic() { // COMPAT: Remove after 0.11 // The following block of code fixes incorrect child summaries due to a bug From be323011ceae0a19cb06e6132c323d8f9b11a9b7 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 17 Jan 2019 14:29:48 -0600 Subject: [PATCH 4/7] Refactor to find jobs with child instances more effeciently also added unit tests --- nomad/fsm_test.go | 18 +++---- nomad/state/state_store.go | 53 ++++++++++++-------- nomad/state/state_store_test.go | 89 +++++++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 32 deletions(-) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index bb1a5d91c95..0c3a846a85f 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2817,7 +2817,7 @@ func TestFSM_ReconcileSummaries(t *testing.T) { } // COMPAT: Remove in 0.11 -func TestFSM_ReconcileParentJobSummaries(t *testing.T) { +func TestFSM_ReconcileParentJobSummary(t *testing.T) { // This test exercises code to handle https://github.com/hashicorp/nomad/issues/3886 t.Parallel() @@ -2839,19 +2839,21 @@ func TestFSM_ReconcileParentJobSummaries(t *testing.T) { job1.TaskGroups[0].Count = 1 state.UpsertJob(1000, job1) - // Make an alloc for its child job + // Make a child job childJob := job1.Copy() childJob.ID = job1.ID + "dispatch-23423423" childJob.ParentID = job1.ID + childJob.Dispatched = true childJob.Status = structs.JobStatusRunning + // Create an alloc for child job alloc := mock.Alloc() alloc.NodeID = node.ID alloc.Job = childJob alloc.JobID = childJob.ID + alloc.ClientStatus = structs.AllocClientStatusRunning state.UpsertJob(1010, childJob) - alloc.ClientStatus = structs.AllocClientStatusRunning state.UpsertAllocs(1011, []*structs.Allocation{alloc}) // Make the summary incorrect in the state store @@ -2864,18 +2866,12 @@ func TestFSM_ReconcileParentJobSummaries(t *testing.T) { Queued: 1, } - // state.DeleteJobSummary(1030, job1.Namespace, job1.ID) - req := structs.GenericRequest{} buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } + require.Nil(err) resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } + require.Nil(resp) ws := memdb.NewWatchSet() out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 79ceb91d22c..0f1b75432ab 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3050,6 +3050,28 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { if err != nil { return err } + // COMPAT: Remove after 0.11 + // Iterate over jobs to build a list of parent jobs and their children + parentMap := make(map[string][]*structs.Job) + for { + rawJob := iter.Next() + if rawJob == nil { + break + } + job := rawJob.(*structs.Job) + if job.ParentID != "" { + children := parentMap[job.ParentID] + children = append(children, job) + parentMap[job.ParentID] = children + } + } + + // Get all the jobs again + iter, err = txn.Get("jobs", "id") + if err != nil { + return err + } + for { rawJob := iter.Next() if rawJob == nil { @@ -3079,27 +3101,16 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { Children: &structs.JobChildrenSummary{}, } - // Calculate children summary by iterating over all jobs - // and finding the ones that have this job as its parent - jobIter, err := txn.Get("jobs", "id") - if err != nil { - return err - } - for { - rawJob := jobIter.Next() - if rawJob == nil { - break - } - childJob := rawJob.(*structs.Job) - if childJob.ParentID == job.ID { - switch childJob.Status { - case structs.JobStatusPending: - summary.Children.Pending++ - case structs.JobStatusDead: - summary.Children.Dead++ - case structs.JobStatusRunning: - summary.Children.Running++ - } + // Iterate over children of this job if any to fix summary counts + children := parentMap[job.ID] + for _, childJob := range children { + switch childJob.Status { + case structs.JobStatusPending: + summary.Children.Pending++ + case structs.JobStatusDead: + summary.Children.Dead++ + case structs.JobStatusRunning: + summary.Children.Running++ } } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 48316e8c072..3cf3b977606 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -4354,6 +4354,95 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) { } } +func TestStateStore_ReconcileParentJobSummary(t *testing.T) { + t.Parallel() + require := require.New(t) + state := testStateStore(t) + + // Add a node + node := mock.Node() + state.UpsertNode(80, node) + + // Make a parameterized job + job1 := mock.BatchJob() + job1.ID = "test" + job1.ParameterizedJob = &structs.ParameterizedJobConfig{ + Payload: "random", + } + job1.TaskGroups[0].Count = 1 + state.UpsertJob(100, job1) + + // Make a child job + childJob := job1.Copy() + childJob.ID = job1.ID + "dispatch-23423423" + childJob.ParentID = job1.ID + childJob.Dispatched = true + childJob.Status = structs.JobStatusRunning + + // Make some allocs for child job + alloc := mock.Alloc() + alloc.NodeID = node.ID + alloc.Job = childJob + alloc.JobID = childJob.ID + alloc.ClientStatus = structs.AllocClientStatusRunning + + alloc2 := mock.Alloc() + alloc2.NodeID = node.ID + alloc2.Job = childJob + alloc2.JobID = childJob.ID + alloc2.ClientStatus = structs.AllocClientStatusFailed + + require.Nil(state.UpsertJob(110, childJob)) + require.Nil(state.UpsertAllocs(111, []*structs.Allocation{alloc, alloc2})) + + // Make the summary incorrect in the state store + summary, err := state.JobSummaryByID(nil, job1.Namespace, job1.ID) + require.Nil(err) + + summary.Children = nil + summary.Summary = make(map[string]structs.TaskGroupSummary) + summary.Summary["web"] = structs.TaskGroupSummary{ + Queued: 1, + } + + // Delete the child job summary + state.DeleteJobSummary(125, childJob.Namespace, childJob.ID) + + state.ReconcileJobSummaries(120) + + ws := memdb.NewWatchSet() + + // Verify parent summary is corrected + summary, _ = state.JobSummaryByID(ws, alloc.Namespace, job1.ID) + expectedSummary := structs.JobSummary{ + JobID: job1.ID, + Namespace: job1.Namespace, + Summary: make(map[string]structs.TaskGroupSummary), + Children: &structs.JobChildrenSummary{ + Running: 1, + }, + CreateIndex: 100, + ModifyIndex: 120, + } + require.Equal(&expectedSummary, summary) + + // Verify child job summary is also correct + childSummary, _ := state.JobSummaryByID(ws, childJob.Namespace, childJob.ID) + expectedChildSummary := structs.JobSummary{ + JobID: childJob.ID, + Namespace: childJob.Namespace, + Summary: map[string]structs.TaskGroupSummary{ + "web": { + Running: 1, + Failed: 1, + }, + }, + CreateIndex: 110, + ModifyIndex: 120, + } + require.Equal(&expectedChildSummary, childSummary) +} + func TestStateStore_UpdateAlloc_JobNotPresent(t *testing.T) { state := testStateStore(t) From 946d47712bfc2ac8da04465fdcaed846790751b2 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 17 Jan 2019 15:36:33 -0600 Subject: [PATCH 5/7] fix linting --- nomad/state/state_store.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 0f1b75432ab..25dbbf8294a 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -6,12 +6,13 @@ import ( "sort" "time" + "reflect" + log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" - "reflect" ) // Txn is a transaction against a state store. From e58fcf7ac74105f4470418f39c71b8ada188f5f9 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 18 Jan 2019 17:36:35 -0600 Subject: [PATCH 6/7] revert unintended change --- dev/cluster/client1.hcl | 3 --- 1 file changed, 3 deletions(-) diff --git a/dev/cluster/client1.hcl b/dev/cluster/client1.hcl index 60597aa69aa..7008dba243f 100644 --- a/dev/cluster/client1.hcl +++ b/dev/cluster/client1.hcl @@ -10,9 +10,6 @@ name = "client1" # Enable the client client { enabled = true - options = { - "driver.raw_exec.enable" = "1" - } server_join { retry_join = ["127.0.0.1:4647", "127.0.0.1:5647", "127.0.0.1:6647"] } From b403c9bf14c6b1639b9856058cc23ea69933f261 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 18 Jan 2019 17:41:39 -0600 Subject: [PATCH 7/7] code review comments --- nomad/state/state_store.go | 62 +++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 25dbbf8294a..49626547bcd 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3085,46 +3085,46 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { // The following block of code fixes incorrect child summaries due to a bug // See https://github.com/hashicorp/nomad/issues/3886 for details - summaryIter, err := txn.Get("job_summary", "id", job.Namespace, job.ID) + rawSummary, err := txn.First("job_summary", "id", job.Namespace, job.ID) if err != nil { return err } + if rawSummary == nil { + continue + } - rawSummary := summaryIter.Next() - if rawSummary != nil { - oldSummary := rawSummary.(*structs.JobSummary) + oldSummary := rawSummary.(*structs.JobSummary) - // Create an empty summary - summary := &structs.JobSummary{ - JobID: job.ID, - Namespace: job.Namespace, - Summary: make(map[string]structs.TaskGroupSummary), - Children: &structs.JobChildrenSummary{}, - } + // Create an empty summary + summary := &structs.JobSummary{ + JobID: job.ID, + Namespace: job.Namespace, + Summary: make(map[string]structs.TaskGroupSummary), + Children: &structs.JobChildrenSummary{}, + } - // Iterate over children of this job if any to fix summary counts - children := parentMap[job.ID] - for _, childJob := range children { - switch childJob.Status { - case structs.JobStatusPending: - summary.Children.Pending++ - case structs.JobStatusDead: - summary.Children.Dead++ - case structs.JobStatusRunning: - summary.Children.Running++ - } + // Iterate over children of this job if any to fix summary counts + children := parentMap[job.ID] + for _, childJob := range children { + switch childJob.Status { + case structs.JobStatusPending: + summary.Children.Pending++ + case structs.JobStatusDead: + summary.Children.Dead++ + case structs.JobStatusRunning: + summary.Children.Running++ } + } - // Insert the job summary if its different - if !reflect.DeepEqual(summary, oldSummary) { - // Set the create index of the summary same as the job's create index - // and the modify index to the current index - summary.CreateIndex = job.CreateIndex - summary.ModifyIndex = index + // Insert the job summary if its different + if !reflect.DeepEqual(summary, oldSummary) { + // Set the create index of the summary same as the job's create index + // and the modify index to the current index + summary.CreateIndex = job.CreateIndex + summary.ModifyIndex = index - if err := txn.Insert("job_summary", summary); err != nil { - return fmt.Errorf("error inserting job summary: %v", err) - } + if err := txn.Insert("job_summary", summary); err != nil { + return fmt.Errorf("error inserting job summary: %v", err) } }