From 8456f77220ae46206a921275574c1e75bf87c1a3 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 23 May 2016 16:27:26 -0700 Subject: [PATCH] Periodically unblock failed evaluations --- nomad/blocked_evals.go | 27 +++++++++++++++++++++++++++ nomad/blocked_evals_test.go | 31 +++++++++++++++++++++++++++++++ nomad/leader.go | 18 ++++++++++++++++++ nomad/structs/structs.go | 1 + scheduler/generic_sched.go | 18 +++++++++--------- 5 files changed, 86 insertions(+), 9 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 558826937fb..1209c548c70 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -304,6 +304,33 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) { } } +// UnblockFailed unblocks all blocked evaluation that were due to scheduler +// failure. +func (b *BlockedEvals) UnblockFailed() { + b.l.Lock() + defer b.l.Unlock() + + // Do nothing if not enabled + if !b.enabled { + return + } + + var unblock []*structs.Evaluation + for _, eval := range b.captured { + if eval.TriggeredBy == structs.EvalTriggerMaxPlans { + unblock = append(unblock, eval) + } + } + + for _, eval := range b.escaped { + if eval.TriggeredBy == structs.EvalTriggerMaxPlans { + unblock = append(unblock, eval) + } + } + + b.evalBroker.EnqueueAll(unblock) +} + // GetDuplicates returns all the duplicate evaluations and blocks until the // passed timeout. func (b *BlockedEvals) GetDuplicates(timeout time.Duration) []*structs.Evaluation { diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 32487d9ee77..9b963feffba 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -359,3 +359,34 @@ func TestBlockedEvals_Block_ImmediateUnblock_SeenClass(t *testing.T) { t.Fatalf("err: %s", err) }) } + +func TestBlockedEvals_UnblockFailed(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Create blocked evals that are due to failures + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.TriggeredBy = structs.EvalTriggerMaxPlans + e.EscapedComputedClass = true + blocked.Block(e) + + e2 := mock.Eval() + e2.Status = structs.EvalStatusBlocked + e2.TriggeredBy = structs.EvalTriggerMaxPlans + e2.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false} + blocked.Block(e2) + + // Trigger an unblock fail + blocked.UnblockFailed() + + testutil.WaitForResult(func() (bool, error) { + // Verify Unblock caused an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 2 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} diff --git a/nomad/leader.go b/nomad/leader.go index 04f5f1432d5..58e94c588c8 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -143,6 +143,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Reap any duplicate blocked evaluations go s.reapDupBlockedEvaluations(stopCh) + // Periodically unblock failed allocations + go s.periodicUnblockFailedEvals(stopCh) + // Setup the heartbeat timers. This is done both when starting up or when // a leader fail over happens. Since the timers are maintained by the leader // node, effectively this means all the timers are renewed at the time of failover. @@ -341,6 +344,21 @@ func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) { } } +// periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations. +func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) { + ticker := time.NewTimer(1 * time.Minute) + for { + select { + case <-stopCh: + ticker.Stop() + return + case <-ticker.C: + // Unblock the failed allocations + s.blockedEvals.UnblockFailed() + } + } +} + // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. func (s *Server) revokeLeadership() error { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9b1aa66963c..64039cf511f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2570,6 +2570,7 @@ const ( EvalTriggerNodeUpdate = "node-update" EvalTriggerScheduled = "scheduled" EvalTriggerRollingUpdate = "rolling-update" + EvalTriggerMaxPlans = "max-plan-attempts" ) const ( diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 8276acc97e9..9db02a8137b 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -113,13 +113,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { if statusErr, ok := err.(*SetStatusError); ok { // Scheduling was tried but made no forward progress so create a // blocked eval to retry once resources become available. - - // TODO: Set the trigger by reason of the blocked eval here to - // something like "max-attempts" - // We can then periodically dequeue these from the blocked_eval - // tracker. var mErr multierror.Error - if err := s.createBlockedEval(); err != nil { + if err := s.createBlockedEval(true); err != nil { mErr.Errors = append(mErr.Errors, err) } if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, statusErr.EvalStatus, err.Error()); err != nil { @@ -140,8 +135,9 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusComplete, "") } -// createBlockedEval creates a blocked eval and stores it. -func (s *GenericScheduler) createBlockedEval() error { +// createBlockedEval creates a blocked eval and submits it to the planner. If +// failure is set to true, the eval's trigger reason reflects that. +func (s *GenericScheduler) createBlockedEval(planFailure bool) error { e := s.ctx.Eligibility() escaped := e.HasEscaped() @@ -152,6 +148,10 @@ func (s *GenericScheduler) createBlockedEval() error { } s.blocked = s.eval.CreateBlockedEval(classEligibility, escaped) + if planFailure { + s.blocked.TriggeredBy = structs.EvalTriggerMaxPlans + } + return s.planner.CreateEval(s.blocked) } @@ -191,7 +191,7 @@ func (s *GenericScheduler) process() (bool, error) { // to place the failed allocations when resources become available. If the // current evaluation is already a blocked eval, we reuse it. if s.eval.Status != structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil { - if err := s.createBlockedEval(); err != nil { + if err := s.createBlockedEval(false); err != nil { s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err) return false, err }