Skip to content

Commit

Permalink
make version checks specific to region
Browse files Browse the repository at this point in the history
* One-time tokens are not replicated between regions, so we don't want to enforce
  that the version check across all of serf, just members in the same region.
* Scheduler: Disconnected clients handling is specific to a single region, so we
  don't want to enforce that the version check across all of serf, just members in
  the same region.
* Variables: enforce version check in Apply RPC
* Cleans up a bunch of legacy checks.

This changeset is specific to 1.4.x and the changes for previous versions of
Nomad will be manually backported in a separate PR.
  • Loading branch information
tgross committed Oct 17, 2022
1 parent f6838f6 commit ebc9d74
Show file tree
Hide file tree
Showing 13 changed files with 47 additions and 55 deletions.
9 changes: 9 additions & 0 deletions .changelog/14912.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
```release-note:bug
variables: Fixed a bug where Nomad version checking was not enforced for writing to variables
```
```release-note:bug
acl: Fixed a bug where Nomad version checking for one-time tokens was enforced across regions
```
```release-note:bug
scheduler: Fixed a bug where version checking for disconnected clients handling was enforced across regions
```
6 changes: 3 additions & 3 deletions nomad/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ func (a *ACL) UpsertOneTimeToken(args *structs.OneTimeTokenUpsertRequest, reply
defer metrics.MeasureSince(
[]string{"nomad", "acl", "upsert_one_time_token"}, time.Now())

if !ServersMeetMinimumVersion(a.srv.Members(), minOneTimeAuthenticationTokenVersion, false) {
if !ServersMeetMinimumVersion(a.srv.Members(), a.srv.Region(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minAutopilotVersion)
}

Expand Down Expand Up @@ -996,7 +996,7 @@ func (a *ACL) ExchangeOneTimeToken(args *structs.OneTimeTokenExchangeRequest, re
defer metrics.MeasureSince(
[]string{"nomad", "acl", "exchange_one_time_token"}, time.Now())

if !ServersMeetMinimumVersion(a.srv.Members(), minOneTimeAuthenticationTokenVersion, false) {
if !ServersMeetMinimumVersion(a.srv.Members(), a.srv.Region(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minAutopilotVersion)
}

Expand Down Expand Up @@ -1053,7 +1053,7 @@ func (a *ACL) ExpireOneTimeTokens(args *structs.OneTimeTokenExpireRequest, reply
defer metrics.MeasureSince(
[]string{"nomad", "acl", "expire_one_time_tokens"}, time.Now())

if !ServersMeetMinimumVersion(a.srv.Members(), minOneTimeAuthenticationTokenVersion, false) {
if !ServersMeetMinimumVersion(a.srv.Members(), a.srv.Region(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minAutopilotVersion)
}

Expand Down
2 changes: 1 addition & 1 deletion nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ OUTER:
func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) error {
// For old clusters, send single deregistration messages COMPAT(0.11)
minVersionBatchNodeDeregister := version.Must(version.NewVersion("0.9.4"))
if !ServersMeetMinimumVersion(c.srv.Members(), minVersionBatchNodeDeregister, true) {
if !ServersMeetMinimumVersion(c.srv.Members(), c.srv.Region(), minVersionBatchNodeDeregister, true) {
for _, id := range nodeIDs {
req := structs.NodeDeregisterRequest{
NodeID: id,
Expand Down
4 changes: 2 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis

// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval
// 0.12.1 introduced atomic eval job registration
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), j.srv.Region(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
submittedEval = true
}
Expand Down Expand Up @@ -848,7 +848,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}

// COMPAT(1.1.0): remove conditional and always set args.Eval
if ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
if ServersMeetMinimumVersion(j.srv.Members(), j.srv.Region(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
}

Expand Down
17 changes: 5 additions & 12 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIVolumeClaimGC, index))
}
case <-oneTimeTokenGC.C:
if !ServersMeetMinimumVersion(s.Members(), minOneTimeAuthenticationTokenVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minOneTimeAuthenticationTokenVersion, false) {
continue
}

Expand Down Expand Up @@ -1922,7 +1922,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
return config
}

if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), AllRegions, minAutopilotVersion, false) {
s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion)
return nil
}
Expand All @@ -1949,7 +1949,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
if config != nil {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minSchedulerConfigVersion, false) {
s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion)
return nil
}
Expand Down Expand Up @@ -1991,14 +1991,7 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) {
default:
}

members := s.serf.Members()
regionMembers := []serf.Member{}
for _, member := range members {
if member.Tags["region"] == s.Region() {
regionMembers = append(regionMembers, member)
}
}
if ServersMeetMinimumVersion(regionMembers, minVersionKeyring, true) {
if ServersMeetMinimumVersion(s.serf.Members(), s.Region(), minVersionKeyring, true) {
break
}
}
Expand Down Expand Up @@ -2034,7 +2027,7 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) {
}

func (s *Server) generateClusterID() (string, error) {
if !ServersMeetMinimumVersion(s.Members(), minClusterIDVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), AllRegions, minClusterIDVersion, false) {
s.logger.Named("core").Warn("cannot initialize cluster ID until all servers are above minimum version", "min_version", minClusterIDVersion)
return "", fmt.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion)
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
}

// All servers should be at or above 0.8.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) {
if !ServersMeetMinimumVersion(op.srv.Members(), op.srv.Region(), minAutopilotVersion, false) {
return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion)
}

Expand Down Expand Up @@ -315,7 +315,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
}

// All servers should be at or above 0.9.0 to apply this operation
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) {
if !ServersMeetMinimumVersion(op.srv.Members(), op.srv.Region(), minSchedulerConfigVersion, false) {
return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion)
}

Expand Down
2 changes: 1 addition & 1 deletion nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
eval.ModifyIndex = index

// COMPAT(1.1): Remove in 1.1.0 - 0.12.1 introduced atomic eval job registration
if !ServersMeetMinimumVersion(s.Members(), minJobRegisterAtomicEvalVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minJobRegisterAtomicEvalVersion, false) {
// Create a new evaluation
eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{
Expand Down
2 changes: 1 addition & 1 deletion nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap

preemptedJobIDs := make(map[structs.NamespacedID]struct{})

if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) {
if ServersMeetMinimumVersion(p.Members(), p.Region(), MinVersionPlanNormalization, true) {
// Initialize the allocs request using the new optimized log entry format.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
Expand Down
34 changes: 9 additions & 25 deletions nomad/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/serf/serf"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -143,15 +144,20 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
return true, parts
}

const AllRegions = ""

// ServersMeetMinimumVersion returns whether the Nomad servers are at least on the
// given Nomad version. The checkFailedServers parameter specifies whether version
// for the failed servers should be verified.
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version, checkFailedServers bool) bool {
func ServersMeetMinimumVersion(members []serf.Member, region string, minVersion *version.Version, checkFailedServers bool) bool {
for _, member := range members {
if valid, parts := isNomadServer(member); valid && (parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) {
valid, parts := isNomadServer(member)
if valid &&
(parts.Region == region || region == AllRegions) &&
(parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) {
// Check if the versions match - version.LessThan will return true for
// 0.8.0-rc1 < 0.8.0, so we want to ignore the metadata
versionsMatch := slicesMatch(minVersion.Segments(), parts.Build.Segments())
versionsMatch := slices.Equal(minVersion.Segments(), parts.Build.Segments())
if parts.Build.LessThan(minVersion) && !versionsMatch {
return false
}
Expand All @@ -161,28 +167,6 @@ func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Versio
return true
}

func slicesMatch(a, b []int) bool {
if a == nil && b == nil {
return true
}

if a == nil || b == nil {
return false
}

if len(a) != len(b) {
return false
}

for i := range a {
if a[i] != b[i] {
return false
}
}

return true
}

// shuffleStrings randomly shuffles the list of strings
func shuffleStrings(list []string) {
for i := range list {
Expand Down
6 changes: 3 additions & 3 deletions nomad/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestServersMeetMinimumVersionExcludingFailed(t *testing.T) {
}

for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, false)
result := ServersMeetMinimumVersion(tc.members, AllRegions, tc.ver, false)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestServersMeetMinimumVersionIncludingFailed(t *testing.T) {
}

for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, true)
result := ServersMeetMinimumVersion(tc.members, AllRegions, tc.ver, true)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestServersMeetMinimumVersionSuffix(t *testing.T) {
}

for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, true)
result := ServersMeetMinimumVersion(tc.members, AllRegions, tc.ver, true)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
Expand Down
5 changes: 5 additions & 0 deletions nomad/variables_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func (sv *Variables) Apply(args *structs.VariablesApplyRequest, reply *structs.V
args.Var.Namespace = targetNS
}

if !ServersMeetMinimumVersion(
sv.srv.serf.Members(), sv.srv.Region(), minVersionKeyring, true) {
return fmt.Errorf("all servers must be running version 1.4.0 or later to apply variables")
}

canRead, err := svePreApply(sv, args, args.Var)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua
// other packages to perform server version checks without direct references to
// the Nomad server.
func (w *Worker) ServersMeetMinimumVersion(minVersion *version.Version, checkFailedServers bool) bool {
return ServersMeetMinimumVersion(w.srv.Members(), minVersion, checkFailedServers)
return ServersMeetMinimumVersion(w.srv.Members(), w.srv.Region(), minVersion, checkFailedServers)
}

// SubmitPlan is used to submit a plan for consideration. This allows
Expand All @@ -606,7 +606,7 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
plan.SnapshotIndex = w.snapshotIndex

// Normalize stopped and preempted allocs before RPC
normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true)
normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), w.srv.Region(), MinVersionPlanNormalization, true)
if normalizePlan {
plan.NormalizeAllocations()
}
Expand Down
7 changes: 4 additions & 3 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ type Planner interface {
// that on leader changes, the evaluation will be reblocked properly.
ReblockEval(*structs.Evaluation) error

// ServersMeetMinimumVersion returns whether the Nomad servers are at least on the
// given Nomad version. The checkFailedServers parameter specifies whether version
// for the failed servers should be verified.
// ServersMeetMinimumVersion returns whether the Nomad servers in the
// worker's region are at least on the given Nomad version. The
// checkFailedServers parameter specifies whether version for the failed
// servers should be verified.
ServersMeetMinimumVersion(minVersion *version.Version, checkFailedServers bool) bool
}

0 comments on commit ebc9d74

Please sign in to comment.