From 4b4e376d7e1af13d1c5cd42065b95bb6db526647 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 13 Sep 2017 13:47:01 -0700 Subject: [PATCH 1/3] Worker waits til max ModifyIndex across EvalsByJob This PR fixes a scheduling race condition in which the plan results from one invocation of the scheduler were not being considered by the next since the Worker was not waiting for the correct index. Fixes https://github.com/hashicorp/nomad/issues/3198 --- nomad/eval_endpoint.go | 32 ++++++++++++++ nomad/eval_endpoint_test.go | 50 +++++++++++++++++++++ nomad/structs/structs.go | 20 +++++++++ nomad/worker.go | 13 +++--- nomad/worker_test.go | 86 ++++++++++++++++++++++++++++++++++--- 5 files changed, 190 insertions(+), 11 deletions(-) diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 7b50c313a60..0a00992e4f1 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -92,8 +92,15 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, // Provide the output if any if eval != nil { + // Get the index that the worker should wait until before scheduling. + waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID) + if err != nil { + return err + } + reply.Eval = eval reply.Token = token + reply.WaitIndex = waitIndex } // Set the query response @@ -101,6 +108,31 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, return nil } +// getWaitIndex returns the wait index that should be used by the worker before +// invoking the scheduler. The index should be the highest modify index of any +// evaluation for the job. This prevents scheduling races for the same job when +// there are blocked evaluations. +func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) { + snap, err := e.srv.State().Snapshot() + if err != nil { + return 0, err + } + + evals, err := snap.EvalsByJob(nil, namespace, job) + if err != nil { + return 0, err + } + + var max uint64 + for _, eval := range evals { + if max < eval.ModifyIndex { + max = eval.ModifyIndex + } + } + + return max, nil +} + // Ack is used to acknowledge completion of a dequeued evaluation func (e *Eval) Ack(args *structs.EvalAckRequest, reply *structs.GenericResponse) error { diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 3330eda4355..9cf1b21a1df 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -170,6 +170,56 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { if token != resp.Token { t.Fatalf("bad token: %#v %#v", token, resp.Token) } + + if resp.WaitIndex != eval1.ModifyIndex { + t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, eval1.ModifyIndex) + } +} + +func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) { + t.Parallel() + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + eval1 := mock.Eval() + eval2 := mock.Eval() + eval2.JobID = eval1.JobID + s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}) + s1.evalBroker.Enqueue(eval1) + s1.fsm.State().UpsertEvals(1001, []*structs.Evaluation{eval2}) + + // Dequeue the eval + get := &structs.EvalDequeueRequest{ + Schedulers: defaultSched, + SchedulerVersion: scheduler.SchedulerVersion, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.EvalDequeueResponse + if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(eval1, resp.Eval) { + t.Fatalf("bad: %v %v", eval1, resp.Eval) + } + + // Ensure outstanding + token, ok := s1.evalBroker.Outstanding(eval1.ID) + if !ok { + t.Fatalf("should be outstanding") + } + if token != resp.Token { + t.Fatalf("bad token: %#v %#v", token, resp.Token) + } + + if resp.WaitIndex != 1001 { + t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, 1001) + } } func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 832ee476440..578d93dbac8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -932,9 +932,29 @@ type SingleEvalResponse struct { type EvalDequeueResponse struct { Eval *Evaluation Token string + + // WaitIndex is the Raft index the worker should wait until invoking the + // scheduler. + WaitIndex uint64 + QueryMeta } +// GetWaitIndex is used to retrieve the Raft index in which state should be at +// or beyond before invoking the scheduler. +func (e *EvalDequeueResponse) GetWaitIndex() uint64 { + // Prefer the wait index sent. This will be populated on all responses from + // 0.7.0 and above + if e.WaitIndex != 0 { + return e.WaitIndex + } else if e.Eval != nil { + return e.Eval.ModifyIndex + } + + // This should never happen + return 1 +} + // PlanResponse is used to return from a PlanRequest type PlanResponse struct { Result *PlanResult diff --git a/nomad/worker.go b/nomad/worker.go index 0a6b66cc328..76fb4eefacf 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -106,7 +106,7 @@ func (w *Worker) checkPaused() { func (w *Worker) run() { for { // Dequeue a pending evaluation - eval, token, shutdown := w.dequeueEvaluation(dequeueTimeout) + eval, token, waitIndex, shutdown := w.dequeueEvaluation(dequeueTimeout) if shutdown { return } @@ -118,7 +118,7 @@ func (w *Worker) run() { } // Wait for the raft log to catchup to the evaluation - if err := w.waitForIndex(eval.ModifyIndex, raftSyncLimit); err != nil { + if err := w.waitForIndex(waitIndex, raftSyncLimit); err != nil { w.sendAck(eval.ID, token, false) continue } @@ -136,7 +136,8 @@ func (w *Worker) run() { // dequeueEvaluation is used to fetch the next ready evaluation. // This blocks until an evaluation is available or a timeout is reached. -func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) { +func (w *Worker) dequeueEvaluation(timeout time.Duration) ( + eval *structs.Evaluation, token string, waitIndex uint64, shutdown bool) { // Setup the request req := structs.EvalDequeueRequest{ Schedulers: w.srv.config.EnabledSchedulers, @@ -170,7 +171,7 @@ REQ: } if w.backoffErr(base, limit) { - return nil, "", true + return nil, "", 0, true } goto REQ } @@ -179,12 +180,12 @@ REQ: // Check if we got a response if resp.Eval != nil { w.logger.Printf("[DEBUG] worker: dequeued evaluation %s", resp.Eval.ID) - return resp.Eval, resp.Token, false + return resp.Eval, resp.Token, resp.GetWaitIndex(), false } // Check for potential shutdown if w.srv.IsShutdown() { - return nil, "", true + return nil, "", 0, true } goto REQ } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 3e716f31dc0..567a3132153 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -60,13 +60,16 @@ func TestWorker_dequeueEvaluation(t *testing.T) { w := &Worker{srv: s1, logger: s1.logger} // Attempt dequeue - eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) if shutdown { t.Fatalf("should not shutdown") } if token == "" { t.Fatalf("should get token") } + if waitIndex != eval1.ModifyIndex { + t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex) + } // Ensure we get a sane eval if !reflect.DeepEqual(eval, eval1) { @@ -74,6 +77,76 @@ func TestWorker_dequeueEvaluation(t *testing.T) { } } +// Test that the worker picks up the correct wait index when there are multiple +// evals for the same job. +func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { + t.Parallel() + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.EnabledSchedulers = []string{structs.JobTypeService} + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Create the evaluation + eval1 := mock.Eval() + eval2 := mock.Eval() + eval2.JobID = eval1.JobID + + // Insert the evals into the state store + if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}); err != nil { + t.Fatal(err) + } + + s1.evalBroker.Enqueue(eval1) + s1.evalBroker.Enqueue(eval2) + + // Create a worker + w := &Worker{srv: s1, logger: s1.logger} + + // Attempt dequeue + eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + if shutdown { + t.Fatalf("should not shutdown") + } + if token == "" { + t.Fatalf("should get token") + } + if waitIndex != eval1.ModifyIndex { + t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex) + } + + // Ensure we get a sane eval + if !reflect.DeepEqual(eval, eval1) { + t.Fatalf("bad: %#v %#v", eval, eval1) + } + + // Update the modify index of the first eval + if err := s1.fsm.State().UpsertEvals(2000, []*structs.Evaluation{eval1}); err != nil { + t.Fatal(err) + } + + // Send the Ack + w.sendAck(eval1.ID, token, true) + + // Attempt second dequeue + eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond) + if shutdown { + t.Fatalf("should not shutdown") + } + if token == "" { + t.Fatalf("should get token") + } + if waitIndex != 2000 { + t.Fatalf("bad wait index; got %d; want 2000", eval2.ModifyIndex) + } + + // Ensure we get a sane eval + if !reflect.DeepEqual(eval, eval2) { + t.Fatalf("bad: %#v %#v", eval, eval2) + } +} + func TestWorker_dequeueEvaluation_paused(t *testing.T) { t.Parallel() s1 := testServer(t, func(c *Config) { @@ -101,7 +174,7 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) { // Attempt dequeue start := time.Now() - eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) if diff := time.Since(start); diff < 100*time.Millisecond { t.Fatalf("should have paused: %v", diff) } @@ -111,6 +184,9 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) { if token == "" { t.Fatalf("should get token") } + if waitIndex != eval1.ModifyIndex { + t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex) + } // Ensure we get a sane eval if !reflect.DeepEqual(eval, eval1) { @@ -136,7 +212,7 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) { }() // Attempt dequeue - eval, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + eval, _, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond) if !shutdown { t.Fatalf("should not shutdown") } @@ -164,7 +240,7 @@ func TestWorker_sendAck(t *testing.T) { w := &Worker{srv: s1, logger: s1.logger} // Attempt dequeue - eval, token, _ := w.dequeueEvaluation(10 * time.Millisecond) + eval, token, _, _ := w.dequeueEvaluation(10 * time.Millisecond) // Check the depth is 0, 1 unacked stats := s1.evalBroker.Stats() @@ -182,7 +258,7 @@ func TestWorker_sendAck(t *testing.T) { } // Attempt dequeue - eval, token, _ = w.dequeueEvaluation(10 * time.Millisecond) + eval, token, _, _ = w.dequeueEvaluation(10 * time.Millisecond) // Send the Ack w.sendAck(eval.ID, token, true) From c3cca843b5cc80cf7a535d13e409f8aa3b459e18 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 14 Sep 2017 14:27:00 -0700 Subject: [PATCH 2/3] Address feedback --- nomad/eval_endpoint.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 0a00992e4f1..9b5cd3cdce9 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -6,6 +6,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/go-memdb" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -95,7 +96,16 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, // Get the index that the worker should wait until before scheduling. waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID) if err != nil { - return err + var mErr multierror.Error + multierror.Append(&mErr, err) + + // We have dequeued the evaluation but won't be returning it to the + // worker so Nack the eval. + if err := e.srv.evalBroker.Nack(eval.ID, token); err != nil { + multierror.Append(&mErr, err) + } + + return &mErr } reply.Eval = eval From eae982d9f22ef5a159d590f7be00f73e826bd4a9 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 14 Sep 2017 14:28:04 -0700 Subject: [PATCH 3/3] Changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e253c5737af..cb7f45e49f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ IMPROVEMENTS: BUG FIXES: * core: *Fix restoration of stopped periodic jobs [GH-3201] + * core: Fix a race condition in which scheduling results from one invocation of + the scheduler wouldn't be considered by the next for the same job [GH-3206] * api: Sort /v1/agent/servers output so that output of Consul checks does not change [GH-3214] * api: Fix search handling of jobs with more than four hyphens and case were