Skip to content

Commit

Permalink
Merge pull request #5602 from hashicorp/normalized-plan
Browse files Browse the repository at this point in the history
Normalize plan to increase the plan apply throughput
  • Loading branch information
arshjohar authored Apr 24, 2019
2 parents 653b89e + 23bc1f2 commit 8d48b77
Show file tree
Hide file tree
Showing 17 changed files with 986 additions and 186 deletions.
4 changes: 2 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
return config
}

if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion) {
if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) {
s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion)
return nil
}
Expand All @@ -1270,7 +1270,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
if config != nil {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion) {
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) {
s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
}

// All servers should be at or above 0.8.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion) {
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) {
return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion)
}

Expand Down Expand Up @@ -305,7 +305,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
}

// All servers should be at or above 0.9.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion) {
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) {
return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion)
}
// Apply the update
Expand Down
163 changes: 122 additions & 41 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/hashicorp/raft"
)

// planner is used to mange the submitted allocation plans that are waiting
// planner is used to manage the submitted allocation plans that are waiting
// to be accessed by the leader
type planner struct {
*Server
Expand Down Expand Up @@ -149,52 +149,82 @@ func (p *planner) planApply() {

// applyPlan is used to apply the plan result and to return the alloc index
func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)

// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: plan.Job,
Alloc: make([]*structs.Allocation, 0, minUpdates),
Job: plan.Job,
},
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
EvalID: plan.EvalID,
NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)),
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}

for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
now := time.Now().UTC().UnixNano()
for _, alloc := range req.Alloc {
if alloc.CreateTime == 0 {
alloc.CreateTime = now

if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) {
// Initialize the allocs request using the new optimized log entry format.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))

for _, updateList := range result.NodeUpdate {
for _, stoppedAlloc := range updateList {
req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, now))
}
}
alloc.ModifyTime = now
}

// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = now
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
_, ok := preemptedJobIDs[id]
if !ok {
preemptedJobIDs[id] = struct{}{}
for _, allocList := range result.NodeAllocation {
req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.AllocsUpdated, now)

for _, preemptions := range result.NodePreemptions {
for _, preemptedAlloc := range preemptions {
req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, now))

// Gather jobids to create follow up evals
appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
}
}
} else {
// COMPAT 0.11: This branch is deprecated and will only be used to support
// application of older log entries. Expected to be removed in a future version.

// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)

// Initialize using the older log entry format for Alloc and NodePreemptions
req.Alloc = make([]*structs.Allocation, 0, minUpdates)
req.NodePreemptions = make([]*structs.Allocation, 0, len(result.NodePreemptions))

for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}

for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.Alloc, now)

// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = now
appendNamespacedJobID(preemptedJobIDs, alloc)
}
}

Expand Down Expand Up @@ -232,6 +262,50 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
return future, nil
}

// normalizePreemptedAlloc removes redundant fields from a preempted allocation and
// returns AllocationDiff. Since a preempted allocation is always an existing allocation,
// the struct returned by this method contains only the differential, which can be
// applied to an existing allocation, to yield the updated struct
func normalizePreemptedAlloc(preemptedAlloc *structs.Allocation, now int64) *structs.AllocationDiff {
return &structs.AllocationDiff{
ID: preemptedAlloc.ID,
PreemptedByAllocation: preemptedAlloc.PreemptedByAllocation,
ModifyTime: now,
}
}

// normalizeStoppedAlloc removes redundant fields from a stopped allocation and
// returns AllocationDiff. Since a stopped allocation is always an existing allocation,
// the struct returned by this method contains only the differential, which can be
// applied to an existing allocation, to yield the updated struct
func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs.AllocationDiff {
return &structs.AllocationDiff{
ID: stoppedAlloc.ID,
DesiredDescription: stoppedAlloc.DesiredDescription,
ClientStatus: stoppedAlloc.ClientStatus,
ModifyTime: now,
}
}

// appendNamespacedJobID appends the namespaced Job ID for the alloc to the jobIDs set
func appendNamespacedJobID(jobIDs map[structs.NamespacedID]struct{}, alloc *structs.Allocation) {
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
if _, ok := jobIDs[id]; !ok {
jobIDs[id] = struct{}{}
}
}

// updateAllocTimestamps sets the CreateTime and ModifyTime for the allocations
// to the timestamp provided
func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
for _, alloc := range allocations {
if alloc.CreateTime == 0 {
alloc.CreateTime = timestamp
}
alloc.ModifyTime = timestamp
}
}

// asyncPlanWait is used to apply and respond to a plan async
func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
Expand Down Expand Up @@ -264,6 +338,17 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.Plan, logger log.Logger) (*structs.PlanResult, error) {
defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now())

// Denormalize without the job
err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, nil)
if err != nil {
return nil, err
}
// Denormalize without the job
err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, nil)
if err != nil {
return nil, err
}

// Check if the plan exceeds quota
overQuota, err := evaluatePlanQuota(snap, plan)
if err != nil {
Expand Down Expand Up @@ -521,15 +606,11 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri

// Remove any preempted allocs
if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 {
for _, allocs := range preempted {
remove = append(remove, allocs)
}
remove = append(remove, preempted...)
}

if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 {
for _, alloc := range updated {
remove = append(remove, alloc)
}
remove = append(remove, updated...)
}
proposed := structs.RemoveAllocs(existingAlloc, remove)
proposed = append(proposed, plan.NodeAllocation[nodeID]...)
Expand Down
Loading

0 comments on commit 8d48b77

Please sign in to comment.