diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go
index 97c76eafaec..6e5a3ed5991 100644
--- a/nomad/eval_broker.go
+++ b/nomad/eval_broker.go
@@ -510,6 +510,46 @@ func (b *EvalBroker) Nack(evalID, token string) error {
 	return nil
 }
 
+// PauseNackTimeout is used to pause the Nack timeout for an eval that is making
+// progress but is in a potentially unbounded operation such as the plan queue.
+// It returns ErrNotOutstanding if the eval is not in the unack set,
+// ErrTokenMismatch if the token does not match the outstanding one, and
+// ErrNackTimeoutReached if the timer could not be stopped.
+func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {
+	b.l.RLock()
+	defer b.l.RUnlock()
+	unack, ok := b.unack[evalID]
+	if !ok {
+		return ErrNotOutstanding
+	}
+	if unack.Token != token {
+		return ErrTokenMismatch
+	}
+	if !unack.NackTimer.Stop() {
+		return ErrNackTimeoutReached
+	}
+	return nil
+}
+
+// ResumeNackTimeout is used to resume the Nack timeout for an eval that was
+// paused. It should be resumed after leaving an unbounded operation. The timer
+// is reset to the broker's full nackTimeout rather than the time remaining
+// when it was paused. It returns ErrNotOutstanding or ErrTokenMismatch if the
+// eval/token pair is not currently outstanding.
+func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
+	b.l.Lock()
+	defer b.l.Unlock()
+	unack, ok := b.unack[evalID]
+	if !ok {
+		return ErrNotOutstanding
+	}
+	if unack.Token != token {
+		return ErrTokenMismatch
+	}
+	unack.NackTimer.Reset(b.nackTimeout)
+	return nil
+}
+
 // Flush is used to clear the state of the broker
 func (b *EvalBroker) Flush() {
 	b.l.Lock()
diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go
index b918b4e726c..b81fbd94426 100644
--- a/nomad/eval_broker_test.go
+++ b/nomad/eval_broker_test.go
@@ -656,6 +656,59 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
 	}
 }
 
+func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
+	b := testBroker(t, 5*time.Millisecond)
+	b.SetEnabled(true)
+
+	// Enqueue
+	eval := mock.Eval()
+	err := b.Enqueue(eval)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Dequeue
+	out, token, err := b.Dequeue(defaultSched, time.Second)
+	start := time.Now()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out != eval {
+		t.Fatalf("bad: %v", out)
+	}
+
+	// Pause in 2 milliseconds
+	time.Sleep(2 * time.Millisecond)
+	if err := b.PauseNackTimeout(out.ID, token); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Resume in another 2 milliseconds. Errorf is used because Fatalf must
+	// only be called from the goroutine running the test.
+	go func() {
+		time.Sleep(2 * time.Millisecond)
+		if err := b.ResumeNackTimeout(out.ID, token); err != nil {
+			t.Errorf("err: %v", err)
+		}
+	}()
+
+	// Dequeue, should block until the timer is resumed
+	out, _, err = b.Dequeue(defaultSched, time.Second)
+	end := time.Now()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out != eval {
+		t.Fatalf("bad: %v", out)
+	}
+
+	// Check the nack timer: 2ms before the pause, 2ms paused, then a full
+	// 5ms timeout after the resume means at least 9ms must have elapsed.
+	if diff := end.Sub(start); diff < 9*time.Millisecond {
+		t.Fatalf("bad: %#v", diff)
+	}
+}
+
 func TestEvalBroker_DeliveryLimit(t *testing.T) {
 	b := testBroker(t, 0)
 	b.SetEnabled(true)
diff --git a/nomad/plan_endpoint.go b/nomad/plan_endpoint.go
index cb9b798e82c..556c016df2a 100644
--- a/nomad/plan_endpoint.go
+++ b/nomad/plan_endpoint.go
@@ -19,8 +19,19 @@ func (p *Plan) Submit(args *structs.PlanRequest, reply *structs.PlanResponse) er
 	}
 	defer metrics.MeasureSince([]string{"nomad", "plan", "submit"}, time.Now())
 
+	// Pause the Nack timer for the eval as it is making progress as long as it
+	// is in the plan queue. We resume immediately after we get a result to
+	// handle the case that the receiving worker dies.
+	plan := args.Plan
+	id := plan.EvalID
+	token := plan.EvalToken
+	if err := p.srv.evalBroker.PauseNackTimeout(id, token); err != nil {
+		return err
+	}
+	defer p.srv.evalBroker.ResumeNackTimeout(id, token)
+
 	// Submit the plan to the queue
-	future, err := p.srv.planQueue.Enqueue(args.Plan)
+	future, err := p.srv.planQueue.Enqueue(plan)
 	if err != nil {
 		return err
 	}