Skip to content

Commit

Permalink
Add scheduler version enforcement
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Oct 26, 2016
1 parent 00a816c commit 962f4d4
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 6 deletions.
1 change: 1 addition & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func DefaultConfig() *Config {
ConsulConfig: config.DefaultConsulConfig(),
VaultConfig: config.DefaultVaultConfig(),
RPCHoldTimeout: 5 * time.Second,
TLSConfig: &config.TLSConfig{},
}

// Enable all known schedulers by default
Expand Down
7 changes: 7 additions & 0 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
"github.com/hashicorp/nomad/scheduler"
)

const (
Expand Down Expand Up @@ -77,6 +78,12 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
return fmt.Errorf("dequeue requires at least one scheduler type")
}

// Check that there isn't a scheduler version mismatch
if args.SchedulerVersion != scheduler.SchedulerVersion {
return fmt.Errorf("dequeue disallowed: calling scheduler version is %d; leader version is %d",
args.SchedulerVersion, scheduler.SchedulerVersion)
}

// Ensure there is a default timeout
if args.Timeout <= 0 {
args.Timeout = DefaultDequeueTimeout
Expand Down
32 changes: 30 additions & 2 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package nomad

import (
"reflect"
"strings"
"testing"
"time"

"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/nomad/testutil"
)

Expand Down Expand Up @@ -142,8 +144,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
WriteRequest: structs.WriteRequest{Region: "global"},
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
Expand All @@ -164,6 +167,31 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
}
}

func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: 0,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp)
if err == nil || !strings.Contains(err.Error(), "scheduler version is 0") {
t.Fatalf("err: %v", err)
}
}

func TestEvalEndpoint_Ack(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down
5 changes: 3 additions & 2 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,9 @@ type EvalAckRequest struct {

// EvalDequeueRequest is used when we want to dequeue an evaluation
type EvalDequeueRequest struct {
Schedulers []string
Timeout time.Duration
Schedulers []string
Timeout time.Duration
SchedulerVersion uint16
WriteRequest
}

Expand Down
5 changes: 3 additions & 2 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ func (w *Worker) run() {
func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) {
// Setup the request
req := structs.EvalDequeueRequest{
Schedulers: w.srv.config.EnabledSchedulers,
Timeout: timeout,
Schedulers: w.srv.config.EnabledSchedulers,
Timeout: timeout,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{
Region: w.srv.config.Region,
},
Expand Down
8 changes: 8 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

const (
// SchedulerVersion is the version of the scheduler. Changes to the
// scheduler that are incompatible with prior schedulers will increment this
// version. It is used to disallow dequeueing when the versions do not match
// across the leader and the dequeueing scheduler.
SchedulerVersion uint16 = 1
)

// BuiltinSchedulers contains the built in registered schedulers
// which are available
var BuiltinSchedulers = map[string]Factory{
Expand Down

0 comments on commit 962f4d4

Please sign in to comment.