Skip to content

Commit

Permalink
Leader reaps and cancels duplicate evals
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Feb 1, 2016
1 parent 5956bee commit a987dec
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 7 deletions.
38 changes: 38 additions & 0 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Reap any failed evaluations
go s.reapFailedEvaluations(stopCh)

// Reap any duplicate blocked evaluations
go s.reapDupBlockedEvaluations(stopCh)

// Setup the heartbeat timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node, effectively this means all the timers are renewed at the time of failover.
Expand Down Expand Up @@ -301,6 +304,41 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
}
}

// reapDupBlockedEvaluations is used to reap duplicate blocked evaluations and
// should be cancelled.
func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) {
for {
select {
case <-stopCh:
return
default:
// Scan for duplicate blocked evals.
dups := s.blockedEvals.GetDuplicates(time.Second)
if dups == nil {
continue
}

cancel := make([]*structs.Evaluation, len(dups))
for i, dup := range dups {
// Update the status to cancelled
newEval := dup.Copy()
newEval.Status = structs.EvalStatusCancelled
newEval.StatusDescription = fmt.Sprintf("existing blocked evaluation exists for job %q", newEval.JobID)
cancel[i] = newEval
}

// Update via Raft
req := structs.EvalUpdateRequest{
Evals: cancel,
}
if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil {
s.logger.Printf("[ERR] nomad: failed to update duplicate evals %#v: %v", cancel, err)
continue
}
}
}
}

// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
Expand Down
27 changes: 27 additions & 0 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,3 +522,30 @@ func TestLeader_ReapFailedEval(t *testing.T) {
t.Fatalf("err: %v", err)
})
}

func TestLeader_ReapDuplicateEval(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Create a duplicate blocked eval
eval := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = eval.JobID
s1.blockedEvals.Block(eval)
s1.blockedEvals.Block(eval2)

// Wait for the evaluation to marked as cancelled
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
out, err := state.EvalByID(eval2.ID)
if err != nil {
return false, err
}
return out != nil && out.Status == structs.EvalStatusCancelled, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
15 changes: 8 additions & 7 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1824,10 +1824,11 @@ func (a *AllocMetric) ScoreNode(node *Node, name string, score float64) {
}

const (
EvalStatusBlocked = "blocked"
EvalStatusPending = "pending"
EvalStatusComplete = "complete"
EvalStatusFailed = "failed"
EvalStatusBlocked = "blocked"
EvalStatusPending = "pending"
EvalStatusComplete = "complete"
EvalStatusFailed = "failed"
EvalStatusCancelled = "cancelled"
)

const (
Expand Down Expand Up @@ -1930,7 +1931,7 @@ type Evaluation struct {
// will no longer transition.
func (e *Evaluation) TerminalStatus() bool {
switch e.Status {
case EvalStatusComplete, EvalStatusFailed:
case EvalStatusComplete, EvalStatusFailed, EvalStatusCancelled:
return true
default:
return false
Expand All @@ -1953,7 +1954,7 @@ func (e *Evaluation) ShouldEnqueue() bool {
switch e.Status {
case EvalStatusPending:
return true
case EvalStatusComplete, EvalStatusFailed, EvalStatusBlocked:
case EvalStatusComplete, EvalStatusFailed, EvalStatusBlocked, EvalStatusCancelled:
return false
default:
panic(fmt.Sprintf("unhandled evaluation (%s) status %s", e.ID, e.Status))
Expand All @@ -1966,7 +1967,7 @@ func (e *Evaluation) ShouldBlock() bool {
switch e.Status {
case EvalStatusBlocked:
return true
case EvalStatusComplete, EvalStatusFailed, EvalStatusPending:
case EvalStatusComplete, EvalStatusFailed, EvalStatusPending, EvalStatusCancelled:
return false
default:
panic(fmt.Sprintf("unhandled evaluation (%s) status %s", e.ID, e.Status))
Expand Down

0 comments on commit a987dec

Please sign in to comment.