Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster join command line options and configuration options #527

Merged
merged 9 commits into from
Dec 9, 2015
94 changes: 92 additions & 2 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func (c *Command) readConfig() *Config {

// Server-only options
flags.IntVar(&cmdConfig.Server.BootstrapExpect, "bootstrap-expect", 0, "")
flags.BoolVar(&cmdConfig.Server.RejoinAfterLeave, "rejoin", false, "")
flags.Var((*sliceflag.StringFlag)(&cmdConfig.Server.StartJoin), "join", "")
flags.Var((*sliceflag.StringFlag)(&cmdConfig.Server.RetryJoin), "retry-join", "")
flags.IntVar(&cmdConfig.Server.RetryMaxAttempts, "retry-max", 0, "")
flags.StringVar(&cmdConfig.Server.RetryInterval, "retry-interval", "", "")

// Client-only options
flags.StringVar(&cmdConfig.Client.StateDir, "state-dir", "", "")
Expand Down Expand Up @@ -100,6 +105,15 @@ func (c *Command) readConfig() *Config {
return nil
}

if cmdConfig.Server.RetryInterval != "" {
dur, err := time.ParseDuration(cmdConfig.Server.RetryInterval)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error: %s", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some extra information in the error message to indicate which field Nomad was not about to parse? I think that would help the user to quickly debug what's wrong in the configuration file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the error message to Error parsing retry interval

return nil
}
cmdConfig.Server.retryInterval = dur
}

// Split the servers.
if servers != "" {
cmdConfig.Client.Servers = strings.Split(servers, ",")
Expand Down Expand Up @@ -358,6 +372,12 @@ func (c *Command) Run(args []string) int {
}
}()

// Join startup nodes if specified
if err := c.startupJoin(config); err != nil {
c.Ui.Error(err.Error())
return 1
}

// Compile agent information for output later
info := make(map[string]string)
info["client"] = strconv.FormatBool(config.Client.Enabled)
Expand Down Expand Up @@ -396,12 +416,16 @@ func (c *Command) Run(args []string) int {
// Enable log streaming
logGate.Flush()

// Start retry join process
errCh := make(chan struct{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the retryJoinErr channel to the Command struct so we don't have to pass the channel into the functions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to the Command struct

go c.retryJoin(config, errCh)

// Wait for exit
return c.handleSignals(config)
return c.handleSignals(config, errCh)
}

// handleSignals blocks until we get an exit-causing signal
func (c *Command) handleSignals(config *Config) int {
func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}) int {
signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)

Expand All @@ -413,6 +437,8 @@ WAIT:
sig = s
case <-c.ShutdownCh:
sig = os.Interrupt
case <-retryJoin:
return 1
}
c.Ui.Output(fmt.Sprintf("Caught signal: %v", sig))

Expand Down Expand Up @@ -559,6 +585,52 @@ func (c *Command) setupSCADA(config *Config) error {
return nil
}

func (c *Command) startupJoin(config *Config) error {
if len(config.Server.StartJoin) == 0 || !config.Server.Enabled {
return nil
}

c.Ui.Output("Joining cluster...")
n, err := c.agent.server.Join(config.Server.StartJoin)
if err != nil {
return err
}

c.Ui.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n))
return nil
}

// retryJoin is used to handle retrying a join until it succeeds or all retries
// are exhausted.
func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) {
if len(config.Server.RetryJoin) == 0 || !config.Server.Enabled {
return
}

logger := c.agent.logger
logger.Printf("[INFO] agent: Joining cluster...")

attempt := 0
for {
n, err := c.agent.server.Join(config.Server.RetryJoin)
if err == nil {
logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
return
}

attempt++
if config.Server.RetryMaxAttempts > 0 && attempt > config.Server.RetryMaxAttempts {
logger.Printf("[ERROR] agent: max join retry exhausted, exiting")
close(errCh)
return
}

logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err,
config.Server.RetryInterval)
time.Sleep(config.Server.retryInterval)
}
}

func (c *Command) Synopsis() string {
return "Runs a Nomad agent"
}
Expand Down Expand Up @@ -632,6 +704,24 @@ Server Options:
bootstrapping the cluster. Once <num> servers have joined eachother,
Nomad initiates the bootstrap process.

-join=<address>
Address of an agent to join at start time. Can be specified
multiple times.

-retry-join=<address>
Address of an agent to join at start time with retries enabled.
Can be specified multiple times.

-retry-max=<num>
Maximum number of join attempts. Defaults to 0, which will retry
indefinitely.

-retry-interval=<dur>
Time to wait between join attempts.

-rejoin
Ignore a previous leave and attempts to rejoin the cluster.

Client Options:

-client
Expand Down
58 changes: 58 additions & 0 deletions command/agent/command_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package agent

import (
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"testing"

"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/cli"
)

Expand Down Expand Up @@ -69,3 +72,58 @@ func TestCommand_Args(t *testing.T) {
}
}
}

func TestRetryJoin(t *testing.T) {
dir, agent := makeAgent(t, nil)
defer os.RemoveAll(dir)
defer agent.Shutdown()

tmpDir, err := ioutil.TempDir("", "nomad")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tmpDir)

doneCh := make(chan struct{})
shutdownCh := make(chan struct{})

defer func() {
close(shutdownCh)
<-doneCh
}()

cmd := &Command{
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
}

serfAddr := fmt.Sprintf(
"%s:%d",
agent.config.BindAddr,
agent.config.Ports.Serf)

args := []string{
"-server",
"-data-dir", tmpDir,
"-node", fmt.Sprintf(`"Node %d"`, getPort()),
"-retry-join", serfAddr,
"-retry-interval", "1s",
}

go func() {
if code := cmd.Run(args); code != 0 {
log.Printf("bad: %d", code)
}
close(doneCh)
}()

testutil.WaitForResult(func() (bool, error) {
mem := agent.server.Members()
if len(mem) != 2 {
return false, fmt.Errorf("bad :%#v", mem)
}
return true, nil
}, func(err error) {
t.Fatalf(err.Error())
})
}
79 changes: 51 additions & 28 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"runtime"
"strings"
"time"

"github.com/hashicorp/hcl"
client "github.com/hashicorp/nomad/client/config"
Expand Down Expand Up @@ -181,6 +182,31 @@ type ServerConfig struct {

// NodeGCThreshold contros how "old" a node must be to be collected by GC.
NodeGCThreshold string `hcl:"node_gc_threshold"`

// StartJoin is a list of addresses to attempt to join when the
// agent starts. If Serf is unable to communicate with any of these
// addresses, then the agent will error and exit.
StartJoin []string `hcl:"start_join"`

// RetryJoin is a list of addresses to join with retry enabled.
RetryJoin []string `hcl:"retry_join"`

// RetryMaxAttempts specifies the maximum number of times to retry joining a
// host on startup. This is useful for cases where we know the node will be
// online eventually.
RetryMaxAttempts int `hcl:"retry_max"`

// RetryInterval specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
RetryInterval string `hcl:"retry_interval"`
retryInterval time.Duration `hcl:"-"`

// RejoinAfterLeave controls our interaction with the cluster after leave.
// When set to false (default), a leave causes Consul to not rejoin
// the cluster until an explicit join is received. If this is set to
// true, we ignore the leave, and rejoin the cluster on start.
RejoinAfterLeave bool `hcl:"rejoin_after_leave"`
}

// Telemetry is the telemetry configuration for the server
Expand Down Expand Up @@ -252,7 +278,11 @@ func DefaultConfig() *Config {
NetworkSpeed: 100,
},
Server: &ServerConfig{
Enabled: false,
Enabled: false,
StartJoin: []string{},
RetryJoin: []string{},
RetryInterval: "30s",
RetryMaxAttempts: 0,
},
}
}
Expand Down Expand Up @@ -358,14 +388,6 @@ func (a *Config) Merge(b *Config) *Config {
result.AdvertiseAddrs = result.AdvertiseAddrs.Merge(b.AdvertiseAddrs)
}

// Apply the Atlas configuration
if result.Atlas == nil && b.Atlas != nil {
atlasConfig := *b.Atlas
result.Atlas = &atlasConfig
} else if b.Atlas != nil {
result.Atlas = result.Atlas.Merge(b.Atlas)
}

return &result
}

Expand All @@ -391,10 +413,30 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.NodeGCThreshold != "" {
result.NodeGCThreshold = b.NodeGCThreshold
}
if b.RetryMaxAttempts != 0 {
result.RetryMaxAttempts = b.RetryMaxAttempts
}
if b.RetryInterval != "" {
result.RetryInterval = b.RetryInterval
result.retryInterval = b.retryInterval
}
if b.RejoinAfterLeave {
result.RejoinAfterLeave = true
}

// Add the schedulers
result.EnabledSchedulers = append(result.EnabledSchedulers, b.EnabledSchedulers...)

// Copy the start join addresses
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))
result.StartJoin = append(result.StartJoin, a.StartJoin...)
result.StartJoin = append(result.StartJoin, b.StartJoin...)

// Copy the retry join addresses
result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin))
result.RetryJoin = append(result.RetryJoin, a.RetryJoin...)
result.RetryJoin = append(result.RetryJoin, b.RetryJoin...)

return &result
}

Expand Down Expand Up @@ -507,25 +549,6 @@ func (a *AdvertiseAddrs) Merge(b *AdvertiseAddrs) *AdvertiseAddrs {
return &result
}

// Merge merges two Atlas configurations together.
func (a *AtlasConfig) Merge(b *AtlasConfig) *AtlasConfig {
var result AtlasConfig = *a

if b.Infrastructure != "" {
result.Infrastructure = b.Infrastructure
}
if b.Token != "" {
result.Token = b.Token
}
if b.Join {
result.Join = true
}
if b.Endpoint != "" {
result.Endpoint = b.Endpoint
}
return &result
}

// LoadConfig loads the configuration at the given path, regardless if
// its a file or directory.
func LoadConfig(path string) (*Config, error) {
Expand Down
Loading