Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize plan to increase the plan apply throughput #5602

Merged
merged 7 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
return config
}

if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion) {
if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) {
s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion)
return nil
}
Expand All @@ -1270,7 +1270,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
if config != nil {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion) {
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) {
s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
}

// All servers should be at or above 0.8.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion) {
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) {
return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion)
}

Expand Down Expand Up @@ -305,7 +305,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
}

// All servers should be at or above 0.9.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion) {
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) {
return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion)
}
// Apply the update
Expand Down
163 changes: 122 additions & 41 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/hashicorp/raft"
)

// planner is used to mange the submitted allocation plans that are waiting
// planner is used to manage the submitted allocation plans that are waiting
// to be accessed by the leader
type planner struct {
*Server
Expand Down Expand Up @@ -149,52 +149,82 @@ func (p *planner) planApply() {

// applyPlan is used to apply the plan result and to return the alloc index
func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)

// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: plan.Job,
Alloc: make([]*structs.Allocation, 0, minUpdates),
Job: plan.Job,
},
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
EvalID: plan.EvalID,
NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)),
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}

for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
now := time.Now().UTC().UnixNano()
for _, alloc := range req.Alloc {
if alloc.CreateTime == 0 {
alloc.CreateTime = now

if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) {
// Initialize the allocs request using the new optimized log entry format.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))

for _, updateList := range result.NodeUpdate {
for _, stoppedAlloc := range updateList {
req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, now))
}
}
alloc.ModifyTime = now
}

// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = now
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
_, ok := preemptedJobIDs[id]
if !ok {
preemptedJobIDs[id] = struct{}{}
for _, allocList := range result.NodeAllocation {
req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.AllocsUpdated, now)

for _, preemptions := range result.NodePreemptions {
for _, preemptedAlloc := range preemptions {
req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, now))

// Gather jobids to create follow up evals
appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
}
}
} else {
// COMPAT 0.11: This branch is deprecated and will only be used to support
// application of older log entries. Expected to be removed in a future version.

// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)

// Initialize using the older log entry format for Alloc and NodePreemptions
req.Alloc = make([]*structs.Allocation, 0, minUpdates)
req.NodePreemptions = make([]*structs.Allocation, 0, len(result.NodePreemptions))

for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}

for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.Alloc, now)

// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = now
appendNamespacedJobID(preemptedJobIDs, alloc)
}
}

Expand Down Expand Up @@ -232,6 +262,50 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
return future, nil
}

// normalizePreemptedAlloc removes redundant fields from a preempted allocation and
// returns AllocationDiff. Since a preempted allocation is always an existing allocation,
// the struct returned by this method contains only the differential, which can be
// applied to an existing allocation, to yield the updated struct
func normalizePreemptedAlloc(preemptedAlloc *structs.Allocation, now int64) *structs.AllocationDiff {
return &structs.AllocationDiff{
ID: preemptedAlloc.ID,
PreemptedByAllocation: preemptedAlloc.PreemptedByAllocation,
ModifyTime: now,
}
}

// normalizeStoppedAlloc removes redundant fields from a stopped allocation and
// returns AllocationDiff. Since a stopped allocation is always an existing allocation,
// the struct returned by this method contains only the differential, which can be
// applied to an existing allocation, to yield the updated struct
func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs.AllocationDiff {
return &structs.AllocationDiff{
ID: stoppedAlloc.ID,
DesiredDescription: stoppedAlloc.DesiredDescription,
ClientStatus: stoppedAlloc.ClientStatus,
ModifyTime: now,
}
}

// appendNamespacedJobID appends the namespaced Job ID for the alloc to the jobIDs set
func appendNamespacedJobID(jobIDs map[structs.NamespacedID]struct{}, alloc *structs.Allocation) {
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
if _, ok := jobIDs[id]; !ok {
jobIDs[id] = struct{}{}
}
}

// updateAllocTimestamps sets the CreateTime and ModifyTime for the allocations
// to the timestamp provided
func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
for _, alloc := range allocations {
if alloc.CreateTime == 0 {
alloc.CreateTime = timestamp
}
alloc.ModifyTime = timestamp
}
}

// asyncPlanWait is used to apply and respond to a plan async
func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
Expand Down Expand Up @@ -264,6 +338,17 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.Plan, logger log.Logger) (*structs.PlanResult, error) {
defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now())

// Denormalize without the job
err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, nil)
if err != nil {
return nil, err
}
// Denormalize without the job
err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, nil)
if err != nil {
return nil, err
}

// Check if the plan exceeds quota
overQuota, err := evaluatePlanQuota(snap, plan)
if err != nil {
Expand Down Expand Up @@ -521,15 +606,11 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri

// Remove any preempted allocs
if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 {
for _, allocs := range preempted {
remove = append(remove, allocs)
}
remove = append(remove, preempted...)
}

if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 {
for _, alloc := range updated {
remove = append(remove, alloc)
}
remove = append(remove, updated...)
}
proposed := structs.RemoveAllocs(existingAlloc, remove)
proposed = append(proposed, plan.NodeAllocation[nodeID]...)
Expand Down
Loading