Skip to content

Commit

Permalink
Merge pull request #1872 from hashicorp/f-dequeue
Browse files Browse the repository at this point in the history
Add scheduler version enforcement
  • Loading branch information
dadgar authored Oct 27, 2016
2 parents 01d47a8 + 0efab91 commit 42fb115
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 7 deletions.
7 changes: 7 additions & 0 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
"github.com/hashicorp/nomad/scheduler"
)

const (
Expand Down Expand Up @@ -77,6 +78,12 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
return fmt.Errorf("dequeue requires at least one scheduler type")
}

// Check that there isn't a scheduler version mismatch
if args.SchedulerVersion != scheduler.SchedulerVersion {
return fmt.Errorf("dequeue disallowed: calling scheduler version is %d; leader version is %d",
args.SchedulerVersion, scheduler.SchedulerVersion)
}

// Ensure there is a default timeout
if args.Timeout <= 0 {
args.Timeout = DefaultDequeueTimeout
Expand Down
32 changes: 30 additions & 2 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package nomad

import (
"reflect"
"strings"
"testing"
"time"

"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/nomad/testutil"
)

Expand Down Expand Up @@ -142,8 +144,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
WriteRequest: structs.WriteRequest{Region: "global"},
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
Expand All @@ -164,6 +167,31 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
}
}

func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: 0,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp)
if err == nil || !strings.Contains(err.Error(), "scheduler version is 0") {
t.Fatalf("err: %v", err)
}
}

func TestEvalEndpoint_Ack(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down
5 changes: 3 additions & 2 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,9 @@ type EvalAckRequest struct {

// EvalDequeueRequest is used when we want to dequeue an evaluation
type EvalDequeueRequest struct {
Schedulers []string
Timeout time.Duration
Schedulers []string
Timeout time.Duration
SchedulerVersion uint16
WriteRequest
}

Expand Down
20 changes: 17 additions & 3 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const (
// the slower backoff
backoffLimitSlow = 10 * time.Second

// backoffSchedulerVersionMismatch is the backoff between retries when the
// scheduler version mismatches that of the leader.
backoffSchedulerVersionMismatch = 30 * time.Second

// dequeueTimeout is used to timeout an evaluation dequeue so that
// we can check if there is a shutdown event
dequeueTimeout = 500 * time.Millisecond
Expand Down Expand Up @@ -134,8 +138,9 @@ func (w *Worker) run() {
func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) {
// Setup the request
req := structs.EvalDequeueRequest{
Schedulers: w.srv.config.EnabledSchedulers,
Timeout: timeout,
Schedulers: w.srv.config.EnabledSchedulers,
Timeout: timeout,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{
Region: w.srv.config.Region,
},
Expand All @@ -154,7 +159,16 @@ REQ:
if time.Since(w.start) > dequeueErrGrace && !w.srv.IsShutdown() {
w.logger.Printf("[ERR] worker: failed to dequeue evaluation: %v", err)
}
if w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {

// Adjust the backoff based on the error. If it is a scheduler version
// mismatch we increase the baseline.
base, limit := backoffBaselineFast, backoffLimitSlow
if strings.Contains(err.Error(), "calling scheduler version") {
base = backoffSchedulerVersionMismatch
limit = backoffSchedulerVersionMismatch
}

if w.backoffErr(base, limit) {
return nil, "", true
}
goto REQ
Expand Down
8 changes: 8 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

const (
// SchedulerVersion is the version of the scheduler. Changes to the
// scheduler that are incompatible with prior schedulers will increment this
// version. It is used to disallow dequeueing when the versions do not match
// across the leader and the dequeueing scheduler.
SchedulerVersion uint16 = 1
)

// BuiltinSchedulers contains the built in registered schedulers
// which are available
var BuiltinSchedulers = map[string]Factory{
Expand Down

0 comments on commit 42fb115

Please sign in to comment.