From a4cfb2b797e5f042a7cc0567d864d2b986c0c425 Mon Sep 17 00:00:00 2001
From: Sean Chittenden
Date: Thu, 16 Jun 2016 12:14:03 -0700
Subject: [PATCH] Rework `server_auto_join` to use a timer instead of the peer
 count.

It is perfectly viable for an admin to downsize a Nomad Server cluster
to 1, 2, or an even number of servers (`num % 2 == 0`), however
ill-advised such a change may be. Instead of using `bootstrap_expect`,
use a timeout-based strategy: if the `bootstrapFn` hasn't observed a
leader in 15s, it falls back to Consul and polls every ~60s until it
sees a leader.
---
 nomad/server.go | 119 ++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 95 insertions(+), 24 deletions(-)

diff --git a/nomad/server.go b/nomad/server.go
index 3067b0b95f4..ef0f530f1c2 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -30,12 +30,21 @@ import (
 )
 
 const (
-	// datacenterQueryFactor sets the max number of DCs that a Nomad
-	// Server will query to find bootstrap_expect servers. If
-	// bootstrap_expect is 5, then the Nomad Server bootstrapFn handler
-	// will search through up to 15 Consul DCs to find possible Serf
-	// peers.
-	datacenterQueryFactor = 5
+	// datacenterQueryLimit sets the max number of DCs that a Nomad
+	// Server will query to find bootstrap_expect servers.
+	datacenterQueryLimit = 25
+
+	// maxStaleLeadership is the maximum time we will permit this Nomad
+	// Server to go without seeing a valid Raft leader.
+	maxStaleLeadership = 15 * time.Second
+
+	// peersPollInterval is used as the polling interval between attempts
+	// to query Consul for Nomad Servers.
+	peersPollInterval = 45 * time.Second
+
+	// peersPollJitterFactor is used to provide a slight amount of
+	// variance to the retry interval when querying Consul Servers.
+	peersPollJitterFactor = 2
 
 	raftState    = "raft/"
 	serfSnapshot = "serf/snapshot"
@@ -377,9 +386,31 @@ func (s *Server) Leave() error {
 	return nil
 }
 
-// setupConsulSyncer creates Server-mode consul.Syncer which periodically
-// executes callbacks on a fixed interval.
-func (s *Server) setupConsulSyncer() error {
+// setupBootstrapHandler() creates the closure necessary to support a Consul
+// fallback handler.
+func (s *Server) setupBootstrapHandler() error {
+	// peersTimeout is used to indicate to the Consul Syncer that the
+	// current Nomad Server has a stale peer set. peersTimeout will time
+	// out if the Consul Syncer bootstrapFn has not observed a Raft
+	// leader in maxStaleLeadership. If peersTimeout has been triggered,
+	// the Consul Syncer will begin querying Consul for other Nomad
+	// Servers.
+	//
+	// NOTE: time.Timer is used vs time.Time in order to handle clock
+	// drift because time.Timer is implemented as a monotonic clock.
+	var peersTimeout *time.Timer = time.NewTimer(0)
+
+	// leadershipTimedOut is a helper method that returns true if the
+	// peersTimeout timer has expired.
+	leadershipTimedOut := func() bool {
+		select {
+		case <-peersTimeout.C:
+			return true
+		default:
+			return false
+		}
+	}
+
 	// The bootstrapFn callback handler is used to periodically poll
 	// Consul to look up the Nomad Servers in Consul. In the event the
 	// server has been brought up without a `retry-join` configuration
@@ -390,22 +421,46 @@ func (s *Server) setupConsulSyncer() error {
 	bootstrapFn := func() error {
 		// If there is a raft leader, do nothing
 		if s.raft.Leader() != "" {
+			peersTimeout.Reset(maxStaleLeadership)
 			return nil
 		}
 
-		// If the the number of Raft peers is more than the min
-		// quorum, do nothing.
-		raftPeers, err := s.raftPeers.Peers()
-		minQuorum := (s.config.BootstrapExpect / 2) + 1
-		if err == nil && len(raftPeers) >= minQuorum {
-			return nil
+		// (ab)use serf.go's behavior of setting BootstrapExpect to
+		// zero if we have bootstrapped.
+		bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
+		if bootstrapExpect == 0 {
+			// This Nomad Server has been bootstrapped. Rely on
+			// timeouts to determine health.
+
+			if !leadershipTimedOut() {
+				return nil
+			}
+		} else {
+			// This Nomad Server has not been bootstrapped; reach
+			// out to Consul if our peer list is less than
+			// `bootstrap_expect`.
+			raftPeers, err := s.raftPeers.Peers()
+			if err != nil {
+				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+				return nil
+			}
+
+			// The necessary number of Nomad Servers required for
+			// quorum has been reached; we do not need to poll
+			// Consul. Let the normal timeout-based strategy
+			// take over.
+			if len(raftPeers) >= int(bootstrapExpect) {
+				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+				return nil
+			}
 		}
-		s.logger.Printf("[TRACE] server.consul: lost contact with Nomad quorum, falling back to Consul for server list")
+		s.logger.Printf("[DEBUG] server.consul: lost contact with Nomad quorum, falling back to Consul for server list")
 
 		consulCatalog := s.consulSyncer.ConsulClient().Catalog()
 		dcs, err := consulCatalog.Datacenters()
 		if err != nil {
+			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
 			return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err)
 		}
 		if len(dcs) > 2 {
@@ -417,7 +472,7 @@ func (s *Server) setupConsulSyncer() error {
 			nearestDC := dcs[0]
 			otherDCs := make([]string, 0, len(dcs))
 			shuffleStrings(otherDCs)
-			otherDCs = dcs[1:lib.MinInt(len(dcs), s.config.BootstrapExpect*datacenterQueryFactor)]
+			otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)]
 
 			dcs = append([]string{nearestDC}, otherDCs...)
 		}
@@ -427,16 +482,16 @@ func (s *Server) setupConsulSyncer() error {
 		const defaultMaxNumNomadServers = 8
 		nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
 		for _, dc := range dcs {
-			opts := &consulapi.QueryOptions{
+			consulOpts := &consulapi.QueryOptions{
 				AllowStale: true,
 				Datacenter: dc,
 				Near:       "_agent",
 				WaitTime:   consul.DefaultQueryWaitDuration,
 			}
-			consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, opts)
+			consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts)
 			if err != nil {
-				s.logger.Printf("[TRACE] server.consul: failed to query dc %+q's service %+q: %v", dc, nomadServerServiceName, err)
-				mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err))
+				s.logger.Printf("[WARN] server.consul: failed to query service %+q in Consul datacenter %+q: %v", nomadServerServiceName, dc, err)
+				mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %q from Consul datacenter %q: %v", nomadServerServiceName, dc, err))
 				continue
 			}
@@ -453,28 +508,44 @@ func (s *Server) setupConsulSyncer() error {
 
 		if len(nomadServerServices) == 0 {
 			if len(mErr.Errors) > 0 {
+				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
 				return mErr.ErrorOrNil()
 			}
 
 			// Log the error and return nil so future handlers
 			// can attempt to register the `nomad` service.
- s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters: %+q", nomadServerServiceName, dcs) + s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters %+q", nomadServerServiceName, dcs) + peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return nil } numServersContacted, err := s.Join(nomadServerServices) if err != nil { + peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err) } + + peersTimeout.Reset(maxStaleLeadership) s.logger.Printf("[INFO] server.consul: successfully contacted %d Nomad Servers", numServersContacted) return nil } + + s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn) + return nil +} + +// setupConsulSyncer creates Server-mode consul.Syncer which periodically +// executes callbacks on a fixed interval. +func (s *Server) setupConsulSyncer() error { + var mErr multierror.Error if s.config.ConsulConfig.ServerAutoJoin { - s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn) + if err := s.setupBootstrapHandler(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } } - return nil + return mErr.ErrorOrNil() } // setupRPC is used to setup the RPC listener