Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker waits til max ModifyIndex across EvalsByJob #3206

Merged
merged 3 commits into from
Sep 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ IMPROVEMENTS:

BUG FIXES:
* core: *Fix restoration of stopped periodic jobs [GH-3201]
* core: Fix a race condition in which scheduling results from one invocation of
the scheduler wouldn't be considered by the next for the same job [GH-3206]
* api: Sort /v1/agent/servers output so that output of Consul checks does not
change [GH-3214]
* api: Fix search handling of jobs with more than four hyphens and case were
Expand Down
42 changes: 42 additions & 0 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
Expand Down Expand Up @@ -92,15 +93,56 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,

// Provide the output if any
if eval != nil {
// Get the index that the worker should wait until before scheduling.
waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID)
if err != nil {
var mErr multierror.Error
multierror.Append(&mErr, err)

// We have dequeued the evaluation but won't be returning it to the
// worker so Nack the eval.
if err := e.srv.evalBroker.Nack(eval.ID, token); err != nil {
multierror.Append(&mErr, err)
}

return &mErr
}

reply.Eval = eval
reply.Token = token
reply.WaitIndex = waitIndex
}

// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
}

// getWaitIndex returns the wait index that should be used by the worker before
// invoking the scheduler. The index should be the highest modify index of any
// evaluation for the job. This prevents scheduling races for the same job when
// there are blocked evaluations.
func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
snap, err := e.srv.State().Snapshot()
if err != nil {
return 0, err
}

evals, err := snap.EvalsByJob(nil, namespace, job)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use an iterator to avoid materializing the slice?

if err != nil {
return 0, err
}

var max uint64
for _, eval := range evals {
if max < eval.ModifyIndex {
max = eval.ModifyIndex
}
}

return max, nil
}

// Ack is used to acknowledge completion of a dequeued evaluation
func (e *Eval) Ack(args *structs.EvalAckRequest,
reply *structs.GenericResponse) error {
Expand Down
50 changes: 50 additions & 0 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,56 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
if token != resp.Token {
t.Fatalf("bad token: %#v %#v", token, resp.Token)
}

if resp.WaitIndex != eval1.ModifyIndex {
t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, eval1.ModifyIndex)
}
}

func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
eval1 := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = eval1.JobID
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
s1.evalBroker.Enqueue(eval1)
s1.fsm.State().UpsertEvals(1001, []*structs.Evaluation{eval2})

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if !reflect.DeepEqual(eval1, resp.Eval) {
t.Fatalf("bad: %v %v", eval1, resp.Eval)
}

// Ensure outstanding
token, ok := s1.evalBroker.Outstanding(eval1.ID)
if !ok {
t.Fatalf("should be outstanding")
}
if token != resp.Token {
t.Fatalf("bad token: %#v %#v", token, resp.Token)
}

if resp.WaitIndex != 1001 {
t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, 1001)
}
}

func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,9 +932,29 @@ type SingleEvalResponse struct {
type EvalDequeueResponse struct {
Eval *Evaluation
Token string

// WaitIndex is the Raft index the worker should wait until invoking the
// scheduler.
WaitIndex uint64

QueryMeta
}

// GetWaitIndex is used to retrieve the Raft index in which state should be at
// or beyond before invoking the scheduler.
func (e *EvalDequeueResponse) GetWaitIndex() uint64 {
// Prefer the wait index sent. This will be populated on all responses from
// 0.7.0 and above
if e.WaitIndex != 0 {
return e.WaitIndex
} else if e.Eval != nil {
return e.Eval.ModifyIndex
}

// This should never happen
return 1
}

// PlanResponse is used to return from a PlanRequest
type PlanResponse struct {
Result *PlanResult
Expand Down
13 changes: 7 additions & 6 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (w *Worker) checkPaused() {
func (w *Worker) run() {
for {
// Dequeue a pending evaluation
eval, token, shutdown := w.dequeueEvaluation(dequeueTimeout)
eval, token, waitIndex, shutdown := w.dequeueEvaluation(dequeueTimeout)
if shutdown {
return
}
Expand All @@ -118,7 +118,7 @@ func (w *Worker) run() {
}

// Wait for the raft log to catchup to the evaluation
if err := w.waitForIndex(eval.ModifyIndex, raftSyncLimit); err != nil {
if err := w.waitForIndex(waitIndex, raftSyncLimit); err != nil {
w.sendAck(eval.ID, token, false)
continue
}
Expand All @@ -136,7 +136,8 @@ func (w *Worker) run() {

// dequeueEvaluation is used to fetch the next ready evaluation.
// This blocks until an evaluation is available or a timeout is reached.
func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) {
func (w *Worker) dequeueEvaluation(timeout time.Duration) (
eval *structs.Evaluation, token string, waitIndex uint64, shutdown bool) {
// Setup the request
req := structs.EvalDequeueRequest{
Schedulers: w.srv.config.EnabledSchedulers,
Expand Down Expand Up @@ -170,7 +171,7 @@ REQ:
}

if w.backoffErr(base, limit) {
return nil, "", true
return nil, "", 0, true
}
goto REQ
}
Expand All @@ -179,12 +180,12 @@ REQ:
// Check if we got a response
if resp.Eval != nil {
w.logger.Printf("[DEBUG] worker: dequeued evaluation %s", resp.Eval.ID)
return resp.Eval, resp.Token, false
return resp.Eval, resp.Token, resp.GetWaitIndex(), false
}

// Check for potential shutdown
if w.srv.IsShutdown() {
return nil, "", true
return nil, "", 0, true
}
goto REQ
}
Expand Down
86 changes: 81 additions & 5 deletions nomad/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,93 @@ func TestWorker_dequeueEvaluation(t *testing.T) {
w := &Worker{srv: s1, logger: s1.logger}

// Attempt dequeue
eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if shutdown {
t.Fatalf("should not shutdown")
}
if token == "" {
t.Fatalf("should get token")
}
if waitIndex != eval1.ModifyIndex {
t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex)
}

// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval1) {
t.Fatalf("bad: %#v %#v", eval, eval1)
}
}

// Test that the worker picks up the correct wait index when there are multiple
// evals for the same job.
func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
c.EnabledSchedulers = []string{structs.JobTypeService}
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Create the evaluation
eval1 := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = eval1.JobID

// Insert the evals into the state store
if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}); err != nil {
t.Fatal(err)
}

s1.evalBroker.Enqueue(eval1)
s1.evalBroker.Enqueue(eval2)

// Create a worker
w := &Worker{srv: s1, logger: s1.logger}

// Attempt dequeue
eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if shutdown {
t.Fatalf("should not shutdown")
}
if token == "" {
t.Fatalf("should get token")
}
if waitIndex != eval1.ModifyIndex {
t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex)
}

// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval1) {
t.Fatalf("bad: %#v %#v", eval, eval1)
}

// Update the modify index of the first eval
if err := s1.fsm.State().UpsertEvals(2000, []*structs.Evaluation{eval1}); err != nil {
t.Fatal(err)
}

// Send the Ack
w.sendAck(eval1.ID, token, true)

// Attempt second dequeue
eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond)
if shutdown {
t.Fatalf("should not shutdown")
}
if token == "" {
t.Fatalf("should get token")
}
if waitIndex != 2000 {
t.Fatalf("bad wait index; got %d; want 2000", eval2.ModifyIndex)
}

// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval2) {
t.Fatalf("bad: %#v %#v", eval, eval2)
}
}

func TestWorker_dequeueEvaluation_paused(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
Expand Down Expand Up @@ -101,7 +174,7 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) {

// Attempt dequeue
start := time.Now()
eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if diff := time.Since(start); diff < 100*time.Millisecond {
t.Fatalf("should have paused: %v", diff)
}
Expand All @@ -111,6 +184,9 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) {
if token == "" {
t.Fatalf("should get token")
}
if waitIndex != eval1.ModifyIndex {
t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex)
}

// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval1) {
Expand All @@ -136,7 +212,7 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) {
}()

// Attempt dequeue
eval, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
eval, _, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if !shutdown {
t.Fatalf("should not shutdown")
}
Expand Down Expand Up @@ -164,7 +240,7 @@ func TestWorker_sendAck(t *testing.T) {
w := &Worker{srv: s1, logger: s1.logger}

// Attempt dequeue
eval, token, _ := w.dequeueEvaluation(10 * time.Millisecond)
eval, token, _, _ := w.dequeueEvaluation(10 * time.Millisecond)

// Check the depth is 0, 1 unacked
stats := s1.evalBroker.Stats()
Expand All @@ -182,7 +258,7 @@ func TestWorker_sendAck(t *testing.T) {
}

// Attempt dequeue
eval, token, _ = w.dequeueEvaluation(10 * time.Millisecond)
eval, token, _, _ = w.dequeueEvaluation(10 * time.Millisecond)

// Send the Ack
w.sendAck(eval.ID, token, true)
Expand Down