Skip to content

Commit

Permalink
Merge pull request #787 from hashicorp/f-scheduler-retries
Browse files Browse the repository at this point in the history
Improved scheduler retry logic under high contention
  • Loading branch information
dadgar committed Feb 11, 2016
2 parents c242612 + 13d1fd0 commit 49b4d39
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 27 deletions.
51 changes: 38 additions & 13 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"

"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -50,11 +51,12 @@ type GenericScheduler struct {
planner Planner
batch bool

eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
ctx *EvalContext
stack *GenericStack
eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
planResult *structs.PlanResult
ctx *EvalContext
stack *GenericStack

limitReached bool
nextEval *structs.Evaluation
Expand Down Expand Up @@ -99,14 +101,24 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc)
}

// Retry up to the maxScheduleAttempts
// Retry up to the maxScheduleAttempts and reset if progress is made.
progress := func() bool { return progressMade(s.planResult) }
limit := maxServiceScheduleAttempts
if s.batch {
limit = maxBatchScheduleAttempts
}
if err := retryMax(limit, s.process); err != nil {
if err := retryMax(limit, s.process, progress); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error())
// Scheduling was tried but made no forward progress so create a
// blocked eval to retry once resources become available.
var mErr multierror.Error
if err := s.createBlockedEval(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error()); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
return mErr.ErrorOrNil()
}
return err
}
Expand All @@ -115,6 +127,21 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "")
}

// createBlockedEval builds a blocked evaluation from the scheduler's current
// evaluation, records it on the scheduler as s.blocked, and hands it to the
// planner. The error returned by the planner's CreateEval is passed through.
func (s *GenericScheduler) createBlockedEval() error {
	elig := s.ctx.Eligibility()
	hasEscaped := elig.HasEscaped()

	// The per-class eligibility is only carried over when the eval has not
	// escaped; otherwise the map is left nil.
	var classes map[string]bool
	if !hasEscaped {
		classes = elig.GetClasses()
	}

	blocked := s.eval.BlockedEval(classes, hasEscaped)
	s.blocked = blocked
	return s.planner.CreateEval(blocked)
}

// process is wrapped in retryMax to iteratively run the handler until we have no
// further work or we've made the maximum number of attempts.
func (s *GenericScheduler) process() (bool, error) {
Expand Down Expand Up @@ -163,18 +190,16 @@ func (s *GenericScheduler) process() (bool, error) {
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available.
if len(s.plan.FailedAllocs) != 0 && s.blocked == nil {
e := s.ctx.Eligibility()
classes := e.GetClasses()
s.blocked = s.eval.BlockedEval(classes, e.HasEscaped())
if err := s.planner.CreateEval(s.blocked); err != nil {
if err := s.createBlockedEval(); err != nil {
s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
return false, err
}
s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID)
}

// Submit the plan
// Submit the plan and store the results.
result, newState, err := s.planner.SubmitPlan(s.plan)
s.planResult = result
if err != nil {
return false, err
}
Expand Down
21 changes: 12 additions & 9 deletions scheduler/system_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ type SystemScheduler struct {
state State
planner Planner

eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
ctx *EvalContext
stack *SystemStack
nodes []*structs.Node
nodesByDC map[string]int
eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
planResult *structs.PlanResult
ctx *EvalContext
stack *SystemStack
nodes []*structs.Node
nodesByDC map[string]int

limitReached bool
nextEval *structs.Evaluation
Expand Down Expand Up @@ -62,8 +63,9 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc)
}

// Retry up to the maxSystemScheduleAttempts
if err := retryMax(maxSystemScheduleAttempts, s.process); err != nil {
// Retry up to the maxSystemScheduleAttempts and reset if progress is made.
progress := func() bool { return progressMade(s.planResult) }
if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error())
}
Expand Down Expand Up @@ -129,6 +131,7 @@ func (s *SystemScheduler) process() (bool, error) {

// Submit the plan
result, newState, err := s.planner.SubmitPlan(s.plan)
s.planResult = result
if err != nil {
return false, err
}
Expand Down
21 changes: 18 additions & 3 deletions scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
}

// retryMax is used to retry a callback until it returns success or
// a maximum number of attempts is reached
func retryMax(max int, cb func() (bool, error)) error {
// a maximum number of attempts is reached. An optional reset function may be
// passed which is called after each failed iteration. If the reset function is
// set and returns true, the number of attempts is reset back to max.
func retryMax(max int, cb func() (bool, error), reset func() bool) error {
attempts := 0
for attempts < max {
done, err := cb()
Expand All @@ -219,14 +221,27 @@ func retryMax(max int, cb func() (bool, error)) error {
if done {
return nil
}
attempts += 1

// Check if we should reset the number of attempts
if reset != nil && reset() {
attempts = 0
} else {
attempts += 1
}
}
return &SetStatusError{
Err: fmt.Errorf("maximum attempts reached (%d)", max),
EvalStatus: structs.EvalStatusFailed,
}
}

// progressMade reports whether the plan result made allocations or updates,
// i.e. whether it contains at least one node update or at least one node
// allocation. If the result is nil, false is returned.
func progressMade(result *structs.PlanResult) bool {
	if result == nil {
		return false
	}

	// Either kind of change counts as forward progress; requiring both would
	// treat a result that only placed allocations (or only applied updates)
	// as having made no progress.
	return len(result.NodeUpdate) != 0 || len(result.NodeAllocation) != 0
}

// taintedNodes is used to scan the allocations and then check if the
// underlying nodes are tainted, and should force a migration of the allocation.
func taintedNodes(state State, allocs []*structs.Allocation) (map[string]bool, error) {
Expand Down
21 changes: 19 additions & 2 deletions scheduler/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,20 +229,37 @@ func TestRetryMax(t *testing.T) {
calls += 1
return false, nil
}
err := retryMax(3, bad)
err := retryMax(3, bad, nil)
if err == nil {
t.Fatalf("should fail")
}
if calls != 3 {
t.Fatalf("mis match")
}

calls = 0
first := true
reset := func() bool {
if calls == 3 && first {
first = false
return true
}
return false
}
err = retryMax(3, bad, reset)
if err == nil {
t.Fatalf("should fail")
}
if calls != 6 {
t.Fatalf("mis match")
}

calls = 0
good := func() (bool, error) {
calls += 1
return true, nil
}
err = retryMax(3, good)
err = retryMax(3, good, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down

0 comments on commit 49b4d39

Please sign in to comment.