Refactor scheduler package to enable preemption for batch/service jobs #5545

Merged · 2 commits · Apr 12, 2019
4 changes: 3 additions & 1 deletion scheduler/generic_sched.go
@@ -468,7 +468,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul

// Compute penalty nodes for rescheduled allocs
selectOptions := getSelectOptions(prevAllocation, preferredNode)
option := s.stack.Select(tg, selectOptions)
option := s.selectNextOption(tg, selectOptions)

// Store the available nodes by datacenter
s.ctx.Metrics().NodesAvailable = byDC
@@ -527,6 +527,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
}
}

s.handlePreemptions(option, alloc, missing)

// Track the placement
s.plan.AppendAlloc(alloc)

15 changes: 15 additions & 0 deletions scheduler/generic_sched_oss.go
@@ -0,0 +1,15 @@
// +build !pro,!ent

package scheduler

import "github.com/hashicorp/nomad/nomad/structs"

// selectNextOption calls the stack to get a node for placement
func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
return s.stack.Select(tg, selectOptions)
}

// handlePreemptions sets relevant preemption-related fields. In OSS this is a no-op.
func (s *GenericScheduler) handlePreemptions(option *RankedNode, alloc *structs.Allocation, missing placementResult) {

}
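
These two hooks are the OSS half of a build-tag split: enterprise builds can provide versions that actually drive preemption for service and batch jobs. The enterprise implementation is not part of this diff, so the following is only a hypothetical sketch of what an ent-tagged counterpart might look like, assuming it retries node selection with SelectOptions.Preempt enabled when the normal pass finds no feasible node; the real enterprise code and retry policy may differ.

// +build ent

package scheduler

import "github.com/hashicorp/nomad/nomad/structs"

// Hypothetical sketch only, not the actual enterprise code: retry the
// selection with preemption enabled when no node fits under normal bin
// packing. handlePreemptions would then record the chosen node's preempted
// allocations on the plan.
func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
	option := s.stack.Select(tg, selectOptions)
	if option == nil {
		selectOptions.Preempt = true
		option = s.stack.Select(tg, selectOptions)
	}
	return option
}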
16 changes: 14 additions & 2 deletions scheduler/preemption.go
@@ -107,6 +107,9 @@ type Preemptor struct {
// jobPriority is the priority of the job being preempted
jobPriority int

// jobID is the ID of the job being preempted
jobID *structs.NamespacedID

// nodeRemainingResources tracks available resources on the node after
// accounting for running allocations
nodeRemainingResources *structs.ComparableResources
@@ -118,10 +121,11 @@ type Preemptor struct {
ctx Context
}

func NewPreemptor(jobPriority int, ctx Context) *Preemptor {
func NewPreemptor(jobPriority int, ctx Context, jobID *structs.NamespacedID) *Preemptor {
return &Preemptor{
currentPreemptions: make(map[structs.NamespacedID]map[string]int),
jobPriority: jobPriority,
jobID: jobID,
allocDetails: make(map[string]*allocInfo),
ctx: ctx,
}
@@ -140,14 +144,22 @@ func (p *Preemptor) SetNode(node *structs.Node) {

// SetCandidates initializes the candidate set from which preemptions are chosen
func (p *Preemptor) SetCandidates(allocs []*structs.Allocation) {
p.currentAllocs = allocs
// Reset candidate set
p.currentAllocs = []*structs.Allocation{}
for _, alloc := range allocs {
// Ignore any allocations of the job being placed
// This filters out any previous allocs of the job, and any new allocs in the plan
if alloc.JobID == p.jobID.ID && alloc.Namespace == p.jobID.Namespace {
continue
}

maxParallel := 0
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg != nil && tg.Migrate != nil {
maxParallel = tg.Migrate.MaxParallel
}
p.allocDetails[alloc.ID] = &allocInfo{maxParallel: maxParallel, resources: alloc.ComparableResources()}
p.currentAllocs = append(p.currentAllocs, alloc)
}
}

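For context, a minimal sketch of how the new constructor argument and the candidate filtering fit together; the surrounding iterator wiring is simplified, and proposedAllocs is an assumed placeholder for the node's current allocations:

// Sketch: build a preemptor for the job being placed, then feed it the node
// and its existing allocations. SetCandidates skips allocs that belong to the
// same job, so a job never preempts its own (or its planned) allocations.
preemptor := NewPreemptor(job.Priority, ctx, job.NamespacedID())
preemptor.SetNode(option.Node)
preemptor.SetCandidates(proposedAllocs)
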
4 changes: 4 additions & 0 deletions scheduler/preemption_test.go
@@ -1249,6 +1249,7 @@ func TestPreemption(t *testing.T) {
node.ReservedResources = tc.nodeReservedCapacity

state, ctx := testContext(t)

nodes := []*RankedNode{
{
Node: node,
@@ -1267,6 +1268,9 @@
}
static := NewStaticRankIterator(ctx, nodes)
binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority)
job := mock.Job()
job.Priority = tc.jobPriority
binPackIter.SetJob(job)

taskGroup := &structs.TaskGroup{
EphemeralDisk: &structs.EphemeralDisk{},
8 changes: 5 additions & 3 deletions scheduler/rank.go
@@ -147,6 +147,7 @@ type BinPackIterator struct {
source RankIterator
evict bool
priority int
jobId *structs.NamespacedID
taskGroup *structs.TaskGroup
}

@@ -162,8 +163,9 @@ func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority i
return iter
}

func (iter *BinPackIterator) SetPriority(p int) {
iter.priority = p
func (iter *BinPackIterator) SetJob(job *structs.Job) {
iter.priority = job.Priority
iter.jobId = job.NamespacedID()
}

func (iter *BinPackIterator) SetTaskGroup(taskGroup *structs.TaskGroup) {
@@ -211,7 +213,7 @@ OUTER:
var allocsToPreempt []*structs.Allocation

// Initialize preemptor with node
preemptor := NewPreemptor(iter.priority, iter.ctx)
preemptor := NewPreemptor(iter.priority, iter.ctx, iter.jobId)
preemptor.SetNode(option.Node)

// Count the number of existing preemptions
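Callers that previously pushed only a priority into the bin-pack iterator now hand over the whole job. A short sketch of the call-site change, mirroring the stack code later in this diff:

binPack := NewBinPackIterator(ctx, rankSource, false, 0)

// Before this PR: binPack.SetPriority(job.Priority)
// After this PR: SetJob records both the priority and the job's namespaced ID,
// which the preemptor uses to exclude the job's own allocations.
binPack.SetJob(job)
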
79 changes: 6 additions & 73 deletions scheduler/stack.go
@@ -34,6 +34,7 @@ type Stack interface {
type SelectOptions struct {
PenaltyNodeIDs map[string]struct{}
PreferredNodes []*structs.Node
Preempt bool
}

// GenericStack is the Stack used for the Generic scheduler. It is
@@ -62,77 +63,6 @@ type GenericStack struct {
scoreNorm *ScoreNormalizationIterator
}

// NewGenericStack constructs a stack used for selecting service placements
func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Create a new stack
s := &GenericStack{
batch: batch,
ctx: ctx,
}

// Create the source iterator. We randomize the order we visit nodes
// to reduce collisions between schedulers and to do a basic load
// balancing across eligible nodes.
s.source = NewRandomIterator(ctx, nil)

// Create the quota iterator to determine if placements would result in the
// quota attached to the namespace of the job to go over.
s.quota = NewQuotaIterator(ctx, s.source)

// Attach the job constraints. The job is filled in later.
s.jobConstraint = NewConstraintChecker(ctx, nil)

// Filter on task group drivers first as they are faster
s.taskGroupDrivers = NewDriverChecker(ctx, nil)

// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintChecker(ctx, nil)

// Filter on task group devices
s.taskGroupDevices = NewDeviceChecker(ctx)

// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs)

// Filter on distinct host constraints.
s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)

// Filter on distinct property constraints.
s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)

// Upgrade from feasible to rank iterator
rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint)

// Apply the bin packing, this depends on the resources needed
// by a particular task group.

s.binPack = NewBinPackIterator(ctx, rankSource, false, 0)

// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job.
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")

s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)

s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)

s.spread = NewSpreadIterator(ctx, s.nodeAffinity)

s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread)

// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)

// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)
return s
}

func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
// Shuffle base nodes
shuffleNodes(baseNodes)
@@ -159,7 +89,7 @@ func (s *GenericStack) SetJob(job *structs.Job) {
s.jobConstraint.SetConstraints(job.Constraints)
s.distinctHostsConstraint.SetJob(job)
s.distinctPropertyConstraint.SetJob(job)
s.binPack.SetPriority(job.Priority)
s.binPack.SetJob(job)
s.jobAntiAff.SetJob(job)
s.nodeAffinity.SetJob(job)
s.spread.SetJob(job)
@@ -203,6 +133,9 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
s.distinctPropertyConstraint.SetTaskGroup(tg)
s.wrappedChecks.SetTaskGroup(tg.Name)
s.binPack.SetTaskGroup(tg)
if options != nil {
s.binPack.evict = options.Preempt
}
s.jobAntiAff.SetTaskGroup(tg)
if options != nil {
s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs)
@@ -306,7 +239,7 @@ func (s *SystemStack) SetNodes(baseNodes []*structs.Node) {
func (s *SystemStack) SetJob(job *structs.Job) {
s.jobConstraint.SetConstraints(job.Constraints)
s.distinctPropertyConstraint.SetJob(job)
s.binPack.SetPriority(job.Priority)
s.binPack.SetJob(job)
s.ctx.Eligibility().SetJob(job)

if contextual, ok := s.quota.(ContextualIterator); ok {
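The new Preempt flag lets a caller opt a single selection pass into eviction-mode bin packing; the retry strategy that would set it lives behind the enterprise hooks and is not shown here. A sketch of how a caller could request it, with penaltyNodes as an assumed placeholder:

// Sketch: request a preemption-aware pass. Preempt flips the bin-pack
// iterator's evict behavior for this selection.
selectOptions := &SelectOptions{
	PenaltyNodeIDs: penaltyNodes,
	Preempt:        true,
}
option := s.stack.Select(tg, selectOptions)
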
78 changes: 78 additions & 0 deletions scheduler/stack_oss.go
@@ -0,0 +1,78 @@
// +build !pro,!ent

package scheduler

// NewGenericStack constructs a stack used for selecting service placements
func NewGenericStack(batch bool, ctx Context) *GenericStack {
@endocrimes (Contributor) commented on Apr 11, 2019:
Do we potentially want to have a way to modify the stack, rather than duping the whole setup? (mostly worried about duplicate work when introducing other new scheduler things that could easily be forgotten to be also PR'd to ENT)

// Create a new stack
s := &GenericStack{
batch: batch,
ctx: ctx,
}

// Create the source iterator. We randomize the order we visit nodes
// to reduce collisions between schedulers and to do a basic load
// balancing across eligible nodes.
s.source = NewRandomIterator(ctx, nil)

// Create the quota iterator to determine if placements would result in the
// quota attached to the namespace of the job to go over.
s.quota = NewQuotaIterator(ctx, s.source)

// Attach the job constraints. The job is filled in later.
s.jobConstraint = NewConstraintChecker(ctx, nil)

// Filter on task group drivers first as they are faster
s.taskGroupDrivers = NewDriverChecker(ctx, nil)

// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintChecker(ctx, nil)

// Filter on task group devices
s.taskGroupDevices = NewDeviceChecker(ctx)

// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs)

// Filter on distinct host constraints.
s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)

// Filter on distinct property constraints.
s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)

// Upgrade from feasible to rank iterator
rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint)

// Apply the bin packing, this depends on the resources needed
// by a particular task group.
s.binPack = NewBinPackIterator(ctx, rankSource, false, 0)

// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job.
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")

// Apply node rescheduling penalty. This tries to avoid placing on a
// node where the allocation failed previously
s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)

// Apply scores based on affinity stanza
s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)

// Apply scores based on spread stanza
s.spread = NewSpreadIterator(ctx, s.nodeAffinity)

// Normalizes scores by averaging them across various scorers
s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread)

// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)

// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)
return s
}