diff --git a/nomad/leader.go b/nomad/leader.go index 8e26a56f514..17c8199962f 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -1243,7 +1243,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig { return config } - if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion) { + if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) { s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion) return nil } @@ -1270,7 +1270,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration { if config != nil { return config } - if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion) { + if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) { s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion) return nil } diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index fc9edabdd97..c44e14c31c3 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -237,7 +237,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe } // All servers should be at or above 0.8.0 to apply this operatation - if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion) { + if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) { return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion) } @@ -305,7 +305,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe } // All servers should be at or above 0.9.0 to apply this operatation - if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion) { + if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) { return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion) } // Apply the update diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f40690d00f2..f9b8d7c63a2 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -15,7 +15,7 @@ import ( "github.com/hashicorp/raft" ) -// planner is used to mange the submitted allocation plans that are waiting +// planner is used to manage the submitted allocation plans that are waiting // to be accessed by the leader type planner struct { *Server @@ -149,52 +149,82 @@ func (p *planner) planApply() { // applyPlan is used to apply the plan result and to return the alloc index func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) { - // Determine the minimum number of updates, could be more if there - // are multiple updates per node - minUpdates := len(result.NodeUpdate) - minUpdates += len(result.NodeAllocation) - // Setup the update request req := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ - Job: plan.Job, - Alloc: make([]*structs.Allocation, 0, minUpdates), + Job: plan.Job, }, Deployment: result.Deployment, DeploymentUpdates: result.DeploymentUpdates, EvalID: plan.EvalID, - NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)), - } - for _, updateList := range result.NodeUpdate { - req.Alloc = append(req.Alloc, updateList...) - } - for _, allocList := range result.NodeAllocation { - req.Alloc = append(req.Alloc, allocList...) 
} - for _, preemptions := range result.NodePreemptions { - req.NodePreemptions = append(req.NodePreemptions, preemptions...) - } - - // Set the time the alloc was applied for the first time. This can be used - // to approximate the scheduling time. + preemptedJobIDs := make(map[structs.NamespacedID]struct{}) now := time.Now().UTC().UnixNano() - for _, alloc := range req.Alloc { - if alloc.CreateTime == 0 { - alloc.CreateTime = now + + if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) { + // Initialize the allocs request using the new optimized log entry format. + // Determine the minimum number of updates, could be more if there + // are multiple updates per node + req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate)) + req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation)) + req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions)) + + for _, updateList := range result.NodeUpdate { + for _, stoppedAlloc := range updateList { + req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, now)) + } } - alloc.ModifyTime = now - } - // Set modify time for preempted allocs if any - // Also gather jobids to create follow up evals - preemptedJobIDs := make(map[structs.NamespacedID]struct{}) - for _, alloc := range req.NodePreemptions { - alloc.ModifyTime = now - id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID} - _, ok := preemptedJobIDs[id] - if !ok { - preemptedJobIDs[id] = struct{}{} + for _, allocList := range result.NodeAllocation { + req.AllocsUpdated = append(req.AllocsUpdated, allocList...) + } + + // Set the time the alloc was applied for the first time. This can be used + // to approximate the scheduling time. + updateAllocTimestamps(req.AllocsUpdated, now) + + for _, preemptions := range result.NodePreemptions { + for _, preemptedAlloc := range preemptions { + req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, now)) + + // Gather jobids to create follow up evals + appendNamespacedJobID(preemptedJobIDs, preemptedAlloc) + } + } + } else { + // COMPAT 0.11: This branch is deprecated and will only be used to support + // application of older log entries. Expected to be removed in a future version. + + // Determine the minimum number of updates, could be more if there + // are multiple updates per node + minUpdates := len(result.NodeUpdate) + minUpdates += len(result.NodeAllocation) + + // Initialize using the older log entry format for Alloc and NodePreemptions + req.Alloc = make([]*structs.Allocation, 0, minUpdates) + req.NodePreemptions = make([]*structs.Allocation, 0, len(result.NodePreemptions)) + + for _, updateList := range result.NodeUpdate { + req.Alloc = append(req.Alloc, updateList...) + } + for _, allocList := range result.NodeAllocation { + req.Alloc = append(req.Alloc, allocList...) + } + + for _, preemptions := range result.NodePreemptions { + req.NodePreemptions = append(req.NodePreemptions, preemptions...) + } + + // Set the time the alloc was applied for the first time. This can be used + // to approximate the scheduling time. 
+ updateAllocTimestamps(req.Alloc, now) + + // Set modify time for preempted allocs if any + // Also gather jobids to create follow up evals + for _, alloc := range req.NodePreemptions { + alloc.ModifyTime = now + appendNamespacedJobID(preemptedJobIDs, alloc) } } @@ -232,6 +262,50 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap return future, nil } +// normalizePreemptedAlloc removes redundant fields from a preempted allocation and +// returns AllocationDiff. Since a preempted allocation is always an existing allocation, +// the struct returned by this method contains only the differential, which can be +// applied to an existing allocation, to yield the updated struct +func normalizePreemptedAlloc(preemptedAlloc *structs.Allocation, now int64) *structs.AllocationDiff { + return &structs.AllocationDiff{ + ID: preemptedAlloc.ID, + PreemptedByAllocation: preemptedAlloc.PreemptedByAllocation, + ModifyTime: now, + } +} + +// normalizeStoppedAlloc removes redundant fields from a stopped allocation and +// returns AllocationDiff. Since a stopped allocation is always an existing allocation, +// the struct returned by this method contains only the differential, which can be +// applied to an existing allocation, to yield the updated struct +func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs.AllocationDiff { + return &structs.AllocationDiff{ + ID: stoppedAlloc.ID, + DesiredDescription: stoppedAlloc.DesiredDescription, + ClientStatus: stoppedAlloc.ClientStatus, + ModifyTime: now, + } +} + +// appendNamespacedJobID appends the namespaced Job ID for the alloc to the jobIDs set +func appendNamespacedJobID(jobIDs map[structs.NamespacedID]struct{}, alloc *structs.Allocation) { + id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID} + if _, ok := jobIDs[id]; !ok { + jobIDs[id] = struct{}{} + } +} + +// updateAllocTimestamps sets the CreateTime and ModifyTime for the allocations +// to the timestamp provided +func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) { + for _, alloc := range allocations { + if alloc.CreateTime == 0 { + alloc.CreateTime = timestamp + } + alloc.ModifyTime = timestamp + } +} + // asyncPlanWait is used to apply and respond to a plan async func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture, result *structs.PlanResult, pending *pendingPlan) { @@ -264,6 +338,17 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture, func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.Plan, logger log.Logger) (*structs.PlanResult, error) { defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now()) + // Denormalize without the job + err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, nil) + if err != nil { + return nil, err + } + // Denormalize without the job + err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, nil) + if err != nil { + return nil, err + } + // Check if the plan exceeds quota overQuota, err := evaluatePlanQuota(snap, plan) if err != nil { @@ -521,15 +606,11 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri // Remove any preempted allocs if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 { - for _, allocs := range preempted { - remove = append(remove, allocs) - } + remove = append(remove, preempted...) 
} if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 { - for _, alloc := range updated { - remove = append(remove, alloc) - } + remove = append(remove, updated...) } proposed := structs.RemoveAllocs(existingAlloc, remove) proposed = append(proposed, plan.NodeAllocation[nodeID]...) diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 4dfa7a43b02..e623509842a 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -3,8 +3,9 @@ package nomad import ( "reflect" "testing" + "time" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -62,6 +63,7 @@ func testRegisterJob(t *testing.T, s *Server, j *structs.Job) { } } +// COMPAT 0.11: Tests the older unoptimized code path for applyPlan func TestPlanApply_applyPlan(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) @@ -228,6 +230,158 @@ func TestPlanApply_applyPlan(t *testing.T) { assert.Equal(index, evalOut.ModifyIndex) } +// Verifies that applyPlan properly updates the constituent objects in MemDB, +// when the plan contains normalized allocs. +func TestPlanApply_applyPlanWithNormalizedAllocs(t *testing.T) { + t.Parallel() + s1 := TestServer(t, func(c *Config) { + c.Build = "0.9.2" + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Register node + node := mock.Node() + testRegisterNode(t, s1, node) + + // Register a fake deployment + oldDeployment := mock.Deployment() + if err := s1.State().UpsertDeployment(900, oldDeployment); err != nil { + t.Fatalf("UpsertDeployment failed: %v", err) + } + + // Create a deployment + dnew := mock.Deployment() + + // Create a deployment update for the old deployment id + desiredStatus, desiredStatusDescription := "foo", "bar" + updates := []*structs.DeploymentStatusUpdate{ + { + DeploymentID: oldDeployment.ID, + Status: desiredStatus, + StatusDescription: desiredStatusDescription, + }, + } + + // Register allocs, deployment and deployment update + alloc := mock.Alloc() + stoppedAlloc := mock.Alloc() + stoppedAllocDiff := &structs.Allocation{ + ID: stoppedAlloc.ID, + DesiredDescription: "Desired Description", + ClientStatus: structs.AllocClientStatusLost, + } + preemptedAlloc := mock.Alloc() + preemptedAllocDiff := &structs.Allocation{ + ID: preemptedAlloc.ID, + PreemptedByAllocation: alloc.ID, + } + s1.State().UpsertJobSummary(1000, mock.JobSummary(alloc.JobID)) + s1.State().UpsertAllocs(1100, []*structs.Allocation{stoppedAlloc, preemptedAlloc}) + // Create an eval + eval := mock.Eval() + eval.JobID = alloc.JobID + if err := s1.State().UpsertEvals(1, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + timestampBeforeCommit := time.Now().UTC().UnixNano() + planRes := &structs.PlanResult{ + NodeAllocation: map[string][]*structs.Allocation{ + node.ID: {alloc}, + }, + NodeUpdate: map[string][]*structs.Allocation{ + stoppedAlloc.NodeID: {stoppedAllocDiff}, + }, + NodePreemptions: map[string][]*structs.Allocation{ + preemptedAlloc.NodeID: {preemptedAllocDiff}, + }, + Deployment: dnew, + DeploymentUpdates: updates, + } + + // Snapshot the state + snap, err := s1.State().Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create the plan with a deployment + plan := &structs.Plan{ + Job: alloc.Job, + Deployment: dnew, + DeploymentUpdates: updates, + EvalID: eval.ID, + } + + require := require.New(t) + assert := assert.New(t) + + // 
Apply the plan + future, err := s1.applyPlan(plan, planRes, snap) + require.NoError(err) + + // Verify our optimistic snapshot is updated + ws := memdb.NewWatchSet() + allocOut, err := snap.AllocByID(ws, alloc.ID) + require.NoError(err) + require.NotNil(allocOut) + + deploymentOut, err := snap.DeploymentByID(ws, plan.Deployment.ID) + require.NoError(err) + require.NotNil(deploymentOut) + + // Check plan does apply cleanly + index, err := planWaitFuture(future) + require.NoError(err) + assert.NotEqual(0, index) + + // Lookup the allocation + fsmState := s1.fsm.State() + allocOut, err = fsmState.AllocByID(ws, alloc.ID) + require.NoError(err) + require.NotNil(allocOut) + assert.True(allocOut.CreateTime > 0) + assert.True(allocOut.ModifyTime > 0) + assert.Equal(allocOut.CreateTime, allocOut.ModifyTime) + + // Verify stopped alloc diff applied cleanly + updatedStoppedAlloc, err := fsmState.AllocByID(ws, stoppedAlloc.ID) + require.NoError(err) + require.NotNil(updatedStoppedAlloc) + assert.True(updatedStoppedAlloc.ModifyTime > timestampBeforeCommit) + assert.Equal(updatedStoppedAlloc.DesiredDescription, stoppedAllocDiff.DesiredDescription) + assert.Equal(updatedStoppedAlloc.ClientStatus, stoppedAllocDiff.ClientStatus) + assert.Equal(updatedStoppedAlloc.DesiredStatus, structs.AllocDesiredStatusStop) + + // Verify preempted alloc diff applied cleanly + updatedPreemptedAlloc, err := fsmState.AllocByID(ws, preemptedAlloc.ID) + require.NoError(err) + require.NotNil(updatedPreemptedAlloc) + assert.True(updatedPreemptedAlloc.ModifyTime > timestampBeforeCommit) + assert.Equal(updatedPreemptedAlloc.DesiredDescription, + "Preempted by alloc ID "+preemptedAllocDiff.PreemptedByAllocation) + assert.Equal(updatedPreemptedAlloc.DesiredStatus, structs.AllocDesiredStatusEvict) + + // Lookup the new deployment + dout, err := fsmState.DeploymentByID(ws, plan.Deployment.ID) + require.NoError(err) + require.NotNil(dout) + + // Lookup the updated deployment + dout2, err := fsmState.DeploymentByID(ws, oldDeployment.ID) + require.NoError(err) + require.NotNil(dout2) + assert.Equal(desiredStatus, dout2.Status) + assert.Equal(desiredStatusDescription, dout2.StatusDescription) + + // Lookup updated eval + evalOut, err := fsmState.EvalByID(ws, eval.ID) + require.NoError(err) + require.NotNil(evalOut) + assert.Equal(index, evalOut.ModifyIndex) +} + func TestPlanApply_EvalPlan_Simple(t *testing.T) { t.Parallel() state := testStateStore(t) diff --git a/nomad/plan_normalization_test.go b/nomad/plan_normalization_test.go new file mode 100644 index 00000000000..0cd9a4d6eb1 --- /dev/null +++ b/nomad/plan_normalization_test.go @@ -0,0 +1,66 @@ +package nomad + +import ( + "bytes" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/assert" + "github.com/ugorji/go/codec" +) + +// This test compares the size of the normalized + OmitEmpty raft plan log entry +// with the earlier denormalized log. +// +// Whenever this test is changed, care should be taken to ensure the older msgpack size +// is recalculated when new fields are introduced in ApplyPlanResultsRequest +func TestPlanNormalize(t *testing.T) { + // This size was calculated using the older ApplyPlanResultsRequest format, in which allocations + // didn't use OmitEmpty and only the job was normalized in the stopped and preempted allocs. + // The newer format uses OmitEmpty and uses a minimal set of fields for the diff of the + // stopped and preempted allocs. 
The file for the older format hasn't been checked in, because + // it's not a good idea to check-in a 20mb file to the git repo. + unoptimizedLogSize := 19460168 + + numUpdatedAllocs := 10000 + numStoppedAllocs := 8000 + numPreemptedAllocs := 2000 + mockAlloc := mock.Alloc() + mockAlloc.Job = nil + + mockUpdatedAllocSlice := make([]*structs.Allocation, numUpdatedAllocs) + for i := 0; i < numUpdatedAllocs; i++ { + mockUpdatedAllocSlice = append(mockUpdatedAllocSlice, mockAlloc) + } + + now := time.Now().UTC().UnixNano() + mockStoppedAllocSlice := make([]*structs.AllocationDiff, numStoppedAllocs) + for i := 0; i < numStoppedAllocs; i++ { + mockStoppedAllocSlice = append(mockStoppedAllocSlice, normalizeStoppedAlloc(mockAlloc, now)) + } + + mockPreemptionAllocSlice := make([]*structs.AllocationDiff, numPreemptedAllocs) + for i := 0; i < numPreemptedAllocs; i++ { + mockPreemptionAllocSlice = append(mockPreemptionAllocSlice, normalizePreemptedAlloc(mockAlloc, now)) + } + + // Create a plan result + applyPlanLogEntry := structs.ApplyPlanResultsRequest{ + AllocUpdateRequest: structs.AllocUpdateRequest{ + AllocsUpdated: mockUpdatedAllocSlice, + AllocsStopped: mockStoppedAllocSlice, + }, + AllocsPreempted: mockPreemptionAllocSlice, + } + + handle := structs.MsgpackHandle + var buf bytes.Buffer + if err := codec.NewEncoder(&buf, handle).Encode(applyPlanLogEntry); err != nil { + t.Fatalf("Encoding failed: %v", err) + } + + optimizedLogSize := buf.Len() + assert.True(t, float64(optimizedLogSize)/float64(unoptimizedLogSize) < 0.62) +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 84201cbaaf0..276c6920d29 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -170,6 +170,27 @@ RUN_QUERY: // UpsertPlanResults is used to upsert the results of a plan. func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanResultsRequest) error { + snapshot, err := s.Snapshot() + if err != nil { + return err + } + + allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped, results.Job) + if err != nil { + return err + } + + allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted, results.Job) + if err != nil { + return err + } + + // COMPAT 0.11: Remove this denormalization when NodePreemptions is removed + results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions, results.Job) + if err != nil { + return err + } + txn := s.db.Txn(true) defer txn.Abort() @@ -185,34 +206,6 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn) } - // Attach the job to all the allocations. It is pulled out in the payload to - // avoid the redundancy of encoding, but should be denormalized prior to - // being inserted into MemDB. - structs.DenormalizeAllocationJobs(results.Job, results.Alloc) - - // COMPAT(0.11): Remove in 0.11 - // Calculate the total resources of allocations. It is pulled out in the - // payload to avoid encoding something that can be computed, but should be - // denormalized prior to being inserted into MemDB. 
- for _, alloc := range results.Alloc { - if alloc.Resources != nil { - continue - } - - alloc.Resources = new(structs.Resources) - for _, task := range alloc.TaskResources { - alloc.Resources.Add(task) - } - - // Add the shared resources - alloc.Resources.Add(alloc.SharedResources) - } - - // Upsert the allocations - if err := s.upsertAllocsImpl(index, results.Alloc, txn); err != nil { - return err - } - // COMPAT: Nomad versions before 0.7.1 did not include the eval ID when // applying the plan. Thus while we are upgrading, we ignore updating the // modify index of evaluations from older plans. @@ -223,35 +216,33 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR } } - // Prepare preempted allocs in the plan results for update - var preemptedAllocs []*structs.Allocation - for _, preemptedAlloc := range results.NodePreemptions { - // Look for existing alloc - existing, err := txn.First("allocs", "id", preemptedAlloc.ID) - if err != nil { - return fmt.Errorf("alloc lookup failed: %v", err) - } - - // Nothing to do if this does not exist - if existing == nil { - continue - } - exist := existing.(*structs.Allocation) + numAllocs := 0 + if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 { + // COMPAT 0.11: This branch will be removed, when Alloc is removed + // Attach the job to all the allocations. It is pulled out in the payload to + // avoid the redundancy of encoding, but should be denormalized prior to + // being inserted into MemDB. + addComputedAllocAttrs(results.Alloc, results.Job) + numAllocs = len(results.Alloc) + len(results.NodePreemptions) + } else { + // Attach the job to all the allocations. It is pulled out in the payload to + // avoid the redundancy of encoding, but should be denormalized prior to + // being inserted into MemDB. + addComputedAllocAttrs(results.AllocsUpdated, results.Job) + numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted) + } - // Copy everything from the existing allocation - copyAlloc := exist.Copy() + allocsToUpsert := make([]*structs.Allocation, 0, numAllocs) - // Only update the fields set by the scheduler - copyAlloc.DesiredStatus = preemptedAlloc.DesiredStatus - copyAlloc.PreemptedByAllocation = preemptedAlloc.PreemptedByAllocation - copyAlloc.DesiredDescription = preemptedAlloc.DesiredDescription - copyAlloc.ModifyTime = preemptedAlloc.ModifyTime - preemptedAllocs = append(preemptedAllocs, copyAlloc) + // COMPAT 0.11: Both these appends should be removed when Alloc and NodePreemptions are removed + allocsToUpsert = append(allocsToUpsert, results.Alloc...) + allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...) - } + allocsToUpsert = append(allocsToUpsert, allocsStopped...) + allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...) + allocsToUpsert = append(allocsToUpsert, allocsPreempted...) - // Upsert the preempted allocations - if err := s.upsertAllocsImpl(index, preemptedAllocs, txn); err != nil { + if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil { return err } @@ -266,6 +257,30 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR return nil } +// addComputedAllocAttrs adds the computed/derived attributes to the allocation. +// This method is used when an allocation is being denormalized. +func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) { + structs.DenormalizeAllocationJobs(job, allocs) + + // COMPAT(0.11): Remove in 0.11 + // Calculate the total resources of allocations. 
It is pulled out in the + // payload to avoid encoding something that can be computed, but should be + // denormalized prior to being inserted into MemDB. + for _, alloc := range allocs { + if alloc.Resources != nil { + continue + } + + alloc.Resources = new(structs.Resources) + for _, task := range alloc.TaskResources { + alloc.Resources.Add(task) + } + + // Add the shared resources + alloc.Resources.Add(alloc.SharedResources) + } +} + // upsertDeploymentUpdates updates the deployments given the passed status // updates. func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error { @@ -4100,6 +4115,83 @@ type StateSnapshot struct { StateStore } +// DenormalizeAllocationsMap takes in a map of nodes to allocations, and queries the +// Allocation for each of the Allocation diffs and merges the updated attributes with +// the existing Allocation, and attaches the Job provided +func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation, job *structs.Job) error { + for nodeID, allocs := range nodeAllocations { + denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs, job) + if err != nil { + return err + } + + nodeAllocations[nodeID] = denormalizedAllocs + } + return nil +} + +// DenormalizeAllocationSlice queries the Allocation for each allocation diff +// represented as an Allocation and merges the updated attributes with the existing +// Allocation, and attaches the Job provided. +func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation, job *structs.Job) ([]*structs.Allocation, error) { + allocDiffs := make([]*structs.AllocationDiff, len(allocs)) + for i, alloc := range allocs { + allocDiffs[i] = alloc.AllocationDiff() + } + + return s.DenormalizeAllocationDiffSlice(allocDiffs, job) +} + +// DenormalizeAllocationDiffSlice queries the Allocation for each AllocationDiff and merges +// the updated attributes with the existing Allocation, and attaches the Job provided +func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff, job *structs.Job) ([]*structs.Allocation, error) { + // Output index for denormalized Allocations + j := 0 + + denormalizedAllocs := make([]*structs.Allocation, len(allocDiffs)) + for _, allocDiff := range allocDiffs { + alloc, err := s.AllocByID(nil, allocDiff.ID) + if err != nil { + return nil, fmt.Errorf("alloc lookup failed: %v", err) + } + if alloc == nil { + return nil, fmt.Errorf("alloc %v doesn't exist", allocDiff.ID) + } + + // Merge the updates to the Allocation + allocCopy := alloc.CopySkipJob() + allocCopy.Job = job + + if allocDiff.PreemptedByAllocation != "" { + // If alloc is a preemption + allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation + allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation) + allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict + } else { + // If alloc is a stopped alloc + allocCopy.DesiredDescription = allocDiff.DesiredDescription + allocCopy.DesiredStatus = structs.AllocDesiredStatusStop + if allocDiff.ClientStatus != "" { + allocCopy.ClientStatus = allocDiff.ClientStatus + } + } + if allocDiff.ModifyTime != 0 { + allocCopy.ModifyTime = allocDiff.ModifyTime + } + + // Update the allocDiff in the slice to equal the denormalized alloc + denormalizedAllocs[j] = allocCopy + j++ + } + // Retain only the denormalized Allocations in the slice + denormalizedAllocs = denormalizedAllocs[:j] + return 
denormalizedAllocs, nil +} + +func getPreemptedAllocDesiredDescription(PreemptedByAllocID string) string { + return fmt.Sprintf("Preempted by alloc ID %v", PreemptedByAllocID) +} + // StateRestore is used to optimize the performance when // restoring state by only using a single large transaction // instead of thousands of sub transactions diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 3cf3b977606..013f9cab932 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -88,6 +88,7 @@ func TestStateStore_Blocking_MinQuery(t *testing.T) { } } +// COMPAT 0.11: Uses AllocUpdateRequest.Alloc // This test checks that: // 1) The job is denormalized // 2) Allocations are created @@ -140,6 +141,86 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing assert.EqualValues(1000, evalOut.ModifyIndex) } +// This test checks that: +// 1) The job is denormalized +// 2) Allocations are denormalized and updated with the diff +func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { + state := testStateStore(t) + alloc := mock.Alloc() + job := alloc.Job + alloc.Job = nil + + stoppedAlloc := mock.Alloc() + stoppedAlloc.Job = job + stoppedAllocDiff := &structs.AllocationDiff{ + ID: stoppedAlloc.ID, + DesiredDescription: "desired desc", + ClientStatus: structs.AllocClientStatusLost, + } + preemptedAlloc := mock.Alloc() + preemptedAlloc.Job = job + preemptedAllocDiff := &structs.AllocationDiff{ + ID: preemptedAlloc.ID, + PreemptedByAllocation: alloc.ID, + } + + require := require.New(t) + require.NoError(state.UpsertAllocs(900, []*structs.Allocation{stoppedAlloc, preemptedAlloc})) + require.NoError(state.UpsertJob(999, job)) + + eval := mock.Eval() + eval.JobID = job.ID + + // Create an eval + require.NoError(state.UpsertEvals(1, []*structs.Evaluation{eval})) + + // Create a plan result + res := structs.ApplyPlanResultsRequest{ + AllocUpdateRequest: structs.AllocUpdateRequest{ + AllocsUpdated: []*structs.Allocation{alloc}, + AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff}, + Job: job, + }, + EvalID: eval.ID, + AllocsPreempted: []*structs.AllocationDiff{preemptedAllocDiff}, + } + assert := assert.New(t) + planModifyIndex := uint64(1000) + err := state.UpsertPlanResults(planModifyIndex, &res) + require.NoError(err) + + ws := memdb.NewWatchSet() + out, err := state.AllocByID(ws, alloc.ID) + require.NoError(err) + assert.Equal(alloc, out) + + updatedStoppedAlloc, err := state.AllocByID(ws, stoppedAlloc.ID) + require.NoError(err) + assert.Equal(stoppedAllocDiff.DesiredDescription, updatedStoppedAlloc.DesiredDescription) + assert.Equal(structs.AllocDesiredStatusStop, updatedStoppedAlloc.DesiredStatus) + assert.Equal(stoppedAllocDiff.ClientStatus, updatedStoppedAlloc.ClientStatus) + assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex) + assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex) + + updatedPreemptedAlloc, err := state.AllocByID(ws, preemptedAlloc.ID) + require.NoError(err) + assert.Equal(structs.AllocDesiredStatusEvict, updatedPreemptedAlloc.DesiredStatus) + assert.Equal(preemptedAllocDiff.PreemptedByAllocation, updatedPreemptedAlloc.PreemptedByAllocation) + assert.Equal(planModifyIndex, 
updatedPreemptedAlloc.AllocModifyIndex) + assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex) + + index, err := state.Index("allocs") + require.NoError(err) + assert.EqualValues(planModifyIndex, index) + + require.False(watchFired(ws)) + + evalOut, err := state.EvalByID(ws, eval.ID) + require.NoError(err) + require.NotNil(evalOut) + assert.EqualValues(planModifyIndex, evalOut.ModifyIndex) +} + // This test checks that the deployment is created and allocations count towards // the deployment func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { @@ -271,11 +352,9 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) { require.NoError(err) minimalPreemptedAlloc := &structs.Allocation{ - ID: preemptedAlloc.ID, - Namespace: preemptedAlloc.Namespace, - DesiredStatus: structs.AllocDesiredStatusEvict, - ModifyTime: time.Now().Unix(), - DesiredDescription: fmt.Sprintf("Preempted by allocation %v", alloc.ID), + ID: preemptedAlloc.ID, + PreemptedByAllocation: alloc.ID, + ModifyTime: time.Now().Unix(), } // Create eval for preempted job @@ -316,7 +395,7 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) { preempted, err := state.AllocByID(ws, preemptedAlloc.ID) require.NoError(err) require.Equal(preempted.DesiredStatus, structs.AllocDesiredStatusEvict) - require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by allocation %v", alloc.ID)) + require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by alloc ID %v", alloc.ID)) // Verify eval for preempted job preemptedJobEval, err := state.EvalByID(ws, eval2.ID) @@ -6975,6 +7054,31 @@ func TestStateStore_Abandon(t *testing.T) { } } +// Verifies that an error is returned when an allocation doesn't exist in the state store. +func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testing.T) { + state := testStateStore(t) + alloc := mock.Alloc() + require := require.New(t) + + // Insert job + err := state.UpsertJob(999, alloc.Job) + require.NoError(err) + + allocDiffs := []*structs.AllocationDiff{ + { + ID: alloc.ID, + }, + } + + snap, err := state.Snapshot() + require.NoError(err) + + denormalizedAllocs, err := snap.DenormalizeAllocationDiffSlice(allocDiffs, alloc.Job) + + require.EqualError(err, fmt.Sprintf("alloc %v doesn't exist", alloc.ID)) + require.Nil(denormalizedAllocs) +} + // watchFired is a helper for unit tests that returns if the given watch set // fired (it doesn't care which watch actually fired). This uses a fixed // timeout since we already expect the event happened before calling this and diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 48d7b57d9af..8d2ae8a7f2c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -28,8 +28,8 @@ import ( "github.com/gorhill/cronexpr" "github.com/hashicorp/consul/api" hcodec "github.com/hashicorp/go-msgpack/codec" - multierror "github.com/hashicorp/go-multierror" - version "github.com/hashicorp/go-version" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/args" @@ -660,10 +660,16 @@ type ApplyPlanResultsRequest struct { // the evaluation itself being updated. EvalID string + // COMPAT 0.11 // NodePreemptions is a slice of allocations from other lower priority jobs // that are preempted. Preempted allocations are marked as evicted. 
+ // Deprecated: Replaced with AllocsPreempted which contains only the diff NodePreemptions []*Allocation + // AllocsPreempted is a slice of allocation diffs from other lower priority jobs + // that are preempted. Preempted allocations are marked as evicted. + AllocsPreempted []*AllocationDiff + // PreemptionEvals is a slice of follow up evals for jobs whose allocations // have been preempted to place allocs in this plan PreemptionEvals []*Evaluation @@ -673,9 +679,18 @@ type ApplyPlanResultsRequest struct { // to cause evictions or to assign new allocations. Both can be done // within a single transaction type AllocUpdateRequest struct { + // COMPAT 0.11 // Alloc is the list of new allocations to assign + // Deprecated: Replaced with two separate slices, one containing stopped allocations + // and another containing updated allocations Alloc []*Allocation + // Allocations to stop. Contains only the diff, not the entire allocation + AllocsStopped []*AllocationDiff + + // New or updated allocations + AllocsUpdated []*Allocation + // Evals is the list of new evaluations to create // Evals are valid only when used in the Raft RPC Evals []*Evaluation @@ -7168,6 +7183,9 @@ const ( // Allocation is used to allocate the placement of a task group to a node. type Allocation struct { + // msgpack omit empty fields during serialization + _struct bool `codec:",omitempty"` // nolint: structcheck + // ID of the allocation (UUID) ID string @@ -7296,11 +7314,12 @@ func (a *Allocation) Index() uint { return uint(num) } +// Copy provides a copy of the allocation and deep copies the job func (a *Allocation) Copy() *Allocation { return a.copyImpl(true) } -// Copy provides a copy of the allocation but doesn't deep copy the job +// CopySkipJob provides a copy of the allocation but doesn't deep copy the job func (a *Allocation) CopySkipJob() *Allocation { return a.copyImpl(false) } @@ -7670,6 +7689,19 @@ func (a *Allocation) Stub() *AllocListStub { } } +// AllocationDiff converts an Allocation type to an AllocationDiff type +// If at any time, modification are made to AllocationDiff so that an +// Allocation can no longer be safely converted to AllocationDiff, +// this method should be changed accordingly. +func (a *Allocation) AllocationDiff() *AllocationDiff { + return (*AllocationDiff)(a) +} + +// AllocationDiff is another named type for Allocation (to use the same fields), +// which is used to represent the delta for an Allocation. If you need a method +// defined on the al +type AllocationDiff Allocation + // AllocListStub is used to return a subset of alloc information type AllocListStub struct { ID string @@ -8037,6 +8069,9 @@ const ( // potentially taking action (allocation of work) or doing nothing if the state // of the world does not require it. type Evaluation struct { + // msgpack omit empty fields during serialization + _struct bool `codec:",omitempty"` // nolint: structcheck + // ID is a randomly generated UUID used for this evaluation. This // is assigned upon the creation of the evaluation. ID string @@ -8304,6 +8339,9 @@ func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation { // are submitted to the leader which verifies that resources have // not been overcommitted before admitting the plan. 
type Plan struct { + // msgpack omit empty fields during serialization + _struct bool `codec:",omitempty"` // nolint: structcheck + // EvalID is the evaluation ID this plan is associated with EvalID string @@ -8355,9 +8393,9 @@ type Plan struct { NodePreemptions map[string][]*Allocation } -// AppendUpdate marks the allocation for eviction. The clientStatus of the +// AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the // allocation may be optionally set by passing in a non-empty value. -func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clientStatus string) { +func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus string) { newAlloc := new(Allocation) *newAlloc = *alloc @@ -8373,7 +8411,7 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien // Strip the resources as it can be rebuilt. newAlloc.Resources = nil - newAlloc.DesiredStatus = desiredStatus + newAlloc.DesiredStatus = AllocDesiredStatusStop newAlloc.DesiredDescription = desiredDesc if clientStatus != "" { @@ -8387,12 +8425,12 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien // AppendPreemptedAlloc is used to append an allocation that's being preempted to the plan. // To minimize the size of the plan, this only sets a minimal set of fields in the allocation -func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preemptingAllocID string) { +func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, preemptingAllocID string) { newAlloc := &Allocation{} newAlloc.ID = alloc.ID newAlloc.JobID = alloc.JobID newAlloc.Namespace = alloc.Namespace - newAlloc.DesiredStatus = desiredStatus + newAlloc.DesiredStatus = AllocDesiredStatusEvict newAlloc.PreemptedByAllocation = preemptingAllocID desiredDesc := fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID) @@ -8445,6 +8483,29 @@ func (p *Plan) IsNoOp() bool { len(p.DeploymentUpdates) == 0 } +// NormalizeAllocations normalizes allocations to remove fields that can +// be fetched from the MemDB instead of sending over the wire +func (p *Plan) NormalizeAllocations() { + for _, allocs := range p.NodeUpdate { + for i, alloc := range allocs { + allocs[i] = &Allocation{ + ID: alloc.ID, + DesiredDescription: alloc.DesiredDescription, + ClientStatus: alloc.ClientStatus, + } + } + } + + for _, allocs := range p.NodePreemptions { + for i, alloc := range allocs { + allocs[i] = &Allocation{ + ID: alloc.ID, + PreemptedByAllocation: alloc.PreemptedByAllocation, + } + } + } +} + // PlanResult is the result of a plan submitted to the leader. type PlanResult struct { // NodeUpdate contains all the updates that were committed. 
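Aside on the structs.go changes above: the patch shrinks raft log entries in two ways, the `_struct bool `codec:",omitempty"`` field that tells the msgpack codec to drop zero-value fields during serialization, and the new AllocationDiff named type, which lets a full Allocation be reinterpreted as a "diff" without copying. The following is a minimal, self-contained sketch of that named-type conversion pattern; the types here are illustrative stand-ins with only a few fields, not the real nomad/structs definitions.

package main

import "fmt"

// Illustrative stand-in; the real Allocation carries many more fields.
type Allocation struct {
	ID                 string
	DesiredDescription string
	ModifyTime         int64
}

// AllocationDiff shares Allocation's underlying layout; the distinct name
// signals that a value carries only a partial update.
type AllocationDiff Allocation

// AllocationDiff converts *Allocation to *AllocationDiff. Because both types
// have the identical underlying struct, this is a pointer conversion with no
// copying, which is why the two types must remain field-compatible.
func (a *Allocation) AllocationDiff() *AllocationDiff {
	return (*AllocationDiff)(a)
}

func main() {
	a := &Allocation{ID: "c0ffee", DesiredDescription: "stopped by drain"}
	d := a.AllocationDiff()
	d.ModifyTime = 42
	fmt.Println(a.ModifyTime) // prints 42: both names refer to the same memory
}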
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 90b33b19827..e574f51dc58 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/hashicorp/consul/api" - multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper/uuid" "github.com/kr/pretty" "github.com/stretchr/testify/assert" @@ -2842,6 +2842,100 @@ func TestTaskArtifact_Validate_Checksum(t *testing.T) { } } +func TestPlan_NormalizeAllocations(t *testing.T) { + t.Parallel() + plan := &Plan{ + NodeUpdate: make(map[string][]*Allocation), + NodePreemptions: make(map[string][]*Allocation), + } + stoppedAlloc := MockAlloc() + desiredDesc := "Desired desc" + plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost) + preemptedAlloc := MockAlloc() + preemptingAllocID := uuid.Generate() + plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID) + + plan.NormalizeAllocations() + + actualStoppedAlloc := plan.NodeUpdate[stoppedAlloc.NodeID][0] + expectedStoppedAlloc := &Allocation{ + ID: stoppedAlloc.ID, + DesiredDescription: desiredDesc, + ClientStatus: AllocClientStatusLost, + } + assert.Equal(t, expectedStoppedAlloc, actualStoppedAlloc) + actualPreemptedAlloc := plan.NodePreemptions[preemptedAlloc.NodeID][0] + expectedPreemptedAlloc := &Allocation{ + ID: preemptedAlloc.ID, + PreemptedByAllocation: preemptingAllocID, + } + assert.Equal(t, expectedPreemptedAlloc, actualPreemptedAlloc) +} + +func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) { + t.Parallel() + plan := &Plan{ + NodeUpdate: make(map[string][]*Allocation), + } + alloc := MockAlloc() + desiredDesc := "Desired desc" + + plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost) + + appendedAlloc := plan.NodeUpdate[alloc.NodeID][0] + expectedAlloc := new(Allocation) + *expectedAlloc = *alloc + expectedAlloc.DesiredDescription = desiredDesc + expectedAlloc.DesiredStatus = AllocDesiredStatusStop + expectedAlloc.ClientStatus = AllocClientStatusLost + expectedAlloc.Job = nil + assert.Equal(t, expectedAlloc, appendedAlloc) + assert.Equal(t, alloc.Job, plan.Job) +} + +func TestPlan_AppendPreemptedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) { + t.Parallel() + plan := &Plan{ + NodePreemptions: make(map[string][]*Allocation), + } + alloc := MockAlloc() + preemptingAllocID := uuid.Generate() + + plan.AppendPreemptedAlloc(alloc, preemptingAllocID) + + appendedAlloc := plan.NodePreemptions[alloc.NodeID][0] + expectedAlloc := &Allocation{ + ID: alloc.ID, + PreemptedByAllocation: preemptingAllocID, + JobID: alloc.JobID, + Namespace: alloc.Namespace, + DesiredStatus: AllocDesiredStatusEvict, + DesiredDescription: fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID), + AllocatedResources: alloc.AllocatedResources, + TaskResources: alloc.TaskResources, + SharedResources: alloc.SharedResources, + } + assert.Equal(t, expectedAlloc, appendedAlloc) +} + +func TestAllocation_MsgPackTags(t *testing.T) { + t.Parallel() + planType := reflect.TypeOf(Allocation{}) + + msgPackTags, _ := planType.FieldByName("_struct") + + assert.Equal(t, msgPackTags.Tag, reflect.StructTag(`codec:",omitempty"`)) +} + +func TestEvaluation_MsgPackTags(t *testing.T) { + t.Parallel() + planType := reflect.TypeOf(Evaluation{}) + + msgPackTags, _ := planType.FieldByName("_struct") + + assert.Equal(t, msgPackTags.Tag, reflect.StructTag(`codec:",omitempty"`)) +} + func 
TestAllocation_Terminated(t *testing.T) { type desiredState struct { ClientStatus string diff --git a/nomad/util.go b/nomad/util.go index 44b7119242b..ccd4504af05 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -14,6 +14,11 @@ import ( "github.com/hashicorp/serf/serf" ) +// MinVersionPlanNormalization is the minimum version to support the +// normalization of Plan in SubmitPlan, and the denormalization raft log entry committed +// in ApplyPlanResultsRequest +var MinVersionPlanNormalization = version.Must(version.NewVersion("0.9.2")) + // ensurePath is used to make sure a path exists func ensurePath(path string, dir bool) error { if !dir { @@ -143,11 +148,12 @@ func isNomadServer(m serf.Member) (bool, *serverParts) { return true, parts } -// ServersMeetMinimumVersion returns whether the given alive servers are at least on the -// given Nomad version -func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool { +// ServersMeetMinimumVersion returns whether the Nomad servers are at least on the +// given Nomad version. The checkFailedServers parameter specifies whether version +// for the failed servers should be verified. +func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version, checkFailedServers bool) bool { for _, member := range members { - if valid, parts := isNomadServer(member); valid && parts.Status == serf.StatusAlive { + if valid, parts := isNomadServer(member); valid && (parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) { // Check if the versions match - version.LessThan will return true for // 0.8.0-rc1 < 0.8.0, so we want to ignore the metadata versionsMatch := slicesMatch(minVersion.Segments(), parts.Build.Segments()) diff --git a/nomad/util_test.go b/nomad/util_test.go index 4fe670cdc71..f216d5e4243 100644 --- a/nomad/util_test.go +++ b/nomad/util_test.go @@ -86,23 +86,8 @@ func TestIsNomadServer(t *testing.T) { } } -func TestServersMeetMinimumVersion(t *testing.T) { +func TestServersMeetMinimumVersionExcludingFailed(t *testing.T) { t.Parallel() - makeMember := func(version string) serf.Member { - return serf.Member{ - Name: "foo", - Addr: net.IP([]byte{127, 0, 0, 1}), - Tags: map[string]string{ - "role": "nomad", - "region": "aws", - "dc": "east-aws", - "port": "10000", - "build": version, - "vsn": "1", - }, - Status: serf.StatusAlive, - } - } cases := []struct { members []serf.Member @@ -112,7 +97,7 @@ func TestServersMeetMinimumVersion(t *testing.T) { // One server, meets reqs { members: []serf.Member{ - makeMember("0.7.5"), + makeMember("0.7.5", serf.StatusAlive), }, ver: version.Must(version.NewVersion("0.7.5")), expected: true, @@ -120,7 +105,7 @@ func TestServersMeetMinimumVersion(t *testing.T) { // One server in dev, meets reqs { members: []serf.Member{ - makeMember("0.8.5-dev"), + makeMember("0.8.5-dev", serf.StatusAlive), }, ver: version.Must(version.NewVersion("0.7.5")), expected: true, @@ -128,7 +113,7 @@ func TestServersMeetMinimumVersion(t *testing.T) { // One server with meta, meets reqs { members: []serf.Member{ - makeMember("0.7.5+ent"), + makeMember("0.7.5+ent", serf.StatusAlive), }, ver: version.Must(version.NewVersion("0.7.5")), expected: true, @@ -136,16 +121,17 @@ func TestServersMeetMinimumVersion(t *testing.T) { // One server, doesn't meet reqs { members: []serf.Member{ - makeMember("0.7.5"), + makeMember("0.7.5", serf.StatusAlive), }, ver: version.Must(version.NewVersion("0.8.0")), expected: false, }, - // Multiple servers, meets req version + 
// Multiple servers, meets req version, includes failed that doesn't meet req { members: []serf.Member{ - makeMember("0.7.5"), - makeMember("0.8.0"), + makeMember("0.7.5", serf.StatusAlive), + makeMember("0.8.0", serf.StatusAlive), + makeMember("0.7.0", serf.StatusFailed), }, ver: version.Must(version.NewVersion("0.7.5")), expected: true, @@ -153,8 +139,8 @@ func TestServersMeetMinimumVersion(t *testing.T) { // Multiple servers, doesn't meet req version { members: []serf.Member{ - makeMember("0.7.5"), - makeMember("0.8.0"), + makeMember("0.7.5", serf.StatusAlive), + makeMember("0.8.0", serf.StatusAlive), }, ver: version.Must(version.NewVersion("0.8.0")), expected: false, @@ -162,13 +148,67 @@ func TestServersMeetMinimumVersion(t *testing.T) { } for _, tc := range cases { - result := ServersMeetMinimumVersion(tc.members, tc.ver) + result := ServersMeetMinimumVersion(tc.members, tc.ver, false) if result != tc.expected { t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc) } } } +func TestServersMeetMinimumVersionIncludingFailed(t *testing.T) { + t.Parallel() + + cases := []struct { + members []serf.Member + ver *version.Version + expected bool + }{ + // Multiple servers, meets req version + { + members: []serf.Member{ + makeMember("0.7.5", serf.StatusAlive), + makeMember("0.8.0", serf.StatusAlive), + makeMember("0.7.5", serf.StatusFailed), + }, + ver: version.Must(version.NewVersion("0.7.5")), + expected: true, + }, + // Multiple servers, doesn't meet req version + { + members: []serf.Member{ + makeMember("0.7.5", serf.StatusAlive), + makeMember("0.8.0", serf.StatusAlive), + makeMember("0.7.0", serf.StatusFailed), + }, + ver: version.Must(version.NewVersion("0.7.5")), + expected: false, + }, + } + + for _, tc := range cases { + result := ServersMeetMinimumVersion(tc.members, tc.ver, true) + if result != tc.expected { + t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc) + } + } +} + +func makeMember(version string, status serf.MemberStatus) serf.Member { + return serf.Member{ + Name: "foo", + Addr: net.IP([]byte{127, 0, 0, 1}), + Tags: map[string]string{ + "role": "nomad", + "region": "aws", + "dc": "east-aws", + "port": "10000", + "build": version, + "vsn": "1", + }, + Status: status, + } +} + func TestShuffleStrings(t *testing.T) { t.Parallel() // Generate input diff --git a/nomad/worker.go b/nomad/worker.go index f0aefb62d54..a50c58e68dd 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -310,6 +310,12 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler. 
// Add the evaluation token to the plan plan.EvalToken = w.evalToken + // Normalize stopped and preempted allocs before RPC + normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true) + if normalizePlan { + plan.NormalizeAllocations() + } + // Setup the request req := structs.PlanRequest{ Plan: plan, diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 2f3e3172831..4bc8628aff6 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -8,13 +8,14 @@ import ( "time" log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" ) type NoopScheduler struct { @@ -390,6 +391,57 @@ func TestWorker_SubmitPlan(t *testing.T) { } } +func TestWorker_SubmitPlanNormalizedAllocations(t *testing.T) { + t.Parallel() + s1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.EnabledSchedulers = []string{structs.JobTypeService} + c.Build = "0.9.2" + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Register node + node := mock.Node() + testRegisterNode(t, s1, node) + + job := mock.Job() + eval1 := mock.Eval() + eval1.JobID = job.ID + s1.fsm.State().UpsertJob(0, job) + s1.fsm.State().UpsertEvals(0, []*structs.Evaluation{eval1}) + + stoppedAlloc := mock.Alloc() + preemptedAlloc := mock.Alloc() + s1.fsm.State().UpsertAllocs(5, []*structs.Allocation{stoppedAlloc, preemptedAlloc}) + + // Create an allocation plan + plan := &structs.Plan{ + Job: job, + EvalID: eval1.ID, + NodeUpdate: make(map[string][]*structs.Allocation), + NodePreemptions: make(map[string][]*structs.Allocation), + } + desiredDescription := "desired desc" + plan.AppendStoppedAlloc(stoppedAlloc, desiredDescription, structs.AllocClientStatusLost) + preemptingAllocID := uuid.Generate() + plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID) + + // Attempt to submit a plan + w := &Worker{srv: s1, logger: s1.logger} + w.SubmitPlan(plan) + + assert.Equal(t, &structs.Allocation{ + ID: preemptedAlloc.ID, + PreemptedByAllocation: preemptingAllocID, + }, plan.NodePreemptions[preemptedAlloc.NodeID][0]) + assert.Equal(t, &structs.Allocation{ + ID: stoppedAlloc.ID, + DesiredDescription: desiredDescription, + ClientStatus: structs.AllocClientStatusLost, + }, plan.NodeUpdate[stoppedAlloc.NodeID][0]) +} + func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) { t.Parallel() s1 := TestServer(t, func(c *Config) { diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 6c518f528f5..376a826ee81 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -5,8 +5,8 @@ import ( "time" log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" ) @@ -366,7 +366,7 @@ func (s *GenericScheduler) computeJobAllocs() error { // Handle the stop for _, stop := range results.stop { - s.plan.AppendUpdate(stop.alloc, structs.AllocDesiredStatusStop, stop.statusDescription, stop.clientStatus) + s.plan.AppendStoppedAlloc(stop.alloc, 
stop.statusDescription, stop.clientStatus) } // Handle the in-place updates @@ -464,7 +464,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc() prevAllocation := missing.PreviousAllocation() if stopPrevAlloc { - s.plan.AppendUpdate(prevAllocation, structs.AllocDesiredStatusStop, stopPrevAllocDesc, "") + s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "") } // Compute penalty nodes for rescheduled allocs diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index bcfd6e5be3d..5230ba0f71b 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -4,7 +4,7 @@ import ( "fmt" log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" ) @@ -210,18 +210,18 @@ func (s *SystemScheduler) computeJobAllocs() error { // Add all the allocs to stop for _, e := range diff.stop { - s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded, "") + s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "") } // Add all the allocs to migrate for _, e := range diff.migrate { - s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNodeTainted, "") + s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "") } // Lost allocations should be transitioned to desired status stop and client // status lost. for _, e := range diff.lost { - s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost) + s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost) } // Attempt to do the upgrades in place @@ -351,7 +351,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { if option.PreemptedAllocs != nil { var preemptedAllocIDs []string for _, stop := range option.PreemptedAllocs { - s.plan.AppendPreemptedAlloc(stop, structs.AllocDesiredStatusEvict, alloc.ID) + s.plan.AppendPreemptedAlloc(stop, alloc.ID) preemptedAllocIDs = append(preemptedAllocIDs, stop.ID) if s.eval.AnnotatePlan && s.plan.Annotations != nil { diff --git a/scheduler/testing.go b/scheduler/testing.go index f0501010246..cb6059c5469 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -7,7 +7,7 @@ import ( testing "github.com/mitchellh/go-testing-interface" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -53,6 +53,8 @@ type Harness struct { nextIndex uint64 nextIndexLock sync.Mutex + + optimizePlan bool } // NewHarness is used to make a new testing harness @@ -101,42 +103,69 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er result.AllocIndex = index // Flatten evicts and allocs - var allocs []*structs.Allocation - for _, updateList := range plan.NodeUpdate { - allocs = append(allocs, updateList...) - } - for _, allocList := range plan.NodeAllocation { - allocs = append(allocs, allocList...) - } - - // Set the time the alloc was applied for the first time. This can be used - // to approximate the scheduling time. 
now := time.Now().UTC().UnixNano() - for _, alloc := range allocs { - if alloc.CreateTime == 0 { - alloc.CreateTime = now - } - } - // Set modify time for preempted allocs and flatten them - var preemptedAllocs []*structs.Allocation - for _, preemptions := range result.NodePreemptions { - for _, alloc := range preemptions { - alloc.ModifyTime = now - preemptedAllocs = append(preemptedAllocs, alloc) - } + allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation)) + for _, allocList := range plan.NodeAllocation { + allocsUpdated = append(allocsUpdated, allocList...) } + updateCreateTimestamp(allocsUpdated, now) // Setup the update request req := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ - Job: plan.Job, - Alloc: allocs, + Job: plan.Job, }, Deployment: plan.Deployment, DeploymentUpdates: plan.DeploymentUpdates, EvalID: plan.EvalID, - NodePreemptions: preemptedAllocs, + } + + if h.optimizePlan { + stoppedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodeUpdate)) + for _, updateList := range plan.NodeUpdate { + for _, stoppedAlloc := range updateList { + stoppedAllocDiffs = append(stoppedAllocDiffs, stoppedAlloc.AllocationDiff()) + } + } + req.AllocsStopped = stoppedAllocDiffs + + req.AllocsUpdated = allocsUpdated + + preemptedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodePreemptions)) + for _, preemptions := range plan.NodePreemptions { + for _, preemptedAlloc := range preemptions { + allocDiff := preemptedAlloc.AllocationDiff() + allocDiff.ModifyTime = now + preemptedAllocDiffs = append(preemptedAllocDiffs, allocDiff) + } + } + req.AllocsPreempted = preemptedAllocDiffs + } else { + // COMPAT 0.11: Handles unoptimized log format + var allocs []*structs.Allocation + + allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate)) + for _, updateList := range plan.NodeUpdate { + allocsStopped = append(allocsStopped, updateList...) + } + allocs = append(allocs, allocsStopped...) + + allocs = append(allocs, allocsUpdated...) + updateCreateTimestamp(allocs, now) + + req.Alloc = allocs + + // Set modify time for preempted allocs and flatten them + var preemptedAllocs []*structs.Allocation + for _, preemptions := range result.NodePreemptions { + for _, alloc := range preemptions { + alloc.ModifyTime = now + preemptedAllocs = append(preemptedAllocs, alloc) + } + } + + req.NodePreemptions = preemptedAllocs } // Apply the full plan @@ -144,6 +173,22 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er return result, nil, err } +// OptimizePlan is a function used only for Harness to help set the optimzePlan field, +// since Harness doesn't have access to a Server object +func (h *Harness) OptimizePlan(optimize bool) { + h.optimizePlan = optimize +} + +func updateCreateTimestamp(allocations []*structs.Allocation, now int64) { + // Set the time the alloc was applied for the first time. This can be used + // to approximate the scheduling time. + for _, alloc := range allocations { + if alloc.CreateTime == 0 { + alloc.CreateTime = now + } + } +} + func (h *Harness) UpdateEval(eval *structs.Evaluation) error { // Ensure sequential plan application h.planLock.Lock() diff --git a/scheduler/util.go b/scheduler/util.go index 5f62d31b378..3b9b437de95 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -507,8 +507,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, // the current allocation is discounted when checking for feasibility. 
// Otherwise we would be trying to fit the tasks current resources and // updated resources. After select is called we can remove the evict. - ctx.Plan().AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, - allocInPlace, "") + ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "") // Attempt to match the task group option := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass selectOptions @@ -573,7 +572,7 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri n := len(allocs) for i := 0; i < n && i < *limit; i++ { a := allocs[i] - ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc, "") + ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "") diff.place = append(diff.place, a) } if n <= *limit { @@ -734,7 +733,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc if alloc.DesiredStatus == structs.AllocDesiredStatusStop && (alloc.ClientStatus == structs.AllocClientStatusRunning || alloc.ClientStatus == structs.AllocClientStatusPending) { - plan.AppendUpdate(alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost) + plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost) } } } @@ -784,7 +783,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy // the current allocation is discounted when checking for feasibility. // Otherwise we would be trying to fit the tasks current resources and // updated resources. After select is called we can remove the evict. - ctx.Plan().AppendUpdate(existing, structs.AllocDesiredStatusStop, allocInPlace, "") + ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "") // Attempt to match the task group option := stack.Select(newTG, nil) // This select only looks at one node so we don't pass selectOptions
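Taken together, the flow this patch establishes is: once every server meets MinVersionPlanNormalization, the scheduler worker calls Plan.NormalizeAllocations() to strip stopped and preempted allocations down to a few fields before the plan RPC, and on apply the state store's DenormalizeAllocationDiffSlice rebuilds full allocations by merging each diff onto the copy already in MemDB. Below is a rough, self-contained sketch of that merge step under simplified assumptions; the Alloc type, the map standing in for MemDB, and the plain "stop"/"evict" strings are illustrative, not the actual state store code or constants.

package main

import "fmt"

// Stand-in type for illustration; the real logic lives in
// StateSnapshot.DenormalizeAllocationDiffSlice in nomad/state/state_store.go.
type Alloc struct {
	ID                    string
	Name                  string
	DesiredStatus         string
	DesiredDescription    string
	ClientStatus          string
	PreemptedByAllocation string
	ModifyTime            int64
}

// denormalize merges a normalized diff onto the allocation already stored,
// mirroring the preempted and stopped branches in the patch.
func denormalize(store map[string]*Alloc, diff *Alloc) (*Alloc, error) {
	existing, ok := store[diff.ID]
	if !ok {
		return nil, fmt.Errorf("alloc %v doesn't exist", diff.ID)
	}
	merged := *existing // shallow copy of the stored alloc
	if diff.PreemptedByAllocation != "" {
		merged.PreemptedByAllocation = diff.PreemptedByAllocation
		merged.DesiredDescription = fmt.Sprintf("Preempted by alloc ID %v", diff.PreemptedByAllocation)
		merged.DesiredStatus = "evict"
	} else {
		merged.DesiredDescription = diff.DesiredDescription
		merged.DesiredStatus = "stop"
		if diff.ClientStatus != "" {
			merged.ClientStatus = diff.ClientStatus
		}
	}
	if diff.ModifyTime != 0 {
		merged.ModifyTime = diff.ModifyTime
	}
	return &merged, nil
}

func main() {
	store := map[string]*Alloc{
		"a1": {ID: "a1", Name: "web.task[0]", DesiredStatus: "run", ClientStatus: "running"},
	}
	// The wire only carried the ID plus the fields the leader changed.
	out, _ := denormalize(store, &Alloc{ID: "a1", DesiredDescription: "node drained", ClientStatus: "lost", ModifyTime: 1})
	fmt.Printf("%+v\n", *out)
}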