From 8ccb6bf3d6b28807a9e242872f9a16e33a804059 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 10 Jun 2019 17:09:20 -0400 Subject: [PATCH 1/3] test that stopped alloc jobs aren't modified When an alloc is stopped, test that we don't update the job found in alloc with new job that is no longer relevent for this alloc. --- nomad/state/state_store_test.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 7db7a2c7ff9..390deb13703 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -144,6 +144,7 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing // This test checks that: // 1) The job is denormalized // 2) Allocations are denormalized and updated with the diff +// That stopped allocs Job is unmodified func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() @@ -168,6 +169,12 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { require.NoError(state.UpsertAllocs(900, []*structs.Allocation{stoppedAlloc, preemptedAlloc})) require.NoError(state.UpsertJob(999, job)) + // modify job and ensure that stopped and preempted alloc point to original Job + mJob := job.Copy() + mJob.TaskGroups[0].Name = "other" + + require.NoError(state.UpsertJob(1001, mJob)) + eval := mock.Eval() eval.JobID = job.ID @@ -179,7 +186,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { AllocUpdateRequest: structs.AllocUpdateRequest{ AllocsUpdated: []*structs.Allocation{alloc}, AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff}, - Job: job, + Job: mJob, }, EvalID: eval.ID, AllocsPreempted: []*structs.AllocationDiff{preemptedAllocDiff}, @@ -194,6 +201,11 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { require.NoError(err) assert.Equal(alloc, out) + outJob, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(err) + require.Equal(mJob.TaskGroups, outJob.TaskGroups) + require.NotEmpty(job.TaskGroups, outJob.TaskGroups) + updatedStoppedAlloc, err := state.AllocByID(ws, stoppedAlloc.ID) require.NoError(err) assert.Equal(stoppedAllocDiff.DesiredDescription, updatedStoppedAlloc.DesiredDescription) @@ -201,6 +213,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { assert.Equal(stoppedAllocDiff.ClientStatus, updatedStoppedAlloc.ClientStatus) assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex) assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex) + assert.Equal(job.TaskGroups, updatedStoppedAlloc.Job.TaskGroups) updatedPreemptedAlloc, err := state.AllocByID(ws, preemptedAlloc.ID) require.NoError(err) @@ -208,6 +221,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { assert.Equal(preemptedAllocDiff.PreemptedByAllocation, updatedPreemptedAlloc.PreemptedByAllocation) assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex) assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex) + assert.Equal(job.TaskGroups, updatedPreemptedAlloc.Job.TaskGroups) index, err := state.Index("allocs") require.NoError(err) @@ -219,6 +233,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { require.NoError(err) require.NotNil(evalOut) assert.EqualValues(planModifyIndex, evalOut.ModifyIndex) + } // This test checks that the deployment is created and allocations count towards From 7825a3ca4d6c8a4f2f89381893d63e809c5b49db Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 10 Jun 2019 17:19:54 -0400 Subject: [PATCH 2/3] Stop updating allocs.Job on stopping or preemption --- nomad/plan_apply.go | 4 ++-- nomad/state/state_store.go | 20 +++++++++----------- nomad/state/state_store_test.go | 2 +- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f9b8d7c63a2..92873d0d52b 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -339,12 +339,12 @@ func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.P defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now()) // Denormalize without the job - err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, nil) + err := snap.DenormalizeAllocationsMap(plan.NodeUpdate) if err != nil { return nil, err } // Denormalize without the job - err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, nil) + err = snap.DenormalizeAllocationsMap(plan.NodePreemptions) if err != nil { return nil, err } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6da9f6e8f01..e38099091f4 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -230,18 +230,18 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR return err } - allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped, results.Job) + allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped) if err != nil { return err } - allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted, results.Job) + allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted) if err != nil { return err } // COMPAT 0.11: Remove this denormalization when NodePreemptions is removed - results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions, results.Job) + results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions) if err != nil { return err } @@ -4192,9 +4192,9 @@ type StateSnapshot struct { // DenormalizeAllocationsMap takes in a map of nodes to allocations, and queries the // Allocation for each of the Allocation diffs and merges the updated attributes with // the existing Allocation, and attaches the Job provided -func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation, job *structs.Job) error { +func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation) error { for nodeID, allocs := range nodeAllocations { - denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs, job) + denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs) if err != nil { return err } @@ -4207,18 +4207,18 @@ func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]* // DenormalizeAllocationSlice queries the Allocation for each allocation diff // represented as an Allocation and merges the updated attributes with the existing // Allocation, and attaches the Job provided. -func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation, job *structs.Job) ([]*structs.Allocation, error) { +func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation) ([]*structs.Allocation, error) { allocDiffs := make([]*structs.AllocationDiff, len(allocs)) for i, alloc := range allocs { allocDiffs[i] = alloc.AllocationDiff() } - return s.DenormalizeAllocationDiffSlice(allocDiffs, job) + return s.DenormalizeAllocationDiffSlice(allocDiffs) } // DenormalizeAllocationDiffSlice queries the Allocation for each AllocationDiff and merges // the updated attributes with the existing Allocation, and attaches the Job provided -func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff, planJob *structs.Job) ([]*structs.Allocation, error) { +func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff) ([]*structs.Allocation, error) { // Output index for denormalized Allocations j := 0 @@ -4233,17 +4233,15 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All } // Merge the updates to the Allocation - allocCopy := alloc.CopySkipJob() + allocCopy := alloc.Copy() if allocDiff.PreemptedByAllocation != "" { // If alloc is a preemption set the job from the alloc read from the state store - allocCopy.Job = alloc.Job.Copy() allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation) allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict } else { // If alloc is a stopped alloc - allocCopy.Job = planJob allocCopy.DesiredDescription = allocDiff.DesiredDescription allocCopy.DesiredStatus = structs.AllocDesiredStatusStop if allocDiff.ClientStatus != "" { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 390deb13703..d21e4bbdefb 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -7132,7 +7132,7 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi snap, err := state.Snapshot() require.NoError(err) - denormalizedAllocs, err := snap.DenormalizeAllocationDiffSlice(allocDiffs, alloc.Job) + denormalizedAllocs, err := snap.DenormalizeAllocationDiffSlice(allocDiffs) require.EqualError(err, fmt.Sprintf("alloc %v doesn't exist", alloc.ID)) require.Nil(denormalizedAllocs) From 41a7fe8530954e9b5de76bc1f301d7e78aeb73a4 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 10 Jun 2019 17:24:41 -0400 Subject: [PATCH 3/3] client/allocrunner: depend on internal task state Alloc runner already tracks tasks associated with alloc. Here, we become defensive by relying on the alloc runner tracked tasks, rather than depend on server never updating the job unexpectedly. --- client/allocrunner/alloc_runner.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index e9efa120f15..544f9ba6ffa 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -571,11 +571,11 @@ func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *st // Make sure we have marked the finished at for every task. This is used // to calculate the reschedule time for failed allocations. now := time.Now() - for _, task := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks { - ts, ok := a.TaskStates[task.Name] + for taskName := range ar.tasks { + ts, ok := a.TaskStates[taskName] if !ok { ts = &structs.TaskState{} - a.TaskStates[task.Name] = ts + a.TaskStates[taskName] = ts } if ts.FinishedAt.IsZero() { ts.FinishedAt = now