Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nomad: compare current eval when setting WaitIndex #5381

Merged
merged 4 commits into from
Mar 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
// Provide the output if any
if eval != nil {
// Get the index that the worker should wait until before scheduling.
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID)
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID, eval.ModifyIndex)
if err != nil {
var mErr multierror.Error
multierror.Append(&mErr, err)
Expand Down Expand Up @@ -133,7 +133,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
// invoking the scheduler. The index should be the highest modify index of any
// evaluation for the job. This prevents scheduling races for the same job when
// there are blocked evaluations.
func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
func (e *Eval) getWaitIndex(namespace, job string, evalModifyIndex uint64) (uint64, error) {
snap, err := e.srv.State().Snapshot()
if err != nil {
return 0, err
Expand All @@ -144,7 +144,10 @@ func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
return 0, err
}

var max uint64
// Since dequeueing evals is concurrent with applying Raft messages to
// the state store, initialize to the currently dequeued eval's index
// in case it isn't in the snapshot used by EvalsByJob yet.
max := evalModifyIndex
for _, eval := range evals {
if max < eval.ModifyIndex {
max = eval.ModifyIndex
Expand Down
47 changes: 46 additions & 1 deletion nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestEvalEndpoint_GetEval(t *testing.T) {
Expand Down Expand Up @@ -239,7 +240,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
}
}

func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
// TestEvalEndpoint_Dequeue_WaitIndex_Snapshot asserts that an eval's wait
// index will be equal to the highest eval modify index in the state store.
func TestEvalEndpoint_Dequeue_WaitIndex_Snapshot(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
Expand Down Expand Up @@ -285,6 +288,48 @@ func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
}
}

// TestEvalEndpoint_Dequeue_WaitIndex_Eval asserts that an eval's wait index
// will be its own modify index if its modify index is greater than all of the
// indexes in the state store. This can happen if Dequeue receives an eval that
// has not yet been applied from the Raft log to the local node's state store.
func TestEvalEndpoint_Dequeue_WaitIndex_Eval(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request but only upsert 1 into the state store
eval1 := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = eval1.JobID
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
eval2.ModifyIndex = 1001
s1.evalBroker.Enqueue(eval2)

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp))
require.Equal(t, eval2, resp.Eval)

// Ensure outstanding
token, ok := s1.evalBroker.Outstanding(eval2.ID)
require.True(t, ok)
require.Equal(t, resp.Token, token)

// WaitIndex should be equal to the max ModifyIndex - even when that
// modify index is of the dequeued eval which has yet to be applied to
// the state store.
require.Equal(t, eval2.ModifyIndex, resp.WaitIndex)
}

func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {
// test enqueuing an eval, updating a plan result for the same eval and de-queueing the eval
t.Parallel()
Expand Down