Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scheduler version enforcement #1872

Merged
merged 2 commits into from
Oct 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func DefaultConfig() *Config {
ConsulConfig: config.DefaultConsulConfig(),
VaultConfig: config.DefaultVaultConfig(),
RPCHoldTimeout: 5 * time.Second,
TLSConfig: &config.TLSConfig{},
}

// Enable all known schedulers by default
Expand Down
7 changes: 7 additions & 0 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
"github.com/hashicorp/nomad/scheduler"
)

const (
Expand Down Expand Up @@ -77,6 +78,12 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
return fmt.Errorf("dequeue requires at least one scheduler type")
}

// Check that there isn't a scheduler version mismatch
if args.SchedulerVersion != scheduler.SchedulerVersion {
return fmt.Errorf("dequeue disallowed: calling scheduler version is %d; leader version is %d",
args.SchedulerVersion, scheduler.SchedulerVersion)
}

// Ensure there is a default timeout
if args.Timeout <= 0 {
args.Timeout = DefaultDequeueTimeout
Expand Down
32 changes: 30 additions & 2 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package nomad

import (
"reflect"
"strings"
"testing"
"time"

"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/nomad/testutil"
)

Expand Down Expand Up @@ -142,8 +144,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
WriteRequest: structs.WriteRequest{Region: "global"},
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
Expand All @@ -164,6 +167,31 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
}
}

func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: 0,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp)
if err == nil || !strings.Contains(err.Error(), "scheduler version is 0") {
t.Fatalf("err: %v", err)
}
}

func TestEvalEndpoint_Ack(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down
5 changes: 3 additions & 2 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,9 @@ type EvalAckRequest struct {

// EvalDequeueRequest is used when we want to dequeue an evaluation
type EvalDequeueRequest struct {
Schedulers []string
Timeout time.Duration
Schedulers []string
Timeout time.Duration
SchedulerVersion uint16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something about the uint16 feels dirty...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It gives us a lot of versions while keeping it low overhead. We will never pass 65536 version bumps.

WriteRequest
}

Expand Down
20 changes: 17 additions & 3 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const (
// the slower backoff
backoffLimitSlow = 10 * time.Second

// backoffSchedulerVersionMismatch is the backoff between retries when the
// scheduler version mismatches that of the leader.
backoffSchedulerVersionMismatch = 30 * time.Second

// dequeueTimeout is used to timeout an evaluation dequeue so that
// we can check if there is a shutdown event
dequeueTimeout = 500 * time.Millisecond
Expand Down Expand Up @@ -134,8 +138,9 @@ func (w *Worker) run() {
func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) {
// Setup the request
req := structs.EvalDequeueRequest{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The worker should probably not sit in a hot loop of calling dequeue on this error, e.g. the failure is basically semi-permanent.

Copy link
Contributor Author

@dadgar dadgar Oct 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I was thinking we want it to spam the logs so the operator knows they are doing something bad. On the error we start backing off:

nomad/nomad/worker.go

Lines 146 to 163 in 962f4d4

REQ:
// Check if we are paused
w.checkPaused()
// Make a blocking RPC
start := time.Now()
err := w.srv.RPC("Eval.Dequeue", &req, &resp)
metrics.MeasureSince([]string{"nomad", "worker", "dequeue_eval"}, start)
if err != nil {
if time.Since(w.start) > dequeueErrGrace && !w.srv.IsShutdown() {
w.logger.Printf("[ERR] worker: failed to dequeue evaluation: %v", err)
}
if w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
return nil, "", true
}
goto REQ
}
w.backoffReset()

Schedulers: w.srv.config.EnabledSchedulers,
Timeout: timeout,
Schedulers: w.srv.config.EnabledSchedulers,
Timeout: timeout,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{
Region: w.srv.config.Region,
},
Expand All @@ -154,7 +159,16 @@ REQ:
if time.Since(w.start) > dequeueErrGrace && !w.srv.IsShutdown() {
w.logger.Printf("[ERR] worker: failed to dequeue evaluation: %v", err)
}
if w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {

// Adjust the backoff based on the error. If it is a scheduler version
// mismatch we increase the baseline.
base, limit := backoffBaselineFast, backoffLimitSlow
if strings.Contains(err.Error(), "calling scheduler version") {
base = backoffSchedulerVersionMismatch
limit = backoffSchedulerVersionMismatch
}

if w.backoffErr(base, limit) {
return nil, "", true
}
goto REQ
Expand Down
8 changes: 8 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

const (
// SchedulerVersion is the version of the scheduler. Changes to the
// scheduler that are incompatible with prior schedulers will increment this
// version. It is used to disallow dequeueing when the versions do not match
// across the leader and the dequeueing scheduler.
SchedulerVersion uint16 = 1
)

// BuiltinSchedulers contains the built in registered schedulers
// which are available
var BuiltinSchedulers = map[string]Factory{
Expand Down