Skip to content

Commit

Permalink
Merge pull request #5381 from hashicorp/b-max-eval-wait-index
Browse files Browse the repository at this point in the history
nomad: compare current eval when setting WaitIndex
  • Loading branch information
schmichael authored Mar 6, 2019
2 parents d75cf58 + 6c8839a commit 76d2eee
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
9 changes: 6 additions & 3 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
// Provide the output if any
if eval != nil {
// Get the index that the worker should wait until before scheduling.
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID)
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID, eval.ModifyIndex)
if err != nil {
var mErr multierror.Error
multierror.Append(&mErr, err)
Expand Down Expand Up @@ -133,7 +133,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
// invoking the scheduler. The index should be the highest modify index of any
// evaluation for the job. This prevents scheduling races for the same job when
// there are blocked evaluations.
func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
func (e *Eval) getWaitIndex(namespace, job string, evalModifyIndex uint64) (uint64, error) {
snap, err := e.srv.State().Snapshot()
if err != nil {
return 0, err
Expand All @@ -144,7 +144,10 @@ func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
return 0, err
}

var max uint64
// Since dequeueing evals is concurrent with applying Raft messages to
// the state store, initialize to the currently dequeued eval's index
// in case it isn't in the snapshot used by EvalsByJob yet.
max := evalModifyIndex
for _, eval := range evals {
if max < eval.ModifyIndex {
max = eval.ModifyIndex
Expand Down
47 changes: 46 additions & 1 deletion nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestEvalEndpoint_GetEval(t *testing.T) {
Expand Down Expand Up @@ -239,7 +240,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
}
}

func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
// TestEvalEndpoint_Dequeue_WaitIndex_Snapshot asserts that an eval's wait
// index will be equal to the highest eval modify index in the state store.
func TestEvalEndpoint_Dequeue_WaitIndex_Snapshot(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
Expand Down Expand Up @@ -285,6 +288,48 @@ func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
}
}

// TestEvalEndpoint_Dequeue_WaitIndex_Eval asserts that an eval's wait index
// will be its own modify index if its modify index is greater than all of the
// indexes in the state store. This can happen if Dequeue receives an eval that
// has not yet been applied from the Raft log to the local node's state store.
func TestEvalEndpoint_Dequeue_WaitIndex_Eval(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request but only upsert 1 into the state store
eval1 := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = eval1.JobID
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
eval2.ModifyIndex = 1001
s1.evalBroker.Enqueue(eval2)

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp))
require.Equal(t, eval2, resp.Eval)

// Ensure outstanding
token, ok := s1.evalBroker.Outstanding(eval2.ID)
require.True(t, ok)
require.Equal(t, resp.Token, token)

// WaitIndex should be equal to the max ModifyIndex - even when that
// modify index is of the dequeued eval which has yet to be applied to
// the state store.
require.Equal(t, eval2.ModifyIndex, resp.WaitIndex)
}

func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {
// test enqueuing an eval, updating a plan result for the same eval and de-queueing the eval
t.Parallel()
Expand Down

0 comments on commit 76d2eee

Please sign in to comment.