Skip to content

Commit

Permalink
move retry-join specific fields to the agent config
Browse files Browse the repository at this point in the history
  • Loading branch information
chelseakomlo committed May 10, 2018
1 parent 92af6a5 commit 47ef99e
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 119 deletions.
35 changes: 8 additions & 27 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/boltdb/bolt"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
discover "github.com/hashicorp/go-discover"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
Expand Down Expand Up @@ -281,7 +280,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
// Set the preconfigured list of static servers
c.configLock.RLock()
if len(c.configCopy.Servers) > 0 {
if err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
if _, err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
logger.Printf("[WARN] client: None of the configured servers are valid: %v", err)
}
}
Expand Down Expand Up @@ -616,29 +615,17 @@ func (c *Client) GetServers() []string {

// SetServers sets a new list of nomad servers to connect to. As long as one
// server is resolvable no error is returned, and the number of servers that
// were set is reported alongside it.
func (c *Client) SetServers(in []string) error {
func (c *Client) SetServers(in []string) (int, error) {
return c.setServersImpl(in, false)
}

// getServerAddr resolves a configured server entry to a single address.
//
// Entries beginning with "provider=" are handed to go-discover via
// discover.Addrs; any other entry is returned unchanged. A discovery lookup
// must yield exactly one address: a lookup error is returned as-is, and a
// lookup that yields zero or several addresses is reported as an error
// instead of handing the caller an empty address with a nil error.
func (c *Client) getServerAddr(srv string) (string, error) {
	// Plain addresses need no discovery step.
	if !strings.HasPrefix(srv, "provider=") {
		return srv, nil
	}

	disc := &discover.Discover{}
	addrs, err := disc.Addrs(srv, c.logger)
	if err != nil {
		return "", err
	}

	// Previously this case fell through with err == nil, so callers received
	// ("", nil) and went on to resolve an empty server string.
	if len(addrs) != 1 {
		return "", fmt.Errorf("expected exactly one address from %q, discovered %d", srv, len(addrs))
	}
	return addrs[0], nil
}

// setServersImpl sets a new list of nomad servers to connect to. If force is
// set, we add the server to the internal serverlist even if the server could not
// be pinged. An error is returned if no endpoints were valid when non-forcing;
// otherwise the number of endpoints that were set is returned.
//
// Force should be used when setting the servers from the initial configuration
// since the server may be starting up in parallel and initial pings may fail.
func (c *Client) setServersImpl(in []string, force bool) error {
func (c *Client) setServersImpl(in []string, force bool) (int, error) {
var mu sync.Mutex
var wg sync.WaitGroup
var merr multierror.Error
Expand All @@ -647,18 +634,12 @@ func (c *Client) setServersImpl(in []string, force bool) error {
wg.Add(len(in))

for _, s := range in {
go func(srv string) {
go func(server string) {
defer wg.Done()

server, err := c.getServerAddr(srv)
if err != nil {
c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", srv, err)
merr.Errors = append(merr.Errors, err)
return
}
addr, err := resolveServer(server)
if err != nil {
c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", srv, err)
c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", server, err)
merr.Errors = append(merr.Errors, err)
return
}
Expand All @@ -685,13 +666,13 @@ func (c *Client) setServersImpl(in []string, force bool) error {
// Only return errors if no servers are valid
if len(endpoints) == 0 {
if len(merr.Errors) > 0 {
return merr.ErrorOrNil()
return 0, merr.ErrorOrNil()
}
return noServersErr
return 0, noServersErr
}

c.servers.SetServers(endpoints)
return nil
return len(endpoints), nil
}

// restoreState is used to restore our state from the data dir
Expand Down
6 changes: 3 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,13 +975,13 @@ func TestClient_ServerList(t *testing.T) {
if s := client.GetServers(); len(s) != 0 {
t.Fatalf("expected server lit to be empty but found: %+q", s)
}
if err := client.SetServers(nil); err != noServersErr {
if _, err := client.SetServers(nil); err != noServersErr {
t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
}
if err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
if _, err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
t.Fatalf("expected setting a bad server to return an error")
}
if err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
if _, err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
}
s := client.GetServers()
Expand Down
2 changes: 1 addition & 1 deletion command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)

// Set the servers list into the client
s.agent.logger.Printf("[TRACE] Adding servers %+q to the client's primary server list", servers)
if err := client.SetServers(servers); err != nil {
if _, err := client.SetServers(servers); err != nil {
s.agent.logger.Printf("[ERR] Attempt to add servers %q to client failed: %v", servers, err)
//TODO is this the right error to return?
return nil, CodedError(400, err.Error())
Expand Down
19 changes: 10 additions & 9 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ func (c *Command) readConfig() *Config {
flags.IntVar(&cmdConfig.Server.BootstrapExpect, "bootstrap-expect", 0, "")
flags.BoolVar(&cmdConfig.Server.RejoinAfterLeave, "rejoin", false, "")
flags.Var((*flaghelper.StringFlag)(&cmdConfig.Server.StartJoin), "join", "")
flags.Var((*flaghelper.StringFlag)(&cmdConfig.Server.RetryJoin), "retry-join", "")
flags.IntVar(&cmdConfig.Server.RetryMaxAttempts, "retry-max", 0, "")
flags.StringVar(&cmdConfig.Server.RetryInterval, "retry-interval", "", "")
flags.StringVar(&cmdConfig.Server.EncryptKey, "encrypt", "", "gossip encryption key")
flags.IntVar(&cmdConfig.Server.RaftProtocol, "raft-protocol", 0, "")

Expand All @@ -103,6 +100,9 @@ func (c *Command) readConfig() *Config {
flags.StringVar(&cmdConfig.Datacenter, "dc", "", "")
flags.StringVar(&cmdConfig.LogLevel, "log-level", "", "")
flags.StringVar(&cmdConfig.NodeName, "node", "", "")
flags.Var((*flaghelper.StringFlag)(&cmdConfig.RetryJoin), "retry-join", "")
flags.IntVar(&cmdConfig.RetryMaxAttempts, "retry-max", 0, "")
flags.StringVar(&cmdConfig.RetryInterval, "retry-interval", "", "")

// Consul options
flags.StringVar(&cmdConfig.Consul.Auth, "consul-auth", "", "")
Expand Down Expand Up @@ -268,12 +268,12 @@ func (c *Command) readConfig() *Config {
}

// Parse the RetryInterval.
dur, err := time.ParseDuration(config.Server.RetryInterval)
dur, err := time.ParseDuration(config.RetryInterval)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing retry interval: %s", err))
return nil
}
config.Server.retryInterval = dur
config.retryInterval = dur

// Check that the server is running in at least one mode.
if !(config.Server.Enabled || config.Client.Enabled) {
Expand Down Expand Up @@ -548,10 +548,11 @@ func (c *Command) Run(args []string) int {
c.retryJoinErrCh = make(chan struct{})

joiner := retryJoiner{
join: c.agent.server.Join,
discover: &discover.Discover{},
errCh: c.retryJoinErrCh,
logger: c.agent.logger,
serverJoin: c.agent.server.Join,
clientJoin: c.agent.client.SetServers,
discover: &discover.Discover{},
errCh: c.retryJoinErrCh,
logger: c.agent.logger,
}
go joiner.RetryJoin(config)

Expand Down
50 changes: 19 additions & 31 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,20 @@ type Config struct {

// Autopilot contains the configuration for Autopilot behavior.
Autopilot *config.AutopilotConfig `mapstructure:"autopilot"`

// RetryMaxAttempts specifies the maximum number of times to retry joining a
// host on startup. This is useful for cases where we know the node will be
// online eventually.
RetryMaxAttempts int `mapstructure:"retry_max"`

// RetryInterval specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
RetryInterval string `mapstructure:"retry_interval"`
retryInterval time.Duration `mapstructure:"-"`

// RetryJoin is a list of addresses to join with retry enabled.
RetryJoin []string `mapstructure:"retry_join"`
}

// ClientConfig is configuration specific to the client mode
Expand Down Expand Up @@ -313,20 +327,6 @@ type ServerConfig struct {
// addresses, then the agent will error and exit.
StartJoin []string `mapstructure:"start_join"`

// RetryJoin is a list of addresses to join with retry enabled.
RetryJoin []string `mapstructure:"retry_join"`

// RetryMaxAttempts specifies the maximum number of times to retry joining a
// host on startup. This is useful for cases where we know the node will be
// online eventually.
RetryMaxAttempts int `mapstructure:"retry_max"`

// RetryInterval specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
RetryInterval string `mapstructure:"retry_interval"`
retryInterval time.Duration `mapstructure:"-"`

// RejoinAfterLeave controls our interaction with the cluster after leave.
// When set to false (default), a leave causes Consul to not rejoin
// the cluster until an explicit join is received. If this is set to
Expand Down Expand Up @@ -603,11 +603,8 @@ func DefaultConfig() *Config {
NoHostUUID: helper.BoolToPtr(true),
},
Server: &ServerConfig{
Enabled: false,
StartJoin: []string{},
RetryJoin: []string{},
RetryInterval: "30s",
RetryMaxAttempts: 0,
Enabled: false,
StartJoin: []string{},
},
ACL: &ACLConfig{
Enabled: false,
Expand All @@ -624,6 +621,9 @@ func DefaultConfig() *Config {
Version: version.GetVersion(),
Autopilot: config.DefaultAutopilotConfig(),
DisableUpdateCheck: helper.BoolToPtr(false),
RetryJoin: []string{},
RetryInterval: "30s",
RetryMaxAttempts: 0,
}
}

Expand Down Expand Up @@ -1033,13 +1033,6 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.MaxHeartbeatsPerSecond != 0.0 {
result.MaxHeartbeatsPerSecond = b.MaxHeartbeatsPerSecond
}
if b.RetryMaxAttempts != 0 {
result.RetryMaxAttempts = b.RetryMaxAttempts
}
if b.RetryInterval != "" {
result.RetryInterval = b.RetryInterval
result.retryInterval = b.retryInterval
}
if b.RejoinAfterLeave {
result.RejoinAfterLeave = true
}
Expand All @@ -1064,11 +1057,6 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.StartJoin = append(result.StartJoin, a.StartJoin...)
result.StartJoin = append(result.StartJoin, b.StartJoin...)

// Copy the retry join addresses
result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin))
result.RetryJoin = append(result.RetryJoin, a.RetryJoin...)
result.RetryJoin = append(result.RetryJoin, b.RetryJoin...)

return &result
}

Expand Down
20 changes: 10 additions & 10 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ func TestConfig_Parse(t *testing.T) {
{
"basic.hcl",
&Config{
Region: "foobar",
Datacenter: "dc2",
NodeName: "my-web",
DataDir: "/tmp/nomad",
LogLevel: "ERR",
BindAddr: "192.168.0.1",
EnableDebug: true,
Region: "foobar",
Datacenter: "dc2",
NodeName: "my-web",
DataDir: "/tmp/nomad",
LogLevel: "ERR",
BindAddr: "192.168.0.1",
EnableDebug: true,
RetryInterval: "15s",
RetryMaxAttempts: 3,
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
Ports: &Ports{
HTTP: 1234,
RPC: 2345,
Expand Down Expand Up @@ -97,11 +100,8 @@ func TestConfig_Parse(t *testing.T) {
HeartbeatGrace: 30 * time.Second,
MinHeartbeatTTL: 33 * time.Second,
MaxHeartbeatsPerSecond: 11.0,
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: "15s",
RejoinAfterLeave: true,
RetryMaxAttempts: 3,
NonVotingServer: true,
RedundancyZone: "foo",
UpgradeVersion: "0.8.0",
Expand Down
6 changes: 3 additions & 3 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ func TestConfig_Merge(t *testing.T) {
DisableUpdateCheck: helper.BoolToPtr(true),
DisableAnonymousSignature: true,
BindAddr: "127.0.0.2",
RetryJoin: []string{"1.1.1.1"},
RetryInterval: "10s",
retryInterval: time.Second * 10,
Telemetry: &Telemetry{
StatsiteAddr: "127.0.0.2:8125",
StatsdAddr: "127.0.0.2:8125",
Expand Down Expand Up @@ -263,9 +266,6 @@ func TestConfig_Merge(t *testing.T) {
MaxHeartbeatsPerSecond: 200.0,
RejoinAfterLeave: true,
StartJoin: []string{"1.1.1.1"},
RetryJoin: []string{"1.1.1.1"},
RetryInterval: "10s",
retryInterval: time.Second * 10,
NonVotingServer: true,
RedundancyZone: "bar",
UpgradeVersion: "bar",
Expand Down
33 changes: 22 additions & 11 deletions command/agent/retry_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ type DiscoverInterface interface {
// retryJoiner is used to handle retrying a join until it succeeds or all of
// its tries are exhausted.
type retryJoiner struct {
// join adds the specified servers to the serf cluster
join func([]string) (int, error)
// serverJoin adds the specified servers to the serf cluster
serverJoin func([]string) (int, error)

// clientJoin adds the specified servers to the client server list
clientJoin func([]string) (int, error)

// discover is of type Discover, where this is either the go-discover
// implementation or a mock used for testing
Expand All @@ -45,20 +48,20 @@ type retryJoiner struct {
// RetryJoin is used to handle retrying a join until it succeeds or all retries
// are exhausted.
func (r *retryJoiner) RetryJoin(config *Config) {
if len(config.Server.RetryJoin) == 0 || !config.Server.Enabled {
if len(config.RetryJoin) == 0 {
return
}

attempt := 0

addrsToJoin := strings.Join(config.Server.RetryJoin, " ")
addrsToJoin := strings.Join(config.RetryJoin, " ")
r.logger.Printf("[INFO] agent: Joining cluster... %s", addrsToJoin)

for {
var addrs []string
var err error

for _, addr := range config.Server.RetryJoin {
for _, addr := range config.RetryJoin {
switch {
case strings.HasPrefix(addr, "provider="):
servers, err := r.discover.Addrs(addr, r.logger)
Expand All @@ -73,23 +76,31 @@ func (r *retryJoiner) RetryJoin(config *Config) {
}

if len(addrs) > 0 {
n, err := r.join(addrs)
if err == nil {
r.logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
if config.Server != nil && config.Server.Enabled {
n, err := r.serverJoin(addrs)
if err == nil {
r.logger.Printf("[INFO] agent: Server Join completed. Synced with %d initial agents", n)
}
}
if config.Client != nil && config.Client.Enabled {
n, err := r.clientJoin(addrs)
if err == nil {
r.logger.Printf("[INFO] agent: Client Join completed. Synced with %d initial agents", n)
}
}
}

attempt++
if config.Server.RetryMaxAttempts > 0 && attempt > config.Server.RetryMaxAttempts {
if config.RetryMaxAttempts > 0 && attempt > config.RetryMaxAttempts {
r.logger.Printf("[ERR] agent: max join retry exhausted, exiting")
close(r.errCh)
return
}

if err != nil {
r.logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err,
config.Server.RetryInterval)
config.RetryInterval)
}
time.Sleep(config.Server.retryInterval)
time.Sleep(config.retryInterval)
}
}
Loading

0 comments on commit 47ef99e

Please sign in to comment.