Skip to content

Commit

Permalink
Merge pull request #3206 from hashicorp/b-eval-index
Browse files Browse the repository at this point in the history
Worker waits til max ModifyIndex across EvalsByJob
  • Loading branch information
dadgar authored Sep 14, 2017
2 parents a81ab20 + eae982d commit 04a04be
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ IMPROVEMENTS:

BUG FIXES:
* core: *Fix restoration of stopped periodic jobs [GH-3201]
* core: Fix a race condition in which scheduling results from one invocation of
the scheduler wouldn't be considered by the next for the same job [GH-3206]
* api: Sort /v1/agent/servers output so that output of Consul checks does not
change [GH-3214]
* api: Fix search handling of jobs with more than four hyphens and case were
Expand Down
42 changes: 42 additions & 0 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
Expand Down Expand Up @@ -92,15 +93,56 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,

// Provide the output if any
if eval != nil {
// Get the index that the worker should wait until before scheduling.
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID)
if err != nil {
var mErr multierror.Error
multierror.Append(&mErr, err)

// We have dequeued the evaluation but won't be returning it to the
// worker so Nack the eval.
if err := e.srv.evalBroker.Nack(eval.ID, token); err != nil {
multierror.Append(&mErr, err)
}

return &mErr
}

reply.Eval = eval
reply.Token = token
reply.WaitIndex = waitIndex
}

// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
}

// getWaitIndex returns the wait index that should be used by the worker before
// invoking the scheduler. The index should be the highest modify index of any
// evaluation for the job. This prevents scheduling races for the same job when
// there are blocked evaluations.
func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
snap, err := e.srv.State().Snapshot()
if err != nil {
return 0, err
}

evals, err := snap.EvalsByJob(nil, namespace, job)
if err != nil {
return 0, err
}

var max uint64
for _, eval := range evals {
if max < eval.ModifyIndex {
max = eval.ModifyIndex
}
}

return max, nil
}

// Ack is used to acknowledge completion of a dequeued evaluation
func (e *Eval) Ack(args *structs.EvalAckRequest,
reply *structs.GenericResponse) error {
Expand Down
50 changes: 50 additions & 0 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,56 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
if token != resp.Token {
t.Fatalf("bad token: %#v %#v", token, resp.Token)
}

if resp.WaitIndex != eval1.ModifyIndex {
t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, eval1.ModifyIndex)
}
}

func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
eval1 := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = eval1.JobID
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
s1.evalBroker.Enqueue(eval1)
s1.fsm.State().UpsertEvals(1001, []*structs.Evaluation{eval2})

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if !reflect.DeepEqual(eval1, resp.Eval) {
t.Fatalf("bad: %v %v", eval1, resp.Eval)
}

// Ensure outstanding
token, ok := s1.evalBroker.Outstanding(eval1.ID)
if !ok {
t.Fatalf("should be outstanding")
}
if token != resp.Token {
t.Fatalf("bad token: %#v %#v", token, resp.Token)
}

if resp.WaitIndex != 1001 {
t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, 1001)
}
}

func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,9 +932,29 @@ type SingleEvalResponse struct {
type EvalDequeueResponse struct {
Eval *Evaluation
Token string

// WaitIndex is the Raft index the worker should wait until invoking the
// scheduler.
WaitIndex uint64

QueryMeta
}

// GetWaitIndex is used to retrieve the Raft index in which state should be at
// or beyond before invoking the scheduler.
func (e *EvalDequeueResponse) GetWaitIndex() uint64 {
// Prefer the wait index sent. This will be populated on all responses from
// 0.7.0 and above
if e.WaitIndex != 0 {
return e.WaitIndex
} else if e.Eval != nil {
return e.Eval.ModifyIndex
}

// This should never happen
return 1
}

// PlanResponse is used to return from a PlanRequest
type PlanResponse struct {
Result *PlanResult
Expand Down
13 changes: 7 additions & 6 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (w *Worker) checkPaused() {
func (w *Worker) run() {
for {
// Dequeue a pending evaluation
eval, token, shutdown := w.dequeueEvaluation(dequeueTimeout)
eval, token, waitIndex, shutdown := w.dequeueEvaluation(dequeueTimeout)
if shutdown {
return
}
Expand All @@ -118,7 +118,7 @@ func (w *Worker) run() {
}

// Wait for the raft log to catchup to the evaluation
if err := w.waitForIndex(eval.ModifyIndex, raftSyncLimit); err != nil {
if err := w.waitForIndex(waitIndex, raftSyncLimit); err != nil {
w.sendAck(eval.ID, token, false)
continue
}
Expand All @@ -136,7 +136,8 @@ func (w *Worker) run() {

// dequeueEvaluation is used to fetch the next ready evaluation.
// This blocks until an evaluation is available or a timeout is reached.
func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) {
func (w *Worker) dequeueEvaluation(timeout time.Duration) (
eval *structs.Evaluation, token string, waitIndex uint64, shutdown bool) {
// Setup the request
req := structs.EvalDequeueRequest{
Schedulers: w.srv.config.EnabledSchedulers,
Expand Down Expand Up @@ -170,7 +171,7 @@ REQ:
}

if w.backoffErr(base, limit) {
return nil, "", true
return nil, "", 0, true
}
goto REQ
}
Expand All @@ -179,12 +180,12 @@ REQ:
// Check if we got a response
if resp.Eval != nil {
w.logger.Printf("[DEBUG] worker: dequeued evaluation %s", resp.Eval.ID)
return resp.Eval, resp.Token, false
return resp.Eval, resp.Token, resp.GetWaitIndex(), false
}

// Check for potential shutdown
if w.srv.IsShutdown() {
return nil, "", true
return nil, "", 0, true
}
goto REQ
}
Expand Down
86 changes: 81 additions & 5 deletions nomad/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,93 @@ func TestWorker_dequeueEvaluation(t *testing.T) {
w := &Worker{srv: s1, logger: s1.logger}

// Attempt dequeue
eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if shutdown {
t.Fatalf("should not shutdown")
}
if token == "" {
t.Fatalf("should get token")
}
if waitIndex != eval1.ModifyIndex {
t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex)
}

// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval1) {
t.Fatalf("bad: %#v %#v", eval, eval1)
}
}

// Test that the worker picks up the correct wait index when there are multiple
// evals for the same job.
func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
c.EnabledSchedulers = []string{structs.JobTypeService}
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Create the evaluation
eval1 := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = eval1.JobID

// Insert the evals into the state store
if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}); err != nil {
t.Fatal(err)
}

s1.evalBroker.Enqueue(eval1)
s1.evalBroker.Enqueue(eval2)

// Create a worker
w := &Worker{srv: s1, logger: s1.logger}

// Attempt dequeue
eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if shutdown {
t.Fatalf("should not shutdown")
}
if token == "" {
t.Fatalf("should get token")
}
if waitIndex != eval1.ModifyIndex {
t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex)
}

// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval1) {
t.Fatalf("bad: %#v %#v", eval, eval1)
}

// Update the modify index of the first eval
if err := s1.fsm.State().UpsertEvals(2000, []*structs.Evaluation{eval1}); err != nil {
t.Fatal(err)
}

// Send the Ack
w.sendAck(eval1.ID, token, true)

// Attempt second dequeue
eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond)
if shutdown {
t.Fatalf("should not shutdown")
}
if token == "" {
t.Fatalf("should get token")
}
if waitIndex != 2000 {
t.Fatalf("bad wait index; got %d; want 2000", eval2.ModifyIndex)
}

// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval2) {
t.Fatalf("bad: %#v %#v", eval, eval2)
}
}

func TestWorker_dequeueEvaluation_paused(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
Expand Down Expand Up @@ -101,7 +174,7 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) {

// Attempt dequeue
start := time.Now()
eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if diff := time.Since(start); diff < 100*time.Millisecond {
t.Fatalf("should have paused: %v", diff)
}
Expand All @@ -111,6 +184,9 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) {
if token == "" {
t.Fatalf("should get token")
}
if waitIndex != eval1.ModifyIndex {
t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex)
}

// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval1) {
Expand All @@ -136,7 +212,7 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) {
}()

// Attempt dequeue
eval, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
eval, _, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if !shutdown {
t.Fatalf("should not shutdown")
}
Expand Down Expand Up @@ -164,7 +240,7 @@ func TestWorker_sendAck(t *testing.T) {
w := &Worker{srv: s1, logger: s1.logger}

// Attempt dequeue
eval, token, _ := w.dequeueEvaluation(10 * time.Millisecond)
eval, token, _, _ := w.dequeueEvaluation(10 * time.Millisecond)

// Check the depth is 0, 1 unacked
stats := s1.evalBroker.Stats()
Expand All @@ -182,7 +258,7 @@ func TestWorker_sendAck(t *testing.T) {
}

// Attempt dequeue
eval, token, _ = w.dequeueEvaluation(10 * time.Millisecond)
eval, token, _, _ = w.dequeueEvaluation(10 * time.Millisecond)

// Send the Ack
w.sendAck(eval.ID, token, true)
Expand Down

0 comments on commit 04a04be

Please sign in to comment.