Skip to content

Commit

Permalink
Merge pull request #4277 from hashicorp/f-retry-join-clients
Browse files Browse the repository at this point in the history
Add go-discover support to Nomad clients
  • Loading branch information
dadgar authored Jun 1, 2018
2 parents 8c88146 + 84b4e2c commit d834439
Show file tree
Hide file tree
Showing 18 changed files with 1,109 additions and 257 deletions.
12 changes: 6 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
// Set the preconfigured list of static servers
c.configLock.RLock()
if len(c.configCopy.Servers) > 0 {
if err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
if _, err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
logger.Printf("[WARN] client: None of the configured servers are valid: %v", err)
}
}
Expand Down Expand Up @@ -623,7 +623,7 @@ func (c *Client) GetServers() []string {

// SetServers sets a new list of nomad servers to connect to. As long as one
// server is resolvable no error is returned, and the returned int is the
// number of servers that were set.
func (c *Client) SetServers(in []string) error {
func (c *Client) SetServers(in []string) (int, error) {
return c.setServersImpl(in, false)
}

Expand All @@ -633,7 +633,7 @@ func (c *Client) SetServers(in []string) error {
//
// Force should be used when setting the servers from the initial configuration
// since the server may be starting up in parallel and initial pings may fail.
func (c *Client) setServersImpl(in []string, force bool) error {
func (c *Client) setServersImpl(in []string, force bool) (int, error) {
var mu sync.Mutex
var wg sync.WaitGroup
var merr multierror.Error
Expand Down Expand Up @@ -673,13 +673,13 @@ func (c *Client) setServersImpl(in []string, force bool) error {
// Only return errors if no servers are valid
if len(endpoints) == 0 {
if len(merr.Errors) > 0 {
return merr.ErrorOrNil()
return 0, merr.ErrorOrNil()
}
return noServersErr
return 0, noServersErr
}

c.servers.SetServers(endpoints)
return nil
return len(endpoints), nil
}

// restoreState is used to restore our state from the data dir
Expand Down
6 changes: 3 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,13 +975,13 @@ func TestClient_ServerList(t *testing.T) {
if s := client.GetServers(); len(s) != 0 {
t.Fatalf("expected server lit to be empty but found: %+q", s)
}
if err := client.SetServers(nil); err != noServersErr {
if _, err := client.SetServers(nil); err != noServersErr {
t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
}
if err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
if _, err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
t.Fatalf("expected setting a bad server to return an error")
}
if err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
if _, err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
}
s := client.GetServers()
Expand Down
2 changes: 1 addition & 1 deletion command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)

// Set the servers list into the client
s.agent.logger.Printf("[TRACE] Adding servers %+q to the client's primary server list", servers)
if err := client.SetServers(servers); err != nil {
if _, err := client.SetServers(servers); err != nil {
s.agent.logger.Printf("[ERR] Attempt to add servers %q to client failed: %v", servers, err)
//TODO is this the right error to return?
return nil, CodedError(400, err.Error())
Expand Down
140 changes: 114 additions & 26 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ func (c *Command) readConfig() *Config {
Client: &ClientConfig{},
Consul: &config.ConsulConfig{},
Ports: &Ports{},
Server: &ServerConfig{},
Vault: &config.VaultConfig{},
ACL: &ACLConfig{},
Server: &ServerConfig{
ServerJoin: &ServerJoin{},
},
Vault: &config.VaultConfig{},
ACL: &ACLConfig{},
}

flags := flag.NewFlagSet("agent", flag.ContinueOnError)
Expand All @@ -78,13 +80,16 @@ func (c *Command) readConfig() *Config {

// Server-only options
flags.IntVar(&cmdConfig.Server.BootstrapExpect, "bootstrap-expect", 0, "")
flags.BoolVar(&cmdConfig.Server.RejoinAfterLeave, "rejoin", false, "")
flags.Var((*flaghelper.StringFlag)(&cmdConfig.Server.StartJoin), "join", "")
flags.Var((*flaghelper.StringFlag)(&cmdConfig.Server.RetryJoin), "retry-join", "")
flags.IntVar(&cmdConfig.Server.RetryMaxAttempts, "retry-max", 0, "")
flags.StringVar(&cmdConfig.Server.RetryInterval, "retry-interval", "", "")
flags.StringVar(&cmdConfig.Server.EncryptKey, "encrypt", "", "gossip encryption key")
flags.IntVar(&cmdConfig.Server.RaftProtocol, "raft-protocol", 0, "")
flags.BoolVar(&cmdConfig.Server.RejoinAfterLeave, "rejoin", false, "")
flags.Var((*flaghelper.StringFlag)(&cmdConfig.Server.ServerJoin.StartJoin), "join", "")
flags.Var((*flaghelper.StringFlag)(&cmdConfig.Server.ServerJoin.RetryJoin), "retry-join", "")
flags.IntVar(&cmdConfig.Server.ServerJoin.RetryMaxAttempts, "retry-max", 0, "")
flags.Var((flaghelper.FuncDurationVar)(func(d time.Duration) error {
cmdConfig.Server.ServerJoin.RetryInterval = d
return nil
}), "retry-interval", "")

// Client-only options
flags.StringVar(&cmdConfig.Client.StateDir, "state-dir", "", "")
Expand Down Expand Up @@ -267,14 +272,6 @@ func (c *Command) readConfig() *Config {
}
}

// Parse the RetryInterval.
dur, err := time.ParseDuration(config.Server.RetryInterval)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing retry interval: %s", err))
return nil
}
config.Server.retryInterval = dur

// Check that the server is running in at least one mode.
if !(config.Server.Enabled || config.Client.Enabled) {
c.Ui.Error("Must specify either server, client or dev mode for the agent.")
Expand Down Expand Up @@ -547,20 +544,89 @@ func (c *Command) Run(args []string) int {
logGate.Flush()

// Start retry join process
c.retryJoinErrCh = make(chan struct{})

joiner := retryJoiner{
join: c.agent.server.Join,
discover: &discover.Discover{},
errCh: c.retryJoinErrCh,
logger: c.agent.logger,
if err := c.handleRetryJoin(config); err != nil {
c.Ui.Error(err.Error())
return 1
}
go joiner.RetryJoin(config)

// Wait for exit
return c.handleSignals()
}

// handleRetryJoin starts the background retry-join goroutines for whichever
// agent modes are enabled and have a non-empty retry-join list.
//
// It always (re)creates c.retryJoinErrCh, which every joiner started here
// shares as its error channel. It then performs three steps in order:
//
//  1. If the server is enabled and the deprecated top-level server retry_join
//     field is set, the configuration is validated and the deprecated
//     retry_join, retry_max and retry_interval values are moved into
//     config.Server.ServerJoin (the deprecated fields are zeroed afterwards).
//  2. If the server is enabled and config.Server.ServerJoin has retry-join
//     addresses, a joiner backed by c.agent.server.Join is started.
//  3. If the client is enabled and config.Client.ServerJoin has retry-join
//     addresses, a joiner backed by c.agent.client.SetServers is started.
//
// The first error returned by retryJoiner.Validate is returned to the caller;
// joiners that were already started keep running in that case.
func (c *Command) handleRetryJoin(config *Config) error {
	c.retryJoinErrCh = make(chan struct{})

	if config.Server.Enabled && len(config.Server.RetryJoin) != 0 {
		joiner := retryJoiner{
			discover:      &discover.Discover{},
			errCh:         c.retryJoinErrCh,
			logger:        c.agent.logger,
			serverJoin:    c.agent.server.Join,
			serverEnabled: true,
		}

		// Validate while both the deprecated fields and the server_join
		// stanza are still populated, i.e. before the migration below
		// clears the deprecated ones.
		if err := joiner.Validate(config); err != nil {
			return err
		}

		// The migration below writes through ServerJoin. The next branch
		// treats a nil ServerJoin as possible, so make sure it exists here
		// instead of panicking on a nil pointer dereference.
		if config.Server.ServerJoin == nil {
			config.Server.ServerJoin = &ServerJoin{}
		}

		// Remove the duplicate fields
		if len(config.Server.RetryJoin) != 0 {
			config.Server.ServerJoin.RetryJoin = config.Server.RetryJoin
			config.Server.RetryJoin = nil
		}
		if config.Server.RetryMaxAttempts != 0 {
			config.Server.ServerJoin.RetryMaxAttempts = config.Server.RetryMaxAttempts
			config.Server.RetryMaxAttempts = 0
		}
		if config.Server.RetryInterval != 0 {
			config.Server.ServerJoin.RetryInterval = config.Server.RetryInterval
			config.Server.RetryInterval = 0
		}

		c.agent.logger.Printf("[WARN] agent: Using deprecated retry_join fields. Upgrade configuration to use server_join")
	}

	// Server mode: retry joining using the (possibly just migrated)
	// server_join settings.
	if config.Server.Enabled &&
		config.Server.ServerJoin != nil &&
		len(config.Server.ServerJoin.RetryJoin) != 0 {

		joiner := retryJoiner{
			discover:      &discover.Discover{},
			errCh:         c.retryJoinErrCh,
			logger:        c.agent.logger,
			serverJoin:    c.agent.server.Join,
			serverEnabled: true,
		}

		if err := joiner.Validate(config); err != nil {
			return err
		}

		go joiner.RetryJoin(config.Server.ServerJoin)
	}

	// Client mode: retry setting the client's server list from its own
	// server_join settings.
	if config.Client.Enabled &&
		config.Client.ServerJoin != nil &&
		len(config.Client.ServerJoin.RetryJoin) != 0 {
		joiner := retryJoiner{
			discover:      &discover.Discover{},
			errCh:         c.retryJoinErrCh,
			logger:        c.agent.logger,
			clientJoin:    c.agent.client.SetServers,
			clientEnabled: true,
		}

		if err := joiner.Validate(config); err != nil {
			return err
		}

		go joiner.RetryJoin(config.Client.ServerJoin)
	}

	return nil
}

// handleSignals blocks until we get an exit-causing signal
func (c *Command) handleSignals() int {
signalCh := make(chan os.Signal, 4)
Expand Down Expand Up @@ -831,12 +897,34 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
}

func (c *Command) startupJoin(config *Config) error {
if len(config.Server.StartJoin) == 0 || !config.Server.Enabled {
// Nothing to do
if !config.Server.Enabled {
return nil
}

// Validate both old and new aren't being set
old := len(config.Server.StartJoin)
var new int
if config.Server.ServerJoin != nil {
new = len(config.Server.ServerJoin.StartJoin)
}
if old != 0 && new != 0 {
return fmt.Errorf("server_join and start_join cannot both be defined; prefer setting the server_join stanza")
}

// Nothing to do
if old+new == 0 {
return nil
}

// Combine the lists and join
joining := config.Server.StartJoin
if new != 0 {
joining = append(joining, config.Server.ServerJoin.StartJoin...)
}

c.Ui.Output("Joining cluster...")
n, err := c.agent.server.Join(config.Server.StartJoin)
n, err := c.agent.server.Join(joining)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit d834439

Please sign in to comment.