Reaping failed evaluations creates follow up eval
Create a follow up evaluation when reaping failed evaluations. This
ensures that a job will still make eventual progress.
dadgar committed Apr 12, 2017
1 parent 57ed873 commit 7e64557
Showing 3 changed files with 83 additions and 10 deletions.
19 changes: 18 additions & 1 deletion nomad/leader.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"time"

@@ -19,6 +20,15 @@ const (
// unblocked to re-enter the scheduler. A failed evaluation occurs under
// high contention when the scheduler's plan does not make progress.
failedEvalUnblockInterval = 1 * time.Minute

// failedEvalFollowUpBaseLineWait is the minimum time waited before retrying
// a failed evaluation.
failedEvalFollowUpBaseLineWait = 1 * time.Minute

// failedEvalFollowUpWaitRange defines the range of additional time from
// the minimum in which to wait before retrying a failed evaluation. A value
// from this range should be selected using a uniform distribution.
failedEvalFollowUpWaitRange = 9 * time.Minute
)

// monitorLeadership is used to monitor if we acquire or lose our role
@@ -392,9 +402,16 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
newEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit)
s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", newEval)

// Create a follow-up evaluation that will be used to retry the
// scheduling for the job after the cluster is hopefully more stable
// due to the fairly large backoff.
followupEvalWait := failedEvalFollowUpBaseLineWait +
time.Duration(rand.Int63n(int64(failedEvalFollowUpWaitRange)))
followupEval := eval.CreateFailedFollowUpEval(followupEvalWait)

// Update via Raft
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{newEval},
Evals: []*structs.Evaluation{newEval, followupEval},
}
if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil {
s.logger.Printf("[ERR] nomad: failed to update failed eval %#v: %v", newEval, err)
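
As a standalone illustration of the backoff computed in the leader.go hunk above: the follow-up wait is the one-minute baseline plus a uniform draw over the nine-minute range, so each follow-up eval waits somewhere in [1m, 10m). The snippet below is a minimal sketch and not part of the commit; the constant names simply mirror the diff.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	failedEvalFollowUpBaseLineWait = 1 * time.Minute
	failedEvalFollowUpWaitRange    = 9 * time.Minute
)

// followupWait mirrors the computation in reapFailedEvaluations: a fixed
// baseline plus a uniformly distributed offset within the wait range.
func followupWait() time.Duration {
	return failedEvalFollowUpBaseLineWait +
		time.Duration(rand.Int63n(int64(failedEvalFollowUpWaitRange)))
}

func main() {
	// Each call yields a wait in [1m, 10m).
	fmt.Println(followupWait())
}
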
42 changes: 40 additions & 2 deletions nomad/leader_test.go
@@ -508,15 +508,53 @@ func TestLeader_ReapFailedEval(t *testing.T) {
}
s1.evalBroker.Nack(out.ID, token)

// Wait updated evaluation
// Wait for an updated and followup evaluation
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
ws := memdb.NewWatchSet()
out, err := state.EvalByID(ws, eval.ID)
if err != nil {
return false, err
}
return out != nil && out.Status == structs.EvalStatusFailed, nil
if out == nil {
return false, fmt.Errorf("expect original evaluation to exist")
}
if out.Status != structs.EvalStatusFailed {
return false, fmt.Errorf("got status %v; want %v", out.Status, structs.EvalStatusFailed)
}

// See if there is a followup
evals, err := state.EvalsByJob(ws, eval.JobID)
if err != nil {
return false, err
}

if l := len(evals); l != 2 {
return false, fmt.Errorf("got %d evals, want 2", l)
}

for _, e := range evals {
if e.ID == eval.ID {
continue
}

if e.Status != structs.EvalStatusPending {
return false, fmt.Errorf("follow up eval has status %v; want %v",
e.Status, structs.EvalStatusPending)
}

if e.Wait < failedEvalFollowUpBaseLineWait ||
e.Wait > failedEvalFollowUpBaseLineWait+failedEvalFollowUpWaitRange {
return false, fmt.Errorf("bad wait: %v", e.Wait)
}

if e.TriggeredBy != structs.EvalTriggerFailedFollowUp {
return false, fmt.Errorf("follow up eval TriggeredBy %v; want %v",
e.TriggeredBy, structs.EvalTriggerFailedFollowUp)
}
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
32 changes: 25 additions & 7 deletions nomad/structs/structs.go
@@ -3736,13 +3736,14 @@ const (
)

const (
EvalTriggerJobRegister = "job-register"
EvalTriggerJobDeregister = "job-deregister"
EvalTriggerPeriodicJob = "periodic-job"
EvalTriggerNodeUpdate = "node-update"
EvalTriggerScheduled = "scheduled"
EvalTriggerRollingUpdate = "rolling-update"
EvalTriggerMaxPlans = "max-plan-attempts"
EvalTriggerJobRegister = "job-register"
EvalTriggerJobDeregister = "job-deregister"
EvalTriggerPeriodicJob = "periodic-job"
EvalTriggerNodeUpdate = "node-update"
EvalTriggerScheduled = "scheduled"
EvalTriggerRollingUpdate = "rolling-update"
EvalTriggerFailedFollowUp = "failed-eval-follow-up"
EvalTriggerMaxPlans = "max-plan-attempts"
)

const (
@@ -3985,6 +3986,23 @@ func (e *Evaluation) CreateBlockedEval(classEligibility map[string]bool, escaped
}
}

// CreateFailedFollowUpEval creates a follow up evaluation when the current one
// has been marked as failed because it has hit the delivery limit and will not
// be retried by the eval_broker.
func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation {
return &Evaluation{
ID: GenerateUUID(),
Priority: e.Priority,
Type: e.Type,
TriggeredBy: EvalTriggerFailedFollowUp,
JobID: e.JobID,
JobModifyIndex: e.JobModifyIndex,
Status: EvalStatusPending,
Wait: wait,
PreviousEval: e.ID,
}
}

// Plan is used to submit a commit plan for task allocations. These
// are submitted to the leader which verifies that resources have
// not been overcommitted before admitting the plan.
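
As a usage note on the new structs.go helper: the follow-up keeps the job's identity and priority, starts out pending with the randomized wait, and records the failed eval's ID in PreviousEval so the two stay linked. The sketch below uses a simplified stand-in for structs.Evaluation (only the fields this commit touches) and a hypothetical hard-coded ID in place of GenerateUUID; it is illustrative only, not Nomad's actual type.

package main

import (
	"fmt"
	"time"
)

// Evaluation is a simplified stand-in for nomad/structs.Evaluation,
// carrying only the fields exercised by this commit.
type Evaluation struct {
	ID             string
	Priority       int
	Type           string
	TriggeredBy    string
	JobID          string
	JobModifyIndex uint64
	Status         string
	Wait           time.Duration
	PreviousEval   string
}

// CreateFailedFollowUpEval mirrors the helper added in structs.go.
func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation {
	return &Evaluation{
		ID:             "generated-uuid", // GenerateUUID() in the real code
		Priority:       e.Priority,
		Type:           e.Type,
		TriggeredBy:    "failed-eval-follow-up",
		JobID:          e.JobID,
		JobModifyIndex: e.JobModifyIndex,
		Status:         "pending",
		Wait:           wait,
		PreviousEval:   e.ID,
	}
}

func main() {
	failed := &Evaluation{ID: "eval-1", JobID: "example", Status: "failed"}
	followup := failed.CreateFailedFollowUpEval(3 * time.Minute)
	// In the leader, both evals are submitted together in one EvalUpdateRequest via Raft.
	fmt.Println(followup.JobID, followup.TriggeredBy, followup.PreviousEval)
}
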
