diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index ff30eaba836..fa7b23dcd4e 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -105,7 +105,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, // Provide the output if any if eval != nil { // Get the index that the worker should wait until before scheduling. - waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID) + waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID, eval.ModifyIndex) if err != nil { var mErr multierror.Error multierror.Append(&mErr, err) @@ -133,7 +133,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, // invoking the scheduler. The index should be the highest modify index of any // evaluation for the job. This prevents scheduling races for the same job when // there are blocked evaluations. -func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) { +func (e *Eval) getWaitIndex(namespace, job string, evalModifyIndex uint64) (uint64, error) { snap, err := e.srv.State().Snapshot() if err != nil { return 0, err @@ -144,7 +144,10 @@ func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) { return 0, err } - var max uint64 + // Since dequeueing evals is concurrent with applying Raft messages to + // the state store, initialize to the currently dequeued eval's index + // in case it isn't in the snapshot used by EvalsByJob yet. 
+ max := evalModifyIndex for _, eval := range evals { if max < eval.ModifyIndex { max = eval.ModifyIndex diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 43662ed8c73..6051d4040e5 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestEvalEndpoint_GetEval(t *testing.T) { @@ -239,7 +240,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { } } -func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) { +// TestEvalEndpoint_Dequeue_WaitIndex_Snapshot asserts that an eval's wait +// index will be equal to the highest eval modify index in the state store. +func TestEvalEndpoint_Dequeue_WaitIndex_Snapshot(t *testing.T) { t.Parallel() s1 := TestServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -285,6 +288,48 @@ func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) { } } +// TestEvalEndpoint_Dequeue_WaitIndex_Eval asserts that an eval's wait index +// will be its own modify index if its modify index is greater than all of the +// indexes in the state store. This can happen if Dequeue receives an eval that +// has not yet been applied from the Raft log to the local node's state store. 
+func TestEvalEndpoint_Dequeue_WaitIndex_Eval(t *testing.T) {
+	t.Parallel()
+	s1 := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0 // Prevent automatic dequeue
+	})
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create two evals for the same job but only upsert eval1 into the state store
+	eval1 := mock.Eval()
+	eval2 := mock.Eval()
+	eval2.JobID = eval1.JobID
+	require.NoError(t, s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})) // setup must not fail silently
+	eval2.ModifyIndex = 1001 // higher than the index eval1 was upserted at
+	s1.evalBroker.Enqueue(eval2) // eval2 goes to the broker only, never upserted here
+
+	// Dequeue the eval
+	get := &structs.EvalDequeueRequest{
+		Schedulers:       defaultSched,
+		SchedulerVersion: scheduler.SchedulerVersion,
+		WriteRequest:     structs.WriteRequest{Region: "global"},
+	}
+	var resp structs.EvalDequeueResponse
+	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp))
+	require.Equal(t, eval2, resp.Eval)
+
+	// Ensure the dequeued eval is outstanding under the token returned by Dequeue
+	token, ok := s1.evalBroker.Outstanding(eval2.ID)
+	require.True(t, ok)
+	require.Equal(t, resp.Token, token)
+
+	// WaitIndex should be equal to the max ModifyIndex - even when that
+	// modify index is of the dequeued eval which has yet to be applied to
+	// the state store.
+	require.Equal(t, eval2.ModifyIndex, resp.WaitIndex)
+}
+
 func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {
 	// test enqueuing an eval, updating a plan result for the same eval and de-queueing the eval
 	t.Parallel()