From 47435613965259a365172ab27bc07086e4b3521a Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 10 Apr 2019 20:20:22 -0500 Subject: [PATCH 1/2] Refactor scheduler package to enable preemption for batch/service jobs --- scheduler/generic_sched.go | 5 ++- scheduler/generic_sched_oss.go | 15 +++++++ scheduler/preemption.go | 16 ++++++- scheduler/preemption_test.go | 4 ++ scheduler/rank.go | 8 ++-- scheduler/stack.go | 79 +++------------------------------- scheduler/stack_oss.go | 78 +++++++++++++++++++++++++++++++++ 7 files changed, 126 insertions(+), 79 deletions(-) create mode 100644 scheduler/generic_sched_oss.go create mode 100644 scheduler/stack_oss.go diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index a9d6267a8a7..ad53dbec520 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -468,7 +468,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // Compute penalty nodes for rescheduled allocs selectOptions := getSelectOptions(prevAllocation, preferredNode) - option := s.stack.Select(tg, selectOptions) + option := s.selectNextOption(tg, selectOptions) // Store the available nodes by datacenter s.ctx.Metrics().NodesAvailable = byDC @@ -501,6 +501,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul AllocatedResources: resources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + SharedResources: &structs.Resources{ DiskMB: tg.EphemeralDisk.SizeMB, }, @@ -527,6 +528,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul } } + s.handlePreemptions(option, alloc, missing) + // Track the placement s.plan.AppendAlloc(alloc) diff --git a/scheduler/generic_sched_oss.go b/scheduler/generic_sched_oss.go new file mode 100644 index 00000000000..3a3f4b30f66 --- /dev/null +++ b/scheduler/generic_sched_oss.go @@ -0,0 +1,15 @@ +// +build !pro,!ent + +package scheduler + +import "github.com/hashicorp/nomad/nomad/structs" + +// selectNextOption calls the stack to get a node for placement +func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode { + return s.stack.Select(tg, selectOptions) +} + +// handlePreemptions sets relevant preeemption related fields. In OSS this is a no op. +func (s *GenericScheduler) handlePreemptions(option *RankedNode, alloc *structs.Allocation, missing placementResult) { + +} diff --git a/scheduler/preemption.go b/scheduler/preemption.go index 6c71a53fa70..417adcebd91 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -107,6 +107,9 @@ type Preemptor struct { // jobPriority is the priority of the job being preempted jobPriority int + // jobID is the ID of the job being preempted + jobID *structs.NamespacedID + // nodeRemainingResources tracks available resources on the node after // accounting for running allocations nodeRemainingResources *structs.ComparableResources @@ -118,10 +121,11 @@ type Preemptor struct { ctx Context } -func NewPreemptor(jobPriority int, ctx Context) *Preemptor { +func NewPreemptor(jobPriority int, ctx Context, jobID *structs.NamespacedID) *Preemptor { return &Preemptor{ currentPreemptions: make(map[structs.NamespacedID]map[string]int), jobPriority: jobPriority, + jobID: jobID, allocDetails: make(map[string]*allocInfo), ctx: ctx, } @@ -140,14 +144,22 @@ func (p *Preemptor) SetNode(node *structs.Node) { // SetCandidates initializes the candidate set from which preemptions are chosen func (p *Preemptor) SetCandidates(allocs []*structs.Allocation) { - p.currentAllocs = allocs + // Reset candidate set + p.currentAllocs = []*structs.Allocation{} for _, alloc := range allocs { + // Ignore any allocations of the job being placed + // This filters out any previous allocs of the job, and any new allocs in the plan + if alloc.JobID == p.jobID.ID && alloc.Namespace == p.jobID.Namespace { + continue + } + maxParallel := 0 tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg != nil && tg.Migrate != nil { maxParallel = tg.Migrate.MaxParallel } p.allocDetails[alloc.ID] = &allocInfo{maxParallel: maxParallel, resources: alloc.ComparableResources()} + p.currentAllocs = append(p.currentAllocs, alloc) } } diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go index ccbfa6f7e24..cd816ea7a4e 100644 --- a/scheduler/preemption_test.go +++ b/scheduler/preemption_test.go @@ -1249,6 +1249,7 @@ func TestPreemption(t *testing.T) { node.ReservedResources = tc.nodeReservedCapacity state, ctx := testContext(t) + nodes := []*RankedNode{ { Node: node, @@ -1267,6 +1268,9 @@ func TestPreemption(t *testing.T) { } static := NewStaticRankIterator(ctx, nodes) binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority) + job := mock.Job() + job.Priority = tc.jobPriority + binPackIter.SetJob(job) taskGroup := &structs.TaskGroup{ EphemeralDisk: &structs.EphemeralDisk{}, diff --git a/scheduler/rank.go b/scheduler/rank.go index 2ed48c34f17..a35c691a025 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -147,6 +147,7 @@ type BinPackIterator struct { source RankIterator evict bool priority int + jobId *structs.NamespacedID taskGroup *structs.TaskGroup } @@ -162,8 +163,9 @@ func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority i return iter } -func (iter *BinPackIterator) SetPriority(p int) { - iter.priority = p +func (iter *BinPackIterator) SetJob(job *structs.Job) { + iter.priority = job.Priority + iter.jobId = job.NamespacedID() } func (iter *BinPackIterator) SetTaskGroup(taskGroup *structs.TaskGroup) { @@ -211,7 +213,7 @@ OUTER: var allocsToPreempt []*structs.Allocation // Initialize preemptor with node - preemptor := NewPreemptor(iter.priority, iter.ctx) + preemptor := NewPreemptor(iter.priority, iter.ctx, iter.jobId) preemptor.SetNode(option.Node) // Count the number of existing preemptions diff --git a/scheduler/stack.go b/scheduler/stack.go index 48f89f81fdb..037caf61bc5 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -34,6 +34,7 @@ type Stack interface { type SelectOptions struct { PenaltyNodeIDs map[string]struct{} PreferredNodes []*structs.Node + Preempt bool } // GenericStack is the Stack used for the Generic scheduler. It is @@ -62,77 +63,6 @@ type GenericStack struct { scoreNorm *ScoreNormalizationIterator } -// NewGenericStack constructs a stack used for selecting service placements -func NewGenericStack(batch bool, ctx Context) *GenericStack { - // Create a new stack - s := &GenericStack{ - batch: batch, - ctx: ctx, - } - - // Create the source iterator. We randomize the order we visit nodes - // to reduce collisions between schedulers and to do a basic load - // balancing across eligible nodes. - s.source = NewRandomIterator(ctx, nil) - - // Create the quota iterator to determine if placements would result in the - // quota attached to the namespace of the job to go over. - s.quota = NewQuotaIterator(ctx, s.source) - - // Attach the job constraints. The job is filled in later. - s.jobConstraint = NewConstraintChecker(ctx, nil) - - // Filter on task group drivers first as they are faster - s.taskGroupDrivers = NewDriverChecker(ctx, nil) - - // Filter on task group constraints second - s.taskGroupConstraint = NewConstraintChecker(ctx, nil) - - // Filter on task group devices - s.taskGroupDevices = NewDeviceChecker(ctx) - - // Create the feasibility wrapper which wraps all feasibility checks in - // which feasibility checking can be skipped if the computed node class has - // previously been marked as eligible or ineligible. Generally this will be - // checks that only needs to examine the single node to determine feasibility. - jobs := []FeasibilityChecker{s.jobConstraint} - tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices} - s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs) - - // Filter on distinct host constraints. - s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks) - - // Filter on distinct property constraints. - s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint) - - // Upgrade from feasible to rank iterator - rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint) - - // Apply the bin packing, this depends on the resources needed - // by a particular task group. - - s.binPack = NewBinPackIterator(ctx, rankSource, false, 0) - - // Apply the job anti-affinity iterator. This is to avoid placing - // multiple allocations on the same node for this job. - s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "") - - s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff) - - s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty) - - s.spread = NewSpreadIterator(ctx, s.nodeAffinity) - - s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread) - - // Apply a limit function. This is to avoid scanning *every* possible node. - s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip) - - // Select the node with the maximum score for placement - s.maxScore = NewMaxScoreIterator(ctx, s.limit) - return s -} - func (s *GenericStack) SetNodes(baseNodes []*structs.Node) { // Shuffle base nodes shuffleNodes(baseNodes) @@ -159,7 +89,7 @@ func (s *GenericStack) SetJob(job *structs.Job) { s.jobConstraint.SetConstraints(job.Constraints) s.distinctHostsConstraint.SetJob(job) s.distinctPropertyConstraint.SetJob(job) - s.binPack.SetPriority(job.Priority) + s.binPack.SetJob(job) s.jobAntiAff.SetJob(job) s.nodeAffinity.SetJob(job) s.spread.SetJob(job) @@ -203,6 +133,9 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra s.distinctPropertyConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) s.binPack.SetTaskGroup(tg) + if options != nil { + s.binPack.evict = options.Preempt + } s.jobAntiAff.SetTaskGroup(tg) if options != nil { s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs) @@ -306,7 +239,7 @@ func (s *SystemStack) SetNodes(baseNodes []*structs.Node) { func (s *SystemStack) SetJob(job *structs.Job) { s.jobConstraint.SetConstraints(job.Constraints) s.distinctPropertyConstraint.SetJob(job) - s.binPack.SetPriority(job.Priority) + s.binPack.SetJob(job) s.ctx.Eligibility().SetJob(job) if contextual, ok := s.quota.(ContextualIterator); ok { diff --git a/scheduler/stack_oss.go b/scheduler/stack_oss.go new file mode 100644 index 00000000000..3ef90286ff5 --- /dev/null +++ b/scheduler/stack_oss.go @@ -0,0 +1,78 @@ +// +build !pro,!ent + +package scheduler + +// NewGenericStack constructs a stack used for selecting service placements +func NewGenericStack(batch bool, ctx Context) *GenericStack { + // Create a new stack + s := &GenericStack{ + batch: batch, + ctx: ctx, + } + + // Create the source iterator. We randomize the order we visit nodes + // to reduce collisions between schedulers and to do a basic load + // balancing across eligible nodes. + s.source = NewRandomIterator(ctx, nil) + + // Create the quota iterator to determine if placements would result in the + // quota attached to the namespace of the job to go over. + s.quota = NewQuotaIterator(ctx, s.source) + + // Attach the job constraints. The job is filled in later. + s.jobConstraint = NewConstraintChecker(ctx, nil) + + // Filter on task group drivers first as they are faster + s.taskGroupDrivers = NewDriverChecker(ctx, nil) + + // Filter on task group constraints second + s.taskGroupConstraint = NewConstraintChecker(ctx, nil) + + // Filter on task group devices + s.taskGroupDevices = NewDeviceChecker(ctx) + + // Create the feasibility wrapper which wraps all feasibility checks in + // which feasibility checking can be skipped if the computed node class has + // previously been marked as eligible or ineligible. Generally this will be + // checks that only needs to examine the single node to determine feasibility. + jobs := []FeasibilityChecker{s.jobConstraint} + tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices} + s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs) + + // Filter on distinct host constraints. + s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks) + + // Filter on distinct property constraints. + s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint) + + // Upgrade from feasible to rank iterator + rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint) + + // Apply the bin packing, this depends on the resources needed + // by a particular task group. + s.binPack = NewBinPackIterator(ctx, rankSource, false, 0) + + // Apply the job anti-affinity iterator. This is to avoid placing + // multiple allocations on the same node for this job. + s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "") + + // Apply node rescheduling penalty. This tries to avoid placing on a + // node where the allocation failed previously + s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff) + + // Apply scores based on affinity stanza + s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty) + + // Apply scores based on spread stanza + s.spread = NewSpreadIterator(ctx, s.nodeAffinity) + + // Normalizes scores by averaging them across various scorers + s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread) + + // Apply a limit function. This is to avoid scanning *every* possible node. + s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip) + + // Select the node with the maximum score for placement + s.maxScore = NewMaxScoreIterator(ctx, s.limit) + return s +} From a134c16c22218aea4d666d31f8e5ea8b2002f6ce Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 12 Apr 2019 10:32:48 -0500 Subject: [PATCH 2/2] remove stray new line --- scheduler/generic_sched.go | 1 - 1 file changed, 1 deletion(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index ad53dbec520..c8de7e549a0 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -501,7 +501,6 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul AllocatedResources: resources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, - SharedResources: &structs.Resources{ DiskMB: tg.EphemeralDisk.SizeMB, },