From 57ed87324fcccbbfbe8dac212efdb1dbe57be940 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 12 Apr 2017 13:39:19 -0700 Subject: [PATCH 1/5] Delay Nack re-enqueue Add a delay when an evaluation is nacked that starts off small but compounds to a larger delay for subsequent Nacks. This creates some back pressure. --- nomad/eval_broker.go | 84 ++++++++++--- nomad/eval_broker_test.go | 247 ++++++++++++++++++++++++++++++++------ 2 files changed, 278 insertions(+), 53 deletions(-) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 783ad84f0ac..3733a28849d 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -28,6 +28,14 @@ var ( // ErrNackTimeoutReached is returned if an expired evaluation is reset ErrNackTimeoutReached = errors.New("evaluation nack timeout reached") + + // initialNackReenqueueDelay is the delay applied before re-enqueuing a + // Nacked evaluation for the first time + initialNackReenqueueDelay = time.Second + + // subsequentNackReenqueueDelay is a compounding delay applied on each + // subsequent Nack for an evaluation. + subsequentNackReenqueueDelay = 20 * time.Second ) // EvalBroker is used to manage brokering of evaluations. 
When an evaluation is @@ -76,6 +84,10 @@ type EvalBroker struct { // timeWait has evaluations that are waiting for time to elapse timeWait map[string]*time.Timer + // Nack delays are defaulted and are only made available for testing + initialNackDelay time.Duration + subsequentNackDelay time.Duration + l sync.RWMutex } @@ -100,18 +112,20 @@ func NewEvalBroker(timeout time.Duration, deliveryLimit int) (*EvalBroker, error return nil, fmt.Errorf("timeout cannot be negative") } b := &EvalBroker{ - nackTimeout: timeout, - deliveryLimit: deliveryLimit, - enabled: false, - stats: new(BrokerStats), - evals: make(map[string]int), - jobEvals: make(map[string]string), - blocked: make(map[string]PendingEvaluations), - ready: make(map[string]PendingEvaluations), - unack: make(map[string]*unackEval), - waiting: make(map[string]chan struct{}), - requeue: make(map[string]*structs.Evaluation), - timeWait: make(map[string]*time.Timer), + nackTimeout: timeout, + deliveryLimit: deliveryLimit, + enabled: false, + stats: new(BrokerStats), + evals: make(map[string]int), + jobEvals: make(map[string]string), + blocked: make(map[string]PendingEvaluations), + ready: make(map[string]PendingEvaluations), + unack: make(map[string]*unackEval), + waiting: make(map[string]chan struct{}), + requeue: make(map[string]*structs.Evaluation), + timeWait: make(map[string]*time.Timer), + initialNackDelay: initialNackReenqueueDelay, + subsequentNackDelay: subsequentNackReenqueueDelay, } b.stats.ByScheduler = make(map[string]*SchedulerStats) return b, nil @@ -187,17 +201,23 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) { // Check if we need to enforce a wait if eval.Wait > 0 { - timer := time.AfterFunc(eval.Wait, func() { - b.enqueueWaiting(eval) - }) - b.timeWait[eval.ID] = timer - b.stats.TotalWaiting += 1 + b.processWaitingEnqueue(eval) return } b.enqueueLocked(eval, eval.Type) } +// processWaitingEnqueue waits the given duration on the evaluation before +// enqueueing. 
+func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) { + timer := time.AfterFunc(eval.Wait, func() { + b.enqueueWaiting(eval) + }) + b.timeWait[eval.ID] = timer + b.stats.TotalWaiting += 1 +} + // enqueueWaiting is used to enqueue a waiting evaluation func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) { b.l.Lock() @@ -547,14 +567,40 @@ func (b *EvalBroker) Nack(evalID, token string) error { // Check if we've hit the delivery limit, and re-enqueue // in the failedQueue - if b.evals[evalID] >= b.deliveryLimit { + if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit { b.enqueueLocked(unack.Eval, failedQueue) } else { - b.enqueueLocked(unack.Eval, unack.Eval.Type) + e := unack.Eval + e.Wait = b.nackReenqueueDelay(e, dequeues) + + // See if there should be a delay before re-enqueuing + if e.Wait > 0 { + b.processWaitingEnqueue(e) + } else { + b.enqueueLocked(e, e.Type) + } } + return nil } +// nackReenqueueDelay is used to determine the delay that should be applied on +// the evaluation given the number of previous attempts +func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration { + var delay time.Duration + + switch { + case prevDequeues <= 0: + case prevDequeues == 1: + delay = b.initialNackDelay + default: + // For each subsequent nack compound a delay + delay = time.Duration(prevDequeues-1) * b.subsequentNackDelay + } + + return delay +} + // PauseNackTimeout is used to pause the Nack timeout for an eval that is making // progress but is in a potentially unbounded operation such as the plan queue. 
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error { diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index ab6ecedf037..c263fb1ef84 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -1,11 +1,13 @@ package nomad import ( + "fmt" "testing" "time" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" ) var ( @@ -23,6 +25,11 @@ func testBroker(t *testing.T, timeout time.Duration) *EvalBroker { if err != nil { t.Fatalf("err: %v", err) } + + // Tune the Nack delay + b.initialNackDelay = 5 * time.Millisecond + b.subsequentNackDelay = 50 * time.Millisecond + return b } @@ -125,20 +132,150 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { t.Fatalf("should not be outstanding") } + // Check the stats + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalWaiting != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) + + // Dequeue should work again + out2, token2, err := b.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out2 != eval { + t.Fatalf("bad : %#v", out2) + } + if token2 == token { + t.Fatalf("should get a new token") + } + + tokenOut2, ok := b.Outstanding(out.ID) + if !ok { + t.Fatalf("should be outstanding") + } + if tokenOut2 != token2 { + t.Fatalf("Bad: %#v %#v", token2, tokenOut2) + } + + // Ack with wrong token + err = b.Ack(eval.ID, "zip") + if err == nil { + t.Fatalf("should fail to ack") + } + + // Ack finally + err = b.Ack(eval.ID, token2) 
+ if err != nil { + t.Fatalf("err: %v", err) + } + + if _, ok := b.Outstanding(out.ID); ok { + t.Fatalf("should not be outstanding") + } + // Check the stats stats = b.Stats() - if stats.TotalReady != 1 { + if stats.TotalReady != 0 { t.Fatalf("bad: %#v", stats) } if stats.TotalUnacked != 0 { t.Fatalf("bad: %#v", stats) } - if stats.ByScheduler[eval.Type].Ready != 1 { + if stats.ByScheduler[eval.Type].Ready != 0 { t.Fatalf("bad: %#v", stats) } if stats.ByScheduler[eval.Type].Unacked != 0 { t.Fatalf("bad: %#v", stats) } +} + +func TestEvalBroker_Nack_Delay(t *testing.T) { + b := testBroker(t, 0) + + // Enqueue, but broker is disabled! + b.SetEnabled(true) + eval := mock.Eval() + b.Enqueue(eval) + + // Dequeue should work + out, token, err := b.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != eval { + t.Fatalf("bad : %#v", out) + } + + // Nack back into the queue + err = b.Nack(eval.ID, token) + if err != nil { + t.Fatalf("err: %v", err) + } + + if _, ok := b.Outstanding(out.ID); ok { + t.Fatalf("should not be outstanding") + } + + // Check the stats to ensure that it is waiting + stats := b.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.TotalWaiting != 1 { + t.Fatalf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + t.Fatalf("bad: %#v", stats) + } + + // Now wait for it to be re-enqueued + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalWaiting != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if 
stats.ByScheduler[eval.Type].Unacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) // Dequeue should work again out2, token2, err := b.Dequeue(defaultSched, time.Second) @@ -152,22 +289,58 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { t.Fatalf("should get a new token") } - tokenOut2, ok := b.Outstanding(out.ID) - if !ok { - t.Fatalf("should be outstanding") + // Capture the time + start := time.Now() + + // Nack back into the queue + err = b.Nack(eval.ID, token2) + if err != nil { + t.Fatalf("err: %v", err) } - if tokenOut2 != token2 { - t.Fatalf("Bad: %#v %#v", token2, tokenOut2) + + // Now wait for it to be re-enqueued + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalWaiting != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) + + delay := time.Now().Sub(start) + if delay < b.subsequentNackDelay { + t.Fatalf("bad: delay was %v; want at least %v", delay, b.subsequentNackDelay) } - // Ack with wrong token - err = b.Ack(eval.ID, "zip") - if err == nil { - t.Fatalf("should fail to ack") + // Dequeue should work again + out3, token3, err := b.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out3 != eval { + t.Fatalf("bad : %#v", out3) + } + if token3 == token || token3 == token2 { + t.Fatalf("should get a new token") } // Ack finally - err = b.Ack(eval.ID, token2) + err = b.Ack(eval.ID, token3) if err != nil { t.Fatalf("err: %v", err) } @@ -472,7 +645,7 @@ func TestEvalBroker_Dequeue_FIFO(t 
*testing.T) { func TestEvalBroker_Dequeue_Fairness(t *testing.T) { b := testBroker(t, 0) b.SetEnabled(true) - NUM := 100 + NUM := 1000 for i := 0; i < NUM; i++ { eval1 := mock.Eval() @@ -503,7 +676,7 @@ func TestEvalBroker_Dequeue_Fairness(t *testing.T) { // This will fail randomly at times. It is very hard to // test deterministically that its acting randomly. - if counter >= 25 || counter <= -25 { + if counter >= 250 || counter <= -250 { t.Fatalf("unlikely sequence: %d", counter) } } @@ -584,7 +757,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) { // Ensure we nack in a timely manner func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { - b := testBroker(t, 5*time.Millisecond) + b := testBroker(t, 50*time.Millisecond) b.SetEnabled(true) // Enqueue @@ -601,8 +774,8 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { t.Fatalf("bad: %v", out) } - // Reset in 2 milliseconds - time.Sleep(2 * time.Millisecond) + // Reset in 20 milliseconds + time.Sleep(20 * time.Millisecond) if err := b.OutstandingReset(out.ID, token); err != nil { t.Fatalf("err: %v", err) } @@ -618,13 +791,13 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { } // Check the nack timer - if diff := end.Sub(start); diff < 7*time.Millisecond { + if diff := end.Sub(start); diff < 75*time.Millisecond { t.Fatalf("bad: %#v", diff) } } func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { - b := testBroker(t, 5*time.Millisecond) + b := testBroker(t, 50*time.Millisecond) b.SetEnabled(true) // Enqueue @@ -641,14 +814,14 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { t.Fatalf("bad: %v", out) } - // Pause in 2 milliseconds - time.Sleep(2 * time.Millisecond) + // Pause in 20 milliseconds + time.Sleep(20 * time.Millisecond) if err := b.PauseNackTimeout(out.ID, token); err != nil { t.Fatalf("err: %v", err) } go func() { - time.Sleep(2 * time.Millisecond) + time.Sleep(20 * time.Millisecond) if err := b.ResumeNackTimeout(out.ID, token); err != nil { t.Fatalf("err: %v", err) } @@ 
-665,7 +838,7 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { } // Check the nack timer - if diff := end.Sub(start); diff < 9*time.Millisecond { + if diff := end.Sub(start); diff < 95*time.Millisecond { t.Fatalf("bad: %#v", diff) } } @@ -820,7 +993,7 @@ func TestEvalBroker_Wait(t *testing.T) { } // Let the wait elapse - time.Sleep(15 * time.Millisecond) + time.Sleep(20 * time.Millisecond) // Verify ready stats = b.Stats() @@ -976,14 +1149,20 @@ func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) { } // Check stats again as this should cause the re-enqueued one to be dropped - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if len(b.requeue) != 0 { - t.Fatalf("bad: %#v", b.requeue) - } + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if len(b.requeue) != 0 { + return false, fmt.Errorf("bad: %#v", b.requeue) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) } From 7e64557e18c19dc8cf63bbc1662cc9ffeb0180a7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 12 Apr 2017 14:47:59 -0700 Subject: [PATCH 2/5] Reaping failed evaluations creates follow up eval Create a follow up evaluation when reaping failed evaluations. This ensures that a job will still make eventual progress. 
--- nomad/leader.go | 19 +++++++++++++++++- nomad/leader_test.go | 42 ++++++++++++++++++++++++++++++++++++++-- nomad/structs/structs.go | 32 +++++++++++++++++++++++------- 3 files changed, 83 insertions(+), 10 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index e938bbee586..e9439d39a38 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/rand" "net" "time" @@ -19,6 +20,15 @@ const ( // unblocked to re-enter the scheduler. A failed evaluation occurs under // high contention when the schedulers plan does not make progress. failedEvalUnblockInterval = 1 * time.Minute + + // failedEvalFollowUpBaseLineWait is the minimum time waited before retrying + // a failed evaluation. + failedEvalFollowUpBaseLineWait = 1 * time.Minute + + // failedEvalFollowUpWaitRange defines the the range of additional time from + // the minimum in which to wait before retrying a failed evaluation. A value + // from this range should be selected using a uniform distribution. + failedEvalFollowUpWaitRange = 9 * time.Minute ) // monitorLeadership is used to monitor if we acquire or lose our role @@ -392,9 +402,16 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) { newEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit) s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", newEval) + // Create a follow-up evaluation that will be used to retry the + // scheduling for the job after the cluster is hopefully more stable + // due to the fairly large backoff. 
+ followupEvalWait := failedEvalFollowUpBaseLineWait + + time.Duration(rand.Int63n(int64(failedEvalFollowUpWaitRange))) + followupEval := eval.CreateFailedFollowUpEval(followupEvalWait) + // Update via Raft req := structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{newEval}, + Evals: []*structs.Evaluation{newEval, followupEval}, } if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil { s.logger.Printf("[ERR] nomad: failed to update failed eval %#v: %v", newEval, err) diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 70822eab144..1700cc4c037 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -508,7 +508,7 @@ func TestLeader_ReapFailedEval(t *testing.T) { } s1.evalBroker.Nack(out.ID, token) - // Wait updated evaluation + // Wait for an updated and followup evaluation state := s1.fsm.State() testutil.WaitForResult(func() (bool, error) { ws := memdb.NewWatchSet() @@ -516,7 +516,45 @@ func TestLeader_ReapFailedEval(t *testing.T) { if err != nil { return false, err } - return out != nil && out.Status == structs.EvalStatusFailed, nil + if out == nil { + return false, fmt.Errorf("expect original evaluation to exist") + } + if out.Status != structs.EvalStatusFailed { + return false, fmt.Errorf("got status %v; want %v", out.Status, structs.EvalStatusFailed) + } + + // See if there is a followup + evals, err := state.EvalsByJob(ws, eval.JobID) + if err != nil { + return false, err + } + + if l := len(evals); l != 2 { + return false, fmt.Errorf("got %d evals, want 2", l) + } + + for _, e := range evals { + if e.ID == eval.ID { + continue + } + + if e.Status != structs.EvalStatusPending { + return false, fmt.Errorf("follow up eval has status %v; want %v", + e.Status, structs.EvalStatusPending) + } + + if e.Wait < failedEvalFollowUpBaseLineWait || + e.Wait > failedEvalFollowUpBaseLineWait+failedEvalFollowUpWaitRange { + return false, fmt.Errorf("bad wait: %v", e.Wait) + } + + if e.TriggeredBy != 
structs.EvalTriggerFailedFollowUp { + return false, fmt.Errorf("follow up eval TriggeredBy %v; want %v", + e.TriggeredBy, structs.EvalTriggerFailedFollowUp) + } + } + + return true, nil }, func(err error) { t.Fatalf("err: %v", err) }) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 868767a1a1e..bb633ebede5 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3736,13 +3736,14 @@ const ( ) const ( - EvalTriggerJobRegister = "job-register" - EvalTriggerJobDeregister = "job-deregister" - EvalTriggerPeriodicJob = "periodic-job" - EvalTriggerNodeUpdate = "node-update" - EvalTriggerScheduled = "scheduled" - EvalTriggerRollingUpdate = "rolling-update" - EvalTriggerMaxPlans = "max-plan-attempts" + EvalTriggerJobRegister = "job-register" + EvalTriggerJobDeregister = "job-deregister" + EvalTriggerPeriodicJob = "periodic-job" + EvalTriggerNodeUpdate = "node-update" + EvalTriggerScheduled = "scheduled" + EvalTriggerRollingUpdate = "rolling-update" + EvalTriggerFailedFollowUp = "failed-eval-follow-up" + EvalTriggerMaxPlans = "max-plan-attempts" ) const ( @@ -3985,6 +3986,23 @@ func (e *Evaluation) CreateBlockedEval(classEligibility map[string]bool, escaped } } +// CreateFailedFollowUpEval creates a follow up evaluation when the current one +// has been marked as failed because it has hit the delivery limit and will not +// be retried by the eval_broker. +func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation { + return &Evaluation{ + ID: GenerateUUID(), + Priority: e.Priority, + Type: e.Type, + TriggeredBy: EvalTriggerFailedFollowUp, + JobID: e.JobID, + JobModifyIndex: e.JobModifyIndex, + Status: EvalStatusPending, + Wait: wait, + PreviousEval: e.ID, + } +} + // Plan is used to submit a commit plan for task allocations. These // are submitted to the leader which verifies that resources have // not been overcommitted before admiting the plan.
From 53f95408ebf0091ed2d675b1605ca63df1f1fa2c Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 14 Apr 2017 13:19:14 -0700 Subject: [PATCH 3/5] Easy feedback fixes --- nomad/leader.go | 12 ++++++------ nomad/structs/structs.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index e9439d39a38..503553966bb 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -397,10 +397,10 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) { } // Update the status to failed - newEval := eval.Copy() - newEval.Status = structs.EvalStatusFailed - newEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit) - s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", newEval) + updateEval := eval.Copy() + updateEval.Status = structs.EvalStatusFailed + updateEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit) + s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", updateEval) // Create a follow-up evaluation that will be used to retry the // scheduling for the job after the cluster is hopefully more stable @@ -411,10 +411,10 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) { // Update via Raft req := structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{newEval, followupEval}, + Evals: []*structs.Evaluation{updateEval, followupEval}, } if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil { - s.logger.Printf("[ERR] nomad: failed to update failed eval %#v: %v", newEval, err) + s.logger.Printf("[ERR] nomad: failed to update failed eval %#v and create a follow-up: %v", updateEval, err) continue } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index bb633ebede5..57376279ca1 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3742,7 +3742,7 @@ const ( EvalTriggerNodeUpdate =
"node-update" EvalTriggerScheduled = "scheduled" EvalTriggerRollingUpdate = "rolling-update" - EvalTriggerFailedFollowUp = "failed-eval-follow-up" + EvalTriggerFailedFollowUp = "failed-follow-up" EvalTriggerMaxPlans = "max-plan-attempts" ) From 3062b26ddd4d2e59824212809bdadcabf998597b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 14 Apr 2017 15:24:55 -0700 Subject: [PATCH 4/5] Push to configs --- nomad/config.go | 82 ++++++++++++++++++++++++++------------- nomad/eval_broker.go | 32 +++++++-------- nomad/eval_broker_test.go | 29 ++++++++++---- nomad/leader.go | 13 +------ nomad/leader_test.go | 4 +- nomad/server.go | 6 ++- 6 files changed, 99 insertions(+), 67 deletions(-) diff --git a/nomad/config.go b/nomad/config.go index e843ba89503..92e16b1e71f 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -158,6 +158,30 @@ type Config struct { // complete eventually fails out of the system. EvalDeliveryLimit int + // EvalNackInitialReenqueueDelay is the delay applied before reenqueuing a + // Nacked evaluation for the first time. This value should be small as the + // initial Nack can be due to a down machine and the eval should be retried + // quickly for liveliness. + EvalNackInitialReenqueueDelay time.Duration + + // EvalNackSubsequentReenqueueDelay is the delay applied before reenqueuing + // an evaluation that has been Nacked more than once. This delay is + // compounding after the first Nack. This value should be significantly + // longer than the initial delay as the purpose it serves is to apply + // back-pressure as evaluations are being Nacked either due to scheduler + // failures or because they are hitting their Nack timeout, both of which + // are signs of high server resource usage. + EvalNackSubsequentReenqueueDelay time.Duration + + // EvalFailedFollowupBaselineDelay is the minimum time waited before + // retrying a failed evaluation.
+ EvalFailedFollowupBaselineDelay time.Duration + + // EvalFailedFollowupDelayRange defines the range of additional time from + // the baseline in which to wait before retrying a failed evaluation. The + // additional delay is selected from this range randomly. + EvalFailedFollowupDelayRange time.Duration + // MinHeartbeatTTL is the minimum time between heartbeats. // This is used as a floor to prevent excessive updates. MinHeartbeatTTL time.Duration @@ -214,33 +238,37 @@ func DefaultConfig() *Config { } c := &Config{ - Region: DefaultRegion, - Datacenter: DefaultDC, - NodeName: hostname, - ProtocolVersion: ProtocolVersionMax, - RaftConfig: raft.DefaultConfig(), - RaftTimeout: 10 * time.Second, - LogOutput: os.Stderr, - RPCAddr: DefaultRPCAddr, - SerfConfig: serf.DefaultConfig(), - NumSchedulers: 1, - ReconcileInterval: 60 * time.Second, - EvalGCInterval: 5 * time.Minute, - EvalGCThreshold: 1 * time.Hour, - JobGCInterval: 5 * time.Minute, - JobGCThreshold: 4 * time.Hour, - NodeGCInterval: 5 * time.Minute, - NodeGCThreshold: 24 * time.Hour, - EvalNackTimeout: 60 * time.Second, - EvalDeliveryLimit: 3, - MinHeartbeatTTL: 10 * time.Second, - MaxHeartbeatsPerSecond: 50.0, - HeartbeatGrace: 10 * time.Second, - FailoverHeartbeatTTL: 300 * time.Second, - ConsulConfig: config.DefaultConsulConfig(), - VaultConfig: config.DefaultVaultConfig(), - RPCHoldTimeout: 5 * time.Second, - TLSConfig: &config.TLSConfig{}, + Region: DefaultRegion, + Datacenter: DefaultDC, + NodeName: hostname, + ProtocolVersion: ProtocolVersionMax, + RaftConfig: raft.DefaultConfig(), + RaftTimeout: 10 * time.Second, + LogOutput: os.Stderr, + RPCAddr: DefaultRPCAddr, + SerfConfig: serf.DefaultConfig(), + NumSchedulers: 1, + ReconcileInterval: 60 * time.Second, + EvalGCInterval: 5 * time.Minute, + EvalGCThreshold: 1 * time.Hour, + JobGCInterval: 5 * time.Minute, + JobGCThreshold: 4 * time.Hour, + NodeGCInterval: 5 * time.Minute, + NodeGCThreshold: 24 * time.Hour, + EvalNackTimeout: 60 * time.Second, + 
EvalDeliveryLimit: 3, + EvalNackInitialReenqueueDelay: 1 * time.Second, + EvalNackSubsequentReenqueueDelay: 20 * time.Second, + EvalFailedFollowupBaselineDelay: 1 * time.Minute, + EvalFailedFollowupDelayRange: 5 * time.Minute, + MinHeartbeatTTL: 10 * time.Second, + MaxHeartbeatsPerSecond: 50.0, + HeartbeatGrace: 10 * time.Second, + FailoverHeartbeatTTL: 300 * time.Second, + ConsulConfig: config.DefaultConsulConfig(), + VaultConfig: config.DefaultVaultConfig(), + RPCHoldTimeout: 5 * time.Second, + TLSConfig: &config.TLSConfig{}, } // Enable all known schedulers by default diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 3733a28849d..92fd0bc8a70 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -28,14 +28,6 @@ var ( // ErrNackTimeoutReached is returned if an expired evaluation is reset ErrNackTimeoutReached = errors.New("evaluation nack timeout reached") - - // initialNackReenqueueDelay is the delay applied before re-enqueuing a - // Nacked evaluation for the first time - initialNackReenqueueDelay = time.Second - - // subsequentNackReenqueueDelay is a compounding delay applied on each - // subsequent Nack for an evaluation. - subsequentNackReenqueueDelay = 20 * time.Second ) // EvalBroker is used to manage brokering of evaluations. When an evaluation is @@ -84,8 +76,13 @@ type EvalBroker struct { // timeWait has evaluations that are waiting for time to elapse timeWait map[string]*time.Timer - // Nack delays are defaulted and are only made available for testing - initialNackDelay time.Duration + // initialNackDelay is the delay applied before reenqueuing a + // Nacked evaluation for the first time. + initialNackDelay time.Duration + + // subsequentNackDelay is the delay applied before reenqueuing + // an evaluation that has been Nacked more than once. This delay is + // compounding after the first Nack. 
subsequentNackDelay time.Duration l sync.RWMutex @@ -107,7 +104,7 @@ type PendingEvaluations []*structs.Evaluation // with the timeout used for messages that are not acknowledged before we // assume a Nack and attempt to redeliver as well as the deliveryLimit // which prevents a failing eval from being endlessly delivered. -func NewEvalBroker(timeout time.Duration, deliveryLimit int) (*EvalBroker, error) { +func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) { if timeout < 0 { return nil, fmt.Errorf("timeout cannot be negative") } @@ -124,8 +121,8 @@ func NewEvalBroker(timeout time.Duration, deliveryLimit int) (*EvalBroker, error waiting: make(map[string]chan struct{}), requeue: make(map[string]*structs.Evaluation), timeWait: make(map[string]*time.Timer), - initialNackDelay: initialNackReenqueueDelay, - subsequentNackDelay: subsequentNackReenqueueDelay, + initialNackDelay: initialNackDelay, + subsequentNackDelay: subsequentNackDelay, } b.stats.ByScheduler = make(map[string]*SchedulerStats) return b, nil @@ -587,18 +584,15 @@ func (b *EvalBroker) Nack(evalID, token string) error { // nackReenqueueDelay is used to determine the delay that should be applied on // the evaluation given the number of previous attempts func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration { - var delay time.Duration - switch { case prevDequeues <= 0: + return 0 case prevDequeues == 1: - delay = b.initialNackDelay + return b.initialNackDelay default: // For each subsequent nack compound a delay - delay = time.Duration(prevDequeues-1) * b.subsequentNackDelay + return time.Duration(prevDequeues-1) * b.subsequentNackDelay } - - return delay } // PauseNackTimeout is used to pause the Nack timeout for an eval that is making diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index c263fb1ef84..2c1036ef7ac 100644 --- a/nomad/eval_broker_test.go +++ 
b/nomad/eval_broker_test.go @@ -17,19 +17,34 @@ var ( } ) +func testBrokerConfig() *Config { + config := DefaultConfig() + + // Tune the Nack timeout + config.EvalNackTimeout = 5 * time.Second + + // Tune the Nack delay + config.EvalNackInitialReenqueueDelay = 5 * time.Millisecond + config.EvalNackSubsequentReenqueueDelay = 50 * time.Millisecond + return config +} + func testBroker(t *testing.T, timeout time.Duration) *EvalBroker { - if timeout == 0 { - timeout = 5 * time.Second + config := testBrokerConfig() + + if timeout != 0 { + config.EvalNackTimeout = timeout } - b, err := NewEvalBroker(timeout, 3) + + return testBrokerFromConfig(t, config) +} + +func testBrokerFromConfig(t *testing.T, c *Config) *EvalBroker { + b, err := NewEvalBroker(c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3) if err != nil { t.Fatalf("err: %v", err) } - // Tune the Nack delay - b.initialNackDelay = 5 * time.Millisecond - b.subsequentNackDelay = 50 * time.Millisecond - return b } diff --git a/nomad/leader.go b/nomad/leader.go index 503553966bb..275f84de9ac 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -20,15 +20,6 @@ const ( // unblocked to re-enter the scheduler. A failed evaluation occurs under // high contention when the schedulers plan does not make progress. failedEvalUnblockInterval = 1 * time.Minute - - // failedEvalFollowUpBaseLineWait is the minimum time waited before retrying - // a failed evaluation. - failedEvalFollowUpBaseLineWait = 1 * time.Minute - - // failedEvalFollowUpWaitRange defines the the range of additional time from - // the minimum in which to wait before retrying a failed evaluation. A value - // from this range should be selected using a uniform distribution. 
- failedEvalFollowUpWaitRange = 9 * time.Minute ) // monitorLeadership is used to monitor if we acquire or lose our role @@ -405,8 +396,8 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) { // Create a follow-up evaluation that will be used to retry the // scheduling for the job after the cluster is hopefully more stable // due to the fairly large backoff. - followupEvalWait := failedEvalFollowUpBaseLineWait + - time.Duration(rand.Int63n(int64(failedEvalFollowUpWaitRange))) + followupEvalWait := s.config.EvalFailedFollowupBaselineDelay + + time.Duration(rand.Int63n(int64(s.config.EvalFailedFollowupDelayRange))) followupEval := eval.CreateFailedFollowUpEval(followupEvalWait) // Update via Raft diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 1700cc4c037..eda24c2b631 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -543,8 +543,8 @@ func TestLeader_ReapFailedEval(t *testing.T) { e.Status, structs.EvalStatusPending) } - if e.Wait < failedEvalFollowUpBaseLineWait || - e.Wait > failedEvalFollowUpBaseLineWait+failedEvalFollowUpWaitRange { + if e.Wait < s1.config.EvalFailedFollowupBaselineDelay || + e.Wait > s1.config.EvalFailedFollowupBaselineDelay+s1.config.EvalFailedFollowupDelayRange { return false, fmt.Errorf("bad wait: %v", e.Wait) } diff --git a/nomad/server.go b/nomad/server.go index 8089f151c01..efed68cebdf 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -174,7 +174,11 @@ func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger) } // Create an eval broker - evalBroker, err := NewEvalBroker(config.EvalNackTimeout, config.EvalDeliveryLimit) + evalBroker, err := NewEvalBroker( + config.EvalNackTimeout, + config.EvalNackInitialReenqueueDelay, + config.EvalNackSubsequentReenqueueDelay, + config.EvalDeliveryLimit) if err != nil { return nil, err } From d3807db78fc67fd82a8d5c9507e484e0773a4be4 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 14 Apr 2017 15:26:54 -0700 Subject: [PATCH 5/5] 
NewEvalBroker comment --- nomad/eval_broker.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 92fd0bc8a70..4835ee07dc4 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -103,7 +103,10 @@ type PendingEvaluations []*structs.Evaluation // NewEvalBroker creates a new evaluation broker. This is parameterized // with the timeout used for messages that are not acknowledged before we // assume a Nack and attempt to redeliver as well as the deliveryLimit -// which prevents a failing eval from being endlessly delivered. +// which prevents a failing eval from being endlessly delivered. The +// initialNackDelay is the delay before making a Nacked evaluation available +// again for the first Nack and subsequentNackDelay is the compounding delay +// after the first Nack. func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) { if timeout < 0 { return nil, fmt.Errorf("timeout cannot be negative")