From ea89976d9d7cadaf6e247ee8a06f2c7b1b440be0 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Mon, 13 Jun 2016 22:58:39 -0700 Subject: [PATCH 01/13] Teach Nomad servers how to fall back to Consul. --- command/agent/agent.go | 4 +- nomad/config.go | 4 ++ nomad/server.go | 106 ++++++++++++++++++++++++++++++++++++++++- nomad/server_test.go | 11 ++++- 4 files changed, 121 insertions(+), 4 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index b0e176aa033..860d10bd01f 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -235,7 +235,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) { return nil, fmt.Errorf("server_service_name must be set when auto_advertise is enabled") } - // conf.ConsulConfig = a.config.Consul + conf.ConsulConfig = a.config.Consul return conf, nil } @@ -379,7 +379,7 @@ func (a *Agent) setupServer() error { } // Create the server - server, err := nomad.NewServer(conf) + server, err := nomad.NewServer(conf, a.consulSyncer) if err != nil { return fmt.Errorf("server setup failed: %v", err) } diff --git a/nomad/config.go b/nomad/config.go index d998045d9ab..d6405c5f6e5 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/memberlist" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" @@ -176,6 +177,9 @@ type Config struct { // a new leader is elected, since we no longer know the status // of all the heartbeats. 
FailoverHeartbeatTTL time.Duration + + // ConsulConfig is this Agent's Consul configuration + ConsulConfig *config.ConsulConfig } // CheckVersion is used to check if the ProtocolVersion is valid diff --git a/nomad/server.go b/nomad/server.go index 1932d18b197..191cfc0a52b 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -16,7 +16,11 @@ import ( "sync" "time" + consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/tlsutil" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" @@ -25,6 +29,12 @@ import ( ) const ( + // datacenterQueryFactor sets the max number of DCs that a Nomad + // Server will query to find bootstrap_expect servers. If + // bootstrap_expect is 3, then the Nomad Server bootstrapFn handler + // will search through up to 9 Consul DCs to find its quorum. 
+ datacenterQueryFactor = 3 + raftState = "raft/" serfSnapshot = "serf/snapshot" snapshotsRetained = 2 @@ -116,6 +126,9 @@ type Server struct { heartbeatTimers map[string]*time.Timer heartbeatTimersLock sync.Mutex + // consulSyncer advertises this Nomad Agent with Consul + consulSyncer *consul.Syncer + // Worker used for processing workers []*Worker @@ -140,7 +153,7 @@ type endpoints struct { // NewServer is used to construct a new Nomad server from the // configuration, potentially returning an error -func NewServer(config *Config) (*Server, error) { +func NewServer(config *Config, consulSyncer *consul.Syncer) (*Server, error) { // Check the protocol version if err := config.CheckVersion(); err != nil { return nil, err @@ -172,6 +185,7 @@ func NewServer(config *Config) (*Server, error) { // Create the server s := &Server{ config: config, + consulSyncer: consulSyncer, connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil), logger: logger, rpcServer: rpc.NewServer(), @@ -218,6 +232,11 @@ func NewServer(config *Config) (*Server, error) { return nil, fmt.Errorf("Failed to start workers: %v", err) } + // Setup the Consul syncer + if err := s.setupConsulSyncer(); err != nil { + return nil, fmt.Errorf("failed to create server Consul syncer: %v") + } + // Monitor leadership changes go s.monitorLeadership() @@ -356,6 +375,91 @@ func (s *Server) Leave() error { return nil } +// setupConsulSyncer creates Server-mode consul.Syncer which periodically +// executes callbacks on a fixed interval. +func (s *Server) setupConsulSyncer() error { + // The bootstrapFn callback handler is used to periodically poll + // Consul to look up the Nomad Servers in Consul. 
In the event the + // server has been brought up without a `retry-join` configuration + // and this Server is partitioned from the rest of the cluster, + // periodically poll Consul to reattach this Server to other servers + // in the same region and automatically reform a quorum (assuming the + // correct number of servers required for quorum are present). + bootstrapFn := func() error { + // If the the number of Members in Serf is more than the + // bootstrap quorum, do nothing. + if len(s.Members()) < s.config.BootstrapExpect { + return nil + } + + s.logger.Printf("[TRACE] server.consul: lost contact with Nomad quorum, falling back to Consul for server list") + + consulCatalog := s.consulSyncer.ConsulClient().Catalog() + dcs, err := consulCatalog.Datacenters() + if err != nil { + return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err) + } + if len(dcs) > 2 { + // Query the local DC first, then shuffle the + // remaining DCs. If additional calls to bootstrapFn + // are necessary, this Nomad Server will eventually + // walk all datacenter until it finds enough hosts to + // form a quorum. + nearestDC := dcs[0] + otherDCs := make([]string, 0, len(dcs)) + otherDCs = dcs[1:lib.MinInt(len(dcs), s.config.BootstrapExpect*datacenterQueryFactor)] + shuffleStrings(otherDCs) + + dcs = append([]string{nearestDC}, otherDCs...) 
+ } + + nomadServerServiceName := s.config.ConsulConfig.ServerServiceName + var mErr multierror.Error + const defaultMaxNumNomadServers = 8 + nomadServerServices := make([]string, 0, defaultMaxNumNomadServers) + for _, dc := range dcs { + opts := &consulapi.QueryOptions{ + AllowStale: true, + Datacenter: dc, + Near: "_agent", + WaitTime: consul.DefaultQueryWaitDuration, + } + consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, opts) + if err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err)) + continue + } + + for _, cs := range consulServices { + port := strconv.FormatInt(int64(cs.ServicePort), 10) + addr := cs.ServiceAddress + if addr == "" { + addr = cs.Address + } + serverAddr := net.JoinHostPort(addr, port) + nomadServerServices = append(nomadServerServices, serverAddr) + } + } + if len(nomadServerServices) == 0 { + if len(mErr.Errors) > 0 { + return mErr.ErrorOrNil() + } + + return fmt.Errorf("no Nomad Servers advertising service %+q in Consul datacenters: %+q", nomadServerServiceName, dcs) + } + numServersContacted, err := s.Join(nomadServerServices) + if err != nil { + return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err) + } + s.logger.Printf("[INFO] successfully contacted %d Nomad Servers", numServersContacted) + + return nil + } + s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn) + + return nil +} + // setupRPC is used to setup the RPC listener func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { // Create endpoints diff --git a/nomad/server_test.go b/nomad/server_test.go index a44cb88da1f..e93f63305d0 100644 --- a/nomad/server_test.go +++ b/nomad/server_test.go @@ -3,11 +3,14 @@ package nomad import ( "fmt" "io/ioutil" + "log" "net" + "os" "sync/atomic" "testing" "time" + "github.com/hashicorp/nomad/command/agent/consul" 
"github.com/hashicorp/nomad/testutil" ) @@ -63,8 +66,14 @@ func testServer(t *testing.T, cb func(*Config)) *Server { // Enable raft as leader if we have bootstrap on config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap + shutdownCh := make(chan struct{}) + consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, log.New(os.Stderr, "", log.LstdFlags)) + if err != nil { + t.Fatalf("err: %v", err) + } + // Create server - server, err := NewServer(config) + server, err := NewServer(config, consulSyncer) if err != nil { t.Fatalf("err: %v", err) } From c8ac6524ae238d2d8843e05069aaed3d3778cad9 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Tue, 14 Jun 2016 00:07:04 -0700 Subject: [PATCH 02/13] Short-circuit the bootstrapFn if we have a leader --- nomad/server.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/nomad/server.go b/nomad/server.go index 191cfc0a52b..a14664854d8 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -386,9 +386,16 @@ func (s *Server) setupConsulSyncer() error { // in the same region and automatically reform a quorum (assuming the // correct number of servers required for quorum are present). bootstrapFn := func() error { - // If the the number of Members in Serf is more than the - // bootstrap quorum, do nothing. - if len(s.Members()) < s.config.BootstrapExpect { + // If there is a raft leader, do nothing + if s.raft.Leader() != "" { + return nil + } + + // If the the number of Raft peers or Members in Serf is more + // than the min quorum, do nothing. 
+ raftPeers, err := s.raftPeers.Peers() + minQuorum := (s.config.BootstrapExpect / 2) + 1 + if err == nil && (len(raftPeers) >= minQuorum || len(s.Members()) >= minQuorum) { return nil } @@ -426,6 +433,7 @@ func (s *Server) setupConsulSyncer() error { } consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, opts) if err != nil { + s.logger.Printf("[TRACE] server.consul: failed to query dc %+q's service %+q: %v", dc, nomadServerServiceName, err) mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err)) continue } @@ -440,13 +448,18 @@ func (s *Server) setupConsulSyncer() error { nomadServerServices = append(nomadServerServices, serverAddr) } } + if len(nomadServerServices) == 0 { if len(mErr.Errors) > 0 { return mErr.ErrorOrNil() } - return fmt.Errorf("no Nomad Servers advertising service %+q in Consul datacenters: %+q", nomadServerServiceName, dcs) + // Log the error and return nil so future handlers + // can attempt to register the `nomad` service. + s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters: %+q", nomadServerServiceName, dcs) + return nil } + numServersContacted, err := s.Join(nomadServerServices) if err != nil { return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err) From 972b4fb3bce8ac6fdb075bdc56c49524f6c91489 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Tue, 14 Jun 2016 15:05:34 -0700 Subject: [PATCH 03/13] Bump various Consul search limits Client: Search limit increased from 4 random DCs to 8 random DCs, plus nearest. Server: Search factor increased from 3 to 5 times the bootstrap_expect. This should allow for faster convergence in large environments (e.g. sub-5min for 10K Consul DCs). 
--- client/client.go | 2 +- nomad/server.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index bd82bb33d24..687ff4fa5d4 100644 --- a/client/client.go +++ b/client/client.go @@ -39,7 +39,7 @@ const ( // datacenterQueryLimit searches through up to this many adjacent // datacenters looking for the Nomad server service. - datacenterQueryLimit = 5 + datacenterQueryLimit = 9 // registerRetryIntv is minimum interval on which we retry // registration. We pick a value between this and 2x this. diff --git a/nomad/server.go b/nomad/server.go index a14664854d8..3bb8c460ba2 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -31,9 +31,10 @@ import ( const ( // datacenterQueryFactor sets the max number of DCs that a Nomad // Server will query to find bootstrap_expect servers. If - // bootstrap_expect is 3, then the Nomad Server bootstrapFn handler - // will search through up to 9 Consul DCs to find its quorum. - datacenterQueryFactor = 3 + // bootstrap_expect is 5, then the Nomad Server bootstrapFn handler + // will search through up to 15 Consul DCs to find possible Serf + // peers. + datacenterQueryFactor = 5 raftState = "raft/" serfSnapshot = "serf/snapshot" From d7a0401d725c6d3eb1d9f54432c0014f936f9634 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Tue, 14 Jun 2016 15:07:34 -0700 Subject: [PATCH 04/13] Shuffle all datacenters vs only the nearest N datacenters. Per discussion, we want to be aggressive about fanning out vs possibly fixating on only local DCs. With RPC forwarding in place, a random walk may be less optimal from a network latency perspective, but it is guaranteed to eventually result in a converged state because all DCs are candidates during the bootstrapping process. 
--- client/client.go | 2 +- nomad/server.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/client.go b/client/client.go index 687ff4fa5d4..a7583be2592 100644 --- a/client/client.go +++ b/client/client.go @@ -1255,8 +1255,8 @@ func (c *Client) setupConsulSyncer() error { // a new set of servers so it's okay. nearestDC := dcs[0] otherDCs := make([]string, 0, len(dcs)) - otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)] shuffleStrings(otherDCs) + otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)] dcs = append([]string{nearestDC}, otherDCs...) } diff --git a/nomad/server.go b/nomad/server.go index 3bb8c460ba2..5c063a88079 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -415,8 +415,8 @@ func (s *Server) setupConsulSyncer() error { // form a quorum. nearestDC := dcs[0] otherDCs := make([]string, 0, len(dcs)) - otherDCs = dcs[1:lib.MinInt(len(dcs), s.config.BootstrapExpect*datacenterQueryFactor)] shuffleStrings(otherDCs) + otherDCs = dcs[1:lib.MinInt(len(dcs), s.config.BootstrapExpect*datacenterQueryFactor)] dcs = append([]string{nearestDC}, otherDCs...) 
} From ecd13f86c23da5d5059dccc7f3b59e00fc66529f Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Tue, 14 Jun 2016 15:27:15 -0700 Subject: [PATCH 05/13] Guard the auto-join functionality behind its `consul.server_auto_join` tunable --- nomad/server.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nomad/server.go b/nomad/server.go index 5c063a88079..f2d522638c3 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -469,7 +469,9 @@ func (s *Server) setupConsulSyncer() error { return nil } - s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn) + if s.config.ConsulConfig.ServerAutoJoin { + s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn) + } return nil } From 3daf51489ba3f94ec9f18bdaaca6f7e808dd5b95 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Tue, 14 Jun 2016 16:29:02 -0700 Subject: [PATCH 06/13] Do not consider the number of Serf members when considering falling back to Consul. --- nomad/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nomad/server.go b/nomad/server.go index f2d522638c3..c6e48710b12 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -392,11 +392,11 @@ func (s *Server) setupConsulSyncer() error { return nil } - // If the the number of Raft peers or Members in Serf is more - // than the min quorum, do nothing. + // If the the number of Raft peers is more than the min + // quorum, do nothing. 
raftPeers, err := s.raftPeers.Peers() minQuorum := (s.config.BootstrapExpect / 2) + 1 - if err == nil && (len(raftPeers) >= minQuorum || len(s.Members()) >= minQuorum) { + if err == nil && len(raftPeers) >= minQuorum { return nil } From fa26e1ff64df52a08ffe2f221b9bc5b1f82d0407 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Tue, 14 Jun 2016 16:30:01 -0700 Subject: [PATCH 07/13] Namespace the log messages --- nomad/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/server.go b/nomad/server.go index c6e48710b12..5ca922530e7 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -465,7 +465,7 @@ func (s *Server) setupConsulSyncer() error { if err != nil { return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err) } - s.logger.Printf("[INFO] successfully contacted %d Nomad Servers", numServersContacted) + s.logger.Printf("[INFO] server.consul: successfully contacted %d Nomad Servers", numServersContacted) return nil } From bd9fbd24c070995c5c44f2c5d95621e5649a7a4e Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Tue, 14 Jun 2016 16:30:30 -0700 Subject: [PATCH 08/13] Use the config's log output --- nomad/server_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nomad/server_test.go b/nomad/server_test.go index e93f63305d0..d82e5065db5 100644 --- a/nomad/server_test.go +++ b/nomad/server_test.go @@ -5,7 +5,6 @@ import ( "io/ioutil" "log" "net" - "os" "sync/atomic" "testing" "time" @@ -67,7 +66,7 @@ func testServer(t *testing.T, cb func(*Config)) *Server { config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap shutdownCh := make(chan struct{}) - consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, log.New(os.Stderr, "", log.LstdFlags)) + consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, log.New(config.LogOutput, "", log.LstdFlags)) if err != nil { t.Fatalf("err: %v", err) } From a915cdc3378f1b549402879bb344895f3b795b59 Mon Sep 17 00:00:00 2001 
From: Sean Chittenden Date: Thu, 16 Jun 2016 11:03:51 -0700 Subject: [PATCH 09/13] Use the `%+q` verb in log messages (vs `%q`). --- client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index a7583be2592..241bca436ed 100644 --- a/client/client.go +++ b/client/client.go @@ -1272,7 +1272,7 @@ func (c *Client) setupConsulSyncer() error { var mErr multierror.Error const defaultMaxNumNomadServers = 8 nomadServerServices := make([]string, 0, defaultMaxNumNomadServers) - c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %q", dcs) + c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs) for _, dc := range dcs { consulOpts := &consulapi.QueryOptions{ AllowStale: true, From 4d149888cc2e804889bfe74df28d36073850f9c1 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 16 Jun 2016 12:00:15 -0700 Subject: [PATCH 10/13] Define `BootstrapExpect` as an `int32` so it can be manipulated atomically. 
--- command/agent/agent.go | 3 ++- nomad/config.go | 5 +++-- nomad/serf.go | 18 +++++++++++------- nomad/server.go | 6 ++++-- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 860d10bd01f..f971ff407a6 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/hashicorp/nomad/client" @@ -110,7 +111,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) { if a.config.Server.BootstrapExpect == 1 { conf.Bootstrap = true } else { - conf.BootstrapExpect = a.config.Server.BootstrapExpect + atomic.StoreInt32(&conf.BootstrapExpect, int32(a.config.Server.BootstrapExpect)) } } if a.config.DataDir != "" { diff --git a/nomad/config.go b/nomad/config.go index d6405c5f6e5..ff91ac3a294 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -52,8 +52,9 @@ type Config struct { // BootstrapExpect mode is used to automatically bring up a // collection of Nomad servers. This can be used to automatically - // bring up a collection of nodes. - BootstrapExpect int + // bring up a collection of nodes. All operations on BootstrapExpect + // must be handled via `atomic.*Int32()` calls. 
+ BootstrapExpect int32 // DataDir is the directory to store our state in DataDir string diff --git a/nomad/serf.go b/nomad/serf.go index 9678dbf2a4a..8f1b6cdaf6f 100644 --- a/nomad/serf.go +++ b/nomad/serf.go @@ -1,6 +1,10 @@ package nomad -import "github.com/hashicorp/serf/serf" +import ( + "sync/atomic" + + "github.com/hashicorp/serf/serf" +) const ( // StatusReap is used to update the status of a node if we @@ -66,7 +70,7 @@ func (s *Server) nodeJoin(me serf.MemberEvent) { s.peerLock.Unlock() // If we still expecting to bootstrap, may need to handle this - if s.config.BootstrapExpect != 0 { + if atomic.LoadInt32(&s.config.BootstrapExpect) != 0 { s.maybeBootstrap() } } @@ -91,7 +95,7 @@ func (s *Server) maybeBootstrap() { // Bootstrap can only be done if there are no committed logs, // remove our expectations of bootstrapping if index != 0 { - s.config.BootstrapExpect = 0 + atomic.StoreInt32(&s.config.BootstrapExpect, 0) return } @@ -106,7 +110,7 @@ func (s *Server) maybeBootstrap() { if p.Region != s.config.Region { continue } - if p.Expect != 0 && p.Expect != s.config.BootstrapExpect { + if p.Expect != 0 && p.Expect != int(atomic.LoadInt32(&s.config.BootstrapExpect)) { s.logger.Printf("[ERR] nomad: peer %v has a conflicting expect value. 
All nodes should expect the same number.", member) return } @@ -118,7 +122,7 @@ func (s *Server) maybeBootstrap() { } // Skip if we haven't met the minimum expect count - if len(addrs) < s.config.BootstrapExpect { + if len(addrs) < int(atomic.LoadInt32(&s.config.BootstrapExpect)) { return } @@ -128,8 +132,8 @@ func (s *Server) maybeBootstrap() { s.logger.Printf("[ERR] nomad: failed to bootstrap peers: %v", err) } - // Bootstrapping comlete, don't enter this again - s.config.BootstrapExpect = 0 + // Bootstrapping complete, don't enter this again + atomic.StoreInt32(&s.config.BootstrapExpect, 0) } // nodeFailed is used to handle fail events on the serf cluster diff --git a/nomad/server.go b/nomad/server.go index 5ca922530e7..3067b0b95f4 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -14,6 +14,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" consulapi "github.com/hashicorp/consul/api" @@ -651,8 +652,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) { conf.Tags["bootstrap"] = "1" } - if s.config.BootstrapExpect != 0 { - conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect) + bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect) + if bootstrapExpect != 0 { + conf.Tags["expect"] = fmt.Sprintf("%d", bootstrapExpect) } conf.MemberlistConfig.LogOutput = s.config.LogOutput conf.LogOutput = s.config.LogOutput From a4cfb2b797e5f042a7cc0567d864d2b986c0c425 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 16 Jun 2016 12:14:03 -0700 Subject: [PATCH 11/13] Rework `server_auto_join` to use a timer instead of the peer count. It is perfectly viable for an admin to downsize a Nomad Server cluster down to 1, 2, or `num % 2 == 0` (however ill-advised such activities may be). And instead of using `bootstrap_expect`, use a timeout-based strategy. 
If the `bootstrapFn` hasn't observed a leader in 15s it will fall back to Consul and will poll every ~60s until it sees a leader. --- nomad/server.go | 119 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 95 insertions(+), 24 deletions(-) diff --git a/nomad/server.go b/nomad/server.go index 3067b0b95f4..ef0f530f1c2 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -30,12 +30,21 @@ import ( ) const ( - // datacenterQueryFactor sets the max number of DCs that a Nomad - // Server will query to find bootstrap_expect servers. If - // bootstrap_expect is 5, then the Nomad Server bootstrapFn handler - // will search through up to 15 Consul DCs to find possible Serf - // peers. - datacenterQueryFactor = 5 + // datacenterQueryLimit sets the max number of DCs that a Nomad + // Server will query to find bootstrap_expect servers. + datacenterQueryLimit = 25 + + // maxStaleLeadership is the maximum time we will permit this Nomad + // Server to go without seeing a valid Raft leader. + maxStaleLeadership = 15 * time.Second + + // peersPollInterval is used as the polling interval between attempts + // to query Consul for Nomad Servers. + peersPollInterval = 45 * time.Second + + // peersPollJitter is used to provide a slight amount of variance to + // the retry interval when querying Consul Servers + peersPollJitterFactor = 2 raftState = "raft/" serfSnapshot = "serf/snapshot" @@ -377,9 +386,31 @@ func (s *Server) Leave() error { return nil } -// setupConsulSyncer creates Server-mode consul.Syncer which periodically -// executes callbacks on a fixed interval. -func (s *Server) setupConsulSyncer() error { +// setupBootstrapHandler() creates the closure necessary to support a Consul +// fallback handler. +func (s *Server) setupBootstrapHandler() error { + // peersTimeout is used to indicate to the Consul Syncer that the + // current Nomad Server has a stale peer set. 
peersTimeout will time + // out if the Consul Syncer bootstrapFn has not observed a Raft + // leader in maxStaleLeadership. If peersTimeout has been triggered, + // the Consul Syncer will begin querying Consul for other Nomad + // Servers. + // + // NOTE: time.Timer is used vs time.Time in order to handle clock + // drift because time.Timer is implemented as a monotonic clock. + var peersTimeout *time.Timer = time.NewTimer(0) + + // leadershipTimedOut is a helper method that returns true if the + // peersTimeout timer has expired. + leadershipTimedOut := func() bool { + select { + case <-peersTimeout.C: + return true + default: + return false + } + } + // The bootstrapFn callback handler is used to periodically poll // Consul to look up the Nomad Servers in Consul. In the event the // server has been brought up without a `retry-join` configuration @@ -390,22 +421,46 @@ func (s *Server) setupConsulSyncer() error { bootstrapFn := func() error { // If there is a raft leader, do nothing if s.raft.Leader() != "" { + peersTimeout.Reset(maxStaleLeadership) return nil } - // If the the number of Raft peers is more than the min - // quorum, do nothing. - raftPeers, err := s.raftPeers.Peers() - minQuorum := (s.config.BootstrapExpect / 2) + 1 - if err == nil && len(raftPeers) >= minQuorum { - return nil + // (ab)use serf.go's behavior of setting BootstrapExpect to + // zero if we have bootstrapped. If we have bootstrapped + bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect) + if bootstrapExpect == 0 { + // This Nomad Server has been bootstrapped. Rely on + // timeouts to determine health. + + if !leadershipTimedOut() { + return nil + } + } else { + // This Nomad Server has not been bootstrapped, reach + // out to Consul if our peer list is less than + // `bootstrap_expect`. 
+ raftPeers, err := s.raftPeers.Peers() + if err != nil { + peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) + return nil + } + + // The necessary number of Nomad Servers required for + // quorum has been reached, we do not need to poll + // Consul. Let the normal timeout-based strategy + // take over. + if len(raftPeers) >= int(bootstrapExpect) { + peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) + return nil + } } - s.logger.Printf("[TRACE] server.consul: lost contact with Nomad quorum, falling back to Consul for server list") + s.logger.Printf("[DEBUG] server.consul: lost contact with Nomad quorum, falling back to Consul for server list") consulCatalog := s.consulSyncer.ConsulClient().Catalog() dcs, err := consulCatalog.Datacenters() if err != nil { + peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err) } if len(dcs) > 2 { @@ -417,7 +472,7 @@ func (s *Server) setupConsulSyncer() error { nearestDC := dcs[0] otherDCs := make([]string, 0, len(dcs)) shuffleStrings(otherDCs) - otherDCs = dcs[1:lib.MinInt(len(dcs), s.config.BootstrapExpect*datacenterQueryFactor)] + otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)] dcs = append([]string{nearestDC}, otherDCs...) 
} @@ -427,16 +482,16 @@ func (s *Server) setupConsulSyncer() error { const defaultMaxNumNomadServers = 8 nomadServerServices := make([]string, 0, defaultMaxNumNomadServers) for _, dc := range dcs { - opts := &consulapi.QueryOptions{ + consulOpts := &consulapi.QueryOptions{ AllowStale: true, Datacenter: dc, Near: "_agent", WaitTime: consul.DefaultQueryWaitDuration, } - consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, opts) + consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts) if err != nil { - s.logger.Printf("[TRACE] server.consul: failed to query dc %+q's service %+q: %v", dc, nomadServerServiceName, err) - mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err)) + s.logger.Printf("[WARN] server.consul: failed to query service %+q in Consul datacenter %+q: %v", nomadServerServiceName, dc, err) + mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %q from Consul datacenter %q: %v", nomadServerServiceName, dc, err)) continue } @@ -453,28 +508,44 @@ func (s *Server) setupConsulSyncer() error { if len(nomadServerServices) == 0 { if len(mErr.Errors) > 0 { + peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return mErr.ErrorOrNil() } // Log the error and return nil so future handlers // can attempt to register the `nomad` service. 
- s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters: %+q", nomadServerServiceName, dcs) + s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters %+q", nomadServerServiceName, dcs) + peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return nil } numServersContacted, err := s.Join(nomadServerServices) if err != nil { + peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err) } + + peersTimeout.Reset(maxStaleLeadership) s.logger.Printf("[INFO] server.consul: successfully contacted %d Nomad Servers", numServersContacted) return nil } + + s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn) + return nil +} + +// setupConsulSyncer creates Server-mode consul.Syncer which periodically +// executes callbacks on a fixed interval. +func (s *Server) setupConsulSyncer() error { + var mErr multierror.Error if s.config.ConsulConfig.ServerAutoJoin { - s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn) + if err := s.setupBootstrapHandler(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } } - return nil + return mErr.ErrorOrNil() } // setupRPC is used to setup the RPC listener From 9cd44ce695ba0a3839f8d644e39f9fa56957c5e0 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 16 Jun 2016 14:27:10 -0700 Subject: [PATCH 12/13] Immediately query Consul upon initialization if we have no peers. Also don't attempt to join the Server with itself. 
--- nomad/server.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/nomad/server.go b/nomad/server.go index ef0f530f1c2..c7a689e071d 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -400,6 +400,10 @@ func (s *Server) setupBootstrapHandler() error { // drift because time.Timer is implemented as a monotonic clock. var peersTimeout *time.Timer = time.NewTimer(0) + // consulQueryCount is the number of times the bootstrapFn has been + // called, regardless of success. + var consulQueryCount uint64 + // leadershipTimedOut is a helper method that returns true if the // peersTimeout timer has expired. leadershipTimedOut := func() bool { @@ -430,12 +434,16 @@ func (s *Server) setupBootstrapHandler() error { bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect) if bootstrapExpect == 0 { // This Nomad Server has been bootstrapped. Rely on - // timeouts to determine health. - + // the peersTimeout firing as a guard to prevent + // aggressive querying of Consul. if !leadershipTimedOut() { return nil } } else { + if consulQueryCount > 0 && !leadershipTimedOut() { + return nil + } + // This Nomad Server has not been bootstrapped, reach // out to Consul if our peer list is less than // `bootstrap_expect`. 
@@ -454,6 +462,7 @@ func (s *Server) setupBootstrapHandler() error { return nil } } + consulQueryCount++ s.logger.Printf("[DEBUG] server.consul: lost contact with Nomad quorum, falling back to Consul for server list") @@ -481,6 +490,7 @@ func (s *Server) setupBootstrapHandler() error { var mErr multierror.Error const defaultMaxNumNomadServers = 8 nomadServerServices := make([]string, 0, defaultMaxNumNomadServers) + localNode := s.serf.Memberlist().LocalNode() for _, dc := range dcs { consulOpts := &consulapi.QueryOptions{ AllowStale: true, @@ -501,6 +511,9 @@ func (s *Server) setupBootstrapHandler() error { if addr == "" { addr = cs.Address } + if localNode.Addr.String() == addr && int(localNode.Port) == cs.ServicePort { + continue + } serverAddr := net.JoinHostPort(addr, port) nomadServerServices = append(nomadServerServices, serverAddr) } @@ -514,8 +527,9 @@ func (s *Server) setupBootstrapHandler() error { // Log the error and return nil so future handlers // can attempt to register the `nomad` service. 
- s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters %+q", nomadServerServiceName, dcs) - peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) + pollInterval := peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor) + s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters %+q, sleeping for %v", nomadServerServiceName, dcs, pollInterval) + peersTimeout.Reset(pollInterval) return nil } From b82f872a821e3e340c4e7f91589f5ecd480fff83 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 16 Jun 2016 14:40:09 -0700 Subject: [PATCH 13/13] Fix up various error handling --- nomad/server.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nomad/server.go b/nomad/server.go index c7a689e071d..d604ef6033e 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -500,8 +500,9 @@ func (s *Server) setupBootstrapHandler() error { } consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts) if err != nil { - s.logger.Printf("[WARN] server.consul: failed to query service %+q in Consul datacenter %+q: %v", nomadServerServiceName, dc, err) - mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %q from Consul datacenter %q: %v", nomadServerServiceName, dc, err)) + err := fmt.Errorf("failed to query service %q in Consul datacenter %q: %v", nomadServerServiceName, dc, err) + s.logger.Printf("[WARN] server.consul: %v", err) + mErr.Errors = append(mErr.Errors, err) continue } @@ -552,14 +553,13 @@ func (s *Server) setupBootstrapHandler() error { // setupConsulSyncer creates Server-mode consul.Syncer which periodically // executes callbacks on a fixed interval. 
func (s *Server) setupConsulSyncer() error { - var mErr multierror.Error if s.config.ConsulConfig.ServerAutoJoin { if err := s.setupBootstrapHandler(); err != nil { - mErr.Errors = append(mErr.Errors, err) + return err } } - return mErr.ErrorOrNil() + return nil } // setupRPC is used to setup the RPC listener