From 3fc57654018a2999e1b4afab0a2729449ec82c82 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 1 Mar 2019 15:23:39 -0800 Subject: [PATCH 1/4] nomad: compare current eval when setting WaitIndex Consider currently dequeued Evaluation's ModifyIndex when determining its WaitIndex. Normally the Evaluation itself would already be in the state store snapshot used to determine the WaitIndex. However, since the FSM applies Raft messages to the state store concurrently with Dequeueing, it's possible the currently dequeued Evaluation won't yet exist in the state store snapshot used by JobsForEval. This can be solved by always considering the current eval's modify index and using it if it is greater than all of the evals returned by the state store. --- nomad/eval_endpoint.go | 12 ++++++++-- nomad/eval_endpoint_test.go | 47 ++++++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index ff30eaba836..0f1595c0b39 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -105,7 +105,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, // Provide the output if any if eval != nil { // Get the index that the worker should wait until before scheduling. - waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID) + waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID, eval.ModifyIndex) if err != nil { var mErr multierror.Error multierror.Append(&mErr, err) @@ -133,7 +133,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, // invoking the scheduler. The index should be the highest modify index of any // evaluation for the job. This prevents scheduling races for the same job when // there are blocked evaluations. -func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) { +func (e *Eval) getWaitIndex(namespace, job string, curIndex uint64) (uint64, error) { snap, err := e.srv.State().Snapshot() if err != nil { return 0, err @@ -151,6 +151,14 @@ func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) { } } + // Since dequeueing evals is concurrent with applying raft messages to + // the state store, manually compare the currently dequeued eval's + // index against max in case it wasn't in the snapshot used by + // EvalsByJob yet. + if max < curIndex { + max = curIndex + } + return max, nil } diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 43662ed8c73..6d32086b558 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestEvalEndpoint_GetEval(t *testing.T) { @@ -239,7 +240,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { } } -func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) { +// TestEvalEndpoint_Dequeue_WaitIndex_Snapshot asserts that an eval's wait +// index will be equal to the highest eval modify index in the state store. +func TestEvalEndpoint_Dequeue_WaitIndex_Snapshot(t *testing.T) { t.Parallel() s1 := TestServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -285,6 +288,48 @@ func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) { } } +// TestEvalEndpoint_Dequeue_WaitIndex_Eval asserts that an eval's wait index +// will be its own modify index if its modify index is greater than all of the +// indexes in the state store. This can happen if Dequeue receives an eval that +// has not yet been applied from the raft log to the local node's state store. +func TestEvalEndpoint_Dequeue_WaitIndex_Eval(t *testing.T) { + t.Parallel() + s1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request but only upsert 1 into the state store + eval1 := mock.Eval() + eval2 := mock.Eval() + eval2.JobID = eval1.JobID + s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}) + eval2.ModifyIndex = 1001 + s1.evalBroker.Enqueue(eval2) + + // Dequeue the eval + get := &structs.EvalDequeueRequest{ + Schedulers: defaultSched, + SchedulerVersion: scheduler.SchedulerVersion, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.EvalDequeueResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp)) + require.Equal(t, eval2, resp.Eval) + + // Ensure outstanding + token, ok := s1.evalBroker.Outstanding(eval2.ID) + require.True(t, ok) + require.Equal(t, resp.Token, token) + + // WaitIndex should be equal to the max ModifyIndex - even when that + // modify index is of the dequeued eval which has yet to be applied to + // the state store. + require.Equal(t, eval2.ModifyIndex, resp.WaitIndex) +} + func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) { // test enqueuing an eval, updating a plan result for the same eval and de-queueing the eval t.Parallel() From b47b1f1cda12892bcf655e61d6e96e9522bf0552 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 4 Mar 2019 13:44:14 -0800 Subject: [PATCH 2/4] nomad: simplify code and improve parameter name --- nomad/eval_endpoint.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 0f1595c0b39..6e6c0d80997 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -133,7 +133,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, // invoking the scheduler. The index should be the highest modify index of any // evaluation for the job. This prevents scheduling races for the same job when // there are blocked evaluations. -func (e *Eval) getWaitIndex(namespace, job string, curIndex uint64) (uint64, error) { +func (e *Eval) getWaitIndex(namespace, job string, evalModifyIndex uint64) (uint64, error) { snap, err := e.srv.State().Snapshot() if err != nil { return 0, err @@ -144,21 +144,16 @@ func (e *Eval) getWaitIndex(namespace, job string, curIndex uint64) (uint64, err return 0, err } - var max uint64 + // Since dequeueing evals is concurrent with applying raft messages to + // the state store, initialize to the currently dequeued eval's index + // in case it isn't in the snapshot used by EvalsByJob yet. + max := evalModifyIndex for _, eval := range evals { if max < eval.ModifyIndex { max = eval.ModifyIndex } } - // Since dequeueing evals is concurrent with applying raft messages to - // the state store, manually compare the currently dequeued eval's - // index against max in case it wasn't in the snapshot used by - // EvalsByJob yet. - if max < curIndex { - max = curIndex - } - return max, nil } From 3d8683b0db338a3fcd25d12325acf8c16fdabf01 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 5 Mar 2019 15:19:07 -0800 Subject: [PATCH 3/4] Update nomad/eval_endpoint.go Co-Authored-By: schmichael --- nomad/eval_endpoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 6e6c0d80997..fa7b23dcd4e 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -144,7 +144,7 @@ func (e *Eval) getWaitIndex(namespace, job string, evalModifyIndex uint64) (uint return 0, err } - // Since dequeueing evals is concurrent with applying raft messages to + // Since dequeueing evals is concurrent with applying Raft messages to // the state store, initialize to the currently dequeued eval's index // in case it isn't in the snapshot used by EvalsByJob yet. max := evalModifyIndex From 6c8839a859036904b32c5b9886ac24c743d63199 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 5 Mar 2019 15:19:15 -0800 Subject: [PATCH 4/4] Update nomad/eval_endpoint_test.go Co-Authored-By: schmichael --- nomad/eval_endpoint_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 6d32086b558..6051d4040e5 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -291,7 +291,7 @@ func TestEvalEndpoint_Dequeue_WaitIndex_Snapshot(t *testing.T) { // TestEvalEndpoint_Dequeue_WaitIndex_Eval asserts that an eval's wait index // will be its own modify index if its modify index is greater than all of the // indexes in the state store. This can happen if Dequeue receives an eval that -// has not yet been applied from the raft log to the local node's state store. +// has not yet been applied from the Raft log to the local node's state store. func TestEvalEndpoint_Dequeue_WaitIndex_Eval(t *testing.T) { t.Parallel() s1 := TestServer(t, func(c *Config) {