diff --git a/README.md b/README.md index 6cf1e67..086bfd4 100644 --- a/README.md +++ b/README.md @@ -159,6 +159,13 @@ enable_syslog = false // The syslog facility to use, if enabled. syslog_facility = "" +// The unique id for this agent to use when registering itself with Consul. +// If unconfigured, a UUID will be generated for the instance id. +// Note: do not reuse the same instance id value for other agents. This id +// must be unique to disambiguate different instances on the same host. +// Failure to maintain uniqueness will result in an already-exists error. +instance_id = "" + // The service name for this agent to use when registering itself with Consul. consul_service = "consul-esm" diff --git a/agent.go b/agent.go index 677cf1c..4145157 100644 --- a/agent.go +++ b/agent.go @@ -11,7 +11,6 @@ import ( "github.com/hashicorp/consul-esm/version" "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-uuid" ) const LeaderKey = "leader" @@ -74,20 +73,10 @@ func NewAgent(config *Config, logger *log.Logger) (*Agent, error) { return nil, err } - // Generate a unique ID for this agent so we can disambiguate different - // instances on the same host. - id := config.id - if id == "" { - id, err = uuid.GenerateUUID() - if err != nil { - return nil, err - } - } - agent := Agent{ config: config, client: client, - id: id, + id: config.InstanceID, logger: logger, shutdownCh: make(chan struct{}), inflightPings: make(map[string]struct{}), @@ -162,8 +151,21 @@ func (a *Agent) serviceID() string { return fmt.Sprintf("%s:%s", a.config.Service, a.id) } +type alreadyExistsError struct { + serviceID string +} + +func (e *alreadyExistsError) Error() string { + return fmt.Sprintf("ESM instance with service id '%s' is already registered with Consul", e.serviceID) +} + // register is used to register this agent with Consul service discovery. 
func (a *Agent) register() error { + // Agent IDs must be unique to disambiguate different instances on the same host. NOTE(review): the lookup error below is discarded, so a failed query is treated as "not yet registered". + if existing, _, _ := a.client.Agent().Service(a.serviceID(), nil); existing != nil { + return &alreadyExistsError{a.serviceID()} + } + service := &api.AgentServiceRegistration{ ID: a.serviceID(), Name: a.config.Service, diff --git a/agent_test.go b/agent_test.go index 5cad09f..c469596 100644 --- a/agent_test.go +++ b/agent_test.go @@ -12,7 +12,10 @@ import ( func testAgent(t *testing.T, cb func(*Config)) *Agent { logger := log.New(LOGOUT, "", log.LstdFlags) - conf := DefaultConfig() + conf, err := DefaultConfig() + if err != nil { + t.Fatal(err) + } conf.CoordinateUpdateInterval = 200 * time.Millisecond if cb != nil { cb(conf) @@ -167,10 +170,15 @@ func TestAgent_shouldUpdateNodeStatus(t *testing.T) { }, } + conf, err := DefaultConfig() + if err != nil { + t.Fatal(err) + } + for _, tc := range cases { agent := Agent{ - config: DefaultConfig(), + config: conf, knownNodeStatuses: make(map[string]lastKnownStatus), } @@ -242,3 +250,115 @@ func TestAgent_VerifyConsulCompatibility(t *testing.T) { t.Fatalf("unexpected error: %s", err) } } + +func TestAgent_uniqueInstanceID(t *testing.T) { + t.Parallel() + + s, err := NewTestServer() + if err != nil { + t.Fatal(err) + } + defer s.Stop() + + // Register first ESM instance + agent1 := testAgent(t, func(c *Config) { + c.HTTPAddr = s.HTTPAddr + c.InstanceID = "unique-instance-id-1" + }) + defer agent1.Shutdown() + + // Make sure the first ESM service is registered + retry.Run(t, func(r *retry.R) { + services, _, err := agent1.client.Catalog().Service(agent1.config.Service, "", nil) + if err != nil { + r.Fatal(err) + } + if len(services) != 1 { + r.Fatalf("1 service should be registered: %v", services) + } + if got, want := services[0].ServiceID, agent1.serviceID(); got != want { + r.Fatalf("got service id %q, want %q", got, want) + } + }) + + // Register second ESM instance + agent2 := testAgent(t, func(c
*Config) { + c.HTTPAddr = s.HTTPAddr + c.InstanceID = "unique-instance-id-2" + }) + defer agent2.Shutdown() + + // Make sure second ESM service is registered + retry.Run(t, func(r *retry.R) { + services, _, err := agent2.client.Catalog().Service(agent2.config.Service, "", nil) + if err != nil { + r.Fatal(err) + } + if len(services) != 2 { + r.Fatalf("2 service should be registered, got: %v", services) + } + if got, want := services[1].ServiceID, agent2.serviceID(); got != want { + r.Fatalf("got service id %q, want %q", got, want) + } + }) +} + +func TestAgent_notUniqueInstanceIDFails(t *testing.T) { + t.Parallel() + notUniqueInstanceID := "not-unique-instance-id" + + s, err := NewTestServer() + if err != nil { + t.Fatal(err) + } + defer s.Stop() + + // Register first ESM instance + agent := testAgent(t, func(c *Config) { + c.HTTPAddr = s.HTTPAddr + c.InstanceID = notUniqueInstanceID + }) + defer agent.Shutdown() + + // Make sure the ESM service is registered + ensureRegistered := func(r *retry.R) { + services, _, err := agent.client.Catalog().Service(agent.config.Service, "", nil) + if err != nil { + r.Fatal(err) + } + if len(services) != 1 { + r.Fatalf("1 service should be registered: %v", services) + } + if got, want := services[0].ServiceID, agent.serviceID(); got != want { + r.Fatalf("got service id %q, want %q", got, want) + } + } + retry.Run(t, ensureRegistered) + + // Create second ESM service with same instance ID + logger := log.New(LOGOUT, "", log.LstdFlags) + conf, err := DefaultConfig() + if err != nil { + t.Fatal(err) + } + conf.InstanceID = notUniqueInstanceID + conf.HTTPAddr = s.HTTPAddr + + duplicateAgent, err := NewAgent(conf, logger) + if err != nil { + t.Fatal(err) + } + + err = duplicateAgent.Run() + defer duplicateAgent.Shutdown() + + if err == nil { + t.Fatal("Failed to error when registering ESM service with same instance ID") + } + + switch e := err.(type) { + case *alreadyExistsError: + default: + t.Fatalf("Unexpected error type. 
Wanted an alreadyExistsError type. Error: '%v'", e) + } +} diff --git a/config.go b/config.go index 32bbbe6..13a5e00 100644 --- a/config.go +++ b/config.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/command/flags" + "github.com/hashicorp/go-uuid" "github.com/hashicorp/hcl" "github.com/hashicorp/hcl/hcl/ast" "github.com/mitchellh/mapstructure" @@ -29,6 +30,7 @@ type Config struct { Tag string KVPath string + InstanceID string NodeMeta map[string]string Interval time.Duration DeregisterAfter time.Duration @@ -49,9 +51,6 @@ type Config struct { PingType string DisableCoordinateUpdates bool - - // Test-only fields. - id string } func (c *Config) ClientConfig() *api.Config { @@ -85,11 +84,19 @@ func (c *Config) ClientConfig() *api.Config { return conf } -func DefaultConfig() *Config { +// DefaultConfig returns the ESM config populated with default values, including a newly generated UUID as the InstanceID. +func DefaultConfig() (*Config, error) { + // Always generate a unique default ID for this agent; a configured instance_id is merged over it later by MergeConfig. + instanceID, err := uuid.GenerateUUID() + if err != nil { + return nil, err + } + return &Config{ - LogLevel: "INFO", - Service: "consul-esm", - KVPath: "consul-esm/", + InstanceID: instanceID, + LogLevel: "INFO", + Service: "consul-esm", + KVPath: "consul-esm/", NodeMeta: map[string]string{ "external-node": "true", }, @@ -101,18 +108,20 @@ func DefaultConfig() *Config { NodeReconnectTimeout: 72 * time.Hour, PingType: PingTypeUDP, DisableCoordinateUpdates: false, - } + }, nil } +// HumanConfig contains configuration that the practitioner can set type HumanConfig struct { LogLevel flags.StringValue `mapstructure:"log_level"` EnableSyslog flags.BoolValue `mapstructure:"enable_syslog"` SyslogFacility flags.StringValue `mapstructure:"syslog_facility"` - Service flags.StringValue `mapstructure:"consul_service"` - Tag flags.StringValue `mapstructure:"consul_service_tag"` - KVPath flags.StringValue `mapstructure:"consul_kv_path"` - NodeMeta []map[string]string
`mapstructure:"external_node_meta"` + InstanceID flags.StringValue `mapstructure:"instance_id"` + Service flags.StringValue `mapstructure:"consul_service"` + Tag flags.StringValue `mapstructure:"consul_service_tag"` + KVPath flags.StringValue `mapstructure:"consul_kv_path"` + NodeMeta []map[string]string `mapstructure:"external_node_meta"` NodeReconnectTimeout flags.DurationValue `mapstructure:"node_reconnect_timeout"` NodeProbeInterval flags.DurationValue `mapstructure:"node_probe_interval"` @@ -131,6 +140,8 @@ type HumanConfig struct { DisableCoordinateUpdates flags.BoolValue `mapstructure:"disable_coordinate_updates"` } +// DecodeConfig reads the contents of a config file (HCL or JSON) from r and +// returns the decoded HumanConfig. func DecodeConfig(r io.Reader) (*HumanConfig, error) { // Parse the file (could be HCL or JSON) bytes, err := ioutil.ReadAll(r) @@ -178,7 +189,10 @@ func DecodeConfig(r io.Reader) (*HumanConfig, error) { // BuildConfig builds a new Config object from the default configuration // and the list of config files given and returns it after validation.
func BuildConfig(configFiles []string) (*Config, error) { - config := DefaultConfig() + config, err := DefaultConfig() + if err != nil { + return nil, err + } if err := MergeConfigPaths(config, configFiles); err != nil { return nil, fmt.Errorf("Error loading config: %v", err) } @@ -253,8 +267,11 @@ func MergeConfigPaths(dst *Config, paths []string) error { return nil } +// MergeConfig merges the default config with any configuration +// set by the practitioner func MergeConfig(dst *Config, src *HumanConfig) { src.LogLevel.Merge(&dst.LogLevel) + src.InstanceID.Merge(&dst.InstanceID) src.Service.Merge(&dst.Service) src.Tag.Merge(&dst.Tag) src.KVPath.Merge(&dst.KVPath) diff --git a/config_test.go b/config_test.go index eed8a9a..53c4bb9 100644 --- a/config_test.go +++ b/config_test.go @@ -11,6 +11,7 @@ import ( func TestDecodeMergeConfig(t *testing.T) { raw := bytes.NewBufferString(` log_level = "INFO" +instance_id = "test-instance-id" consul_service = "service" consul_service_tag = "asdf" consul_kv_path = "custom-esm/" @@ -34,6 +35,7 @@ ping_type = "socket" expected := &Config{ LogLevel: "INFO", + InstanceID: "test-instance-id", Service: "service", Tag: "asdf", KVPath: "custom-esm/", @@ -90,7 +92,11 @@ func TestValidateConfig(t *testing.T) { for _, tc := range cases { buf := bytes.NewBufferString(tc.raw) - result := DefaultConfig() + result, err := DefaultConfig() + if err != nil { + t.Fatal(err) + } + humanConfig, err := DecodeConfig(buf) if err != nil { t.Fatal(err) diff --git a/coordinate_test.go b/coordinate_test.go index 7f11027..e3e6e0a 100644 --- a/coordinate_test.go +++ b/coordinate_test.go @@ -34,9 +34,14 @@ func TestCoordinate_updateNodeCoordinate(t *testing.T) { t.Fatal(err) } + conf, err := DefaultConfig() + if err != nil { + t.Fatal(err) + } + agent := &Agent{ client: client, - config: DefaultConfig(), + config: conf, logger: log.New(LOGOUT, "", log.LstdFlags), knownNodeStatuses: make(map[string]lastKnownStatus), } @@ -78,9 +83,14 @@ func 
TestCoordinate_updateNodeCheck(t *testing.T) { t.Fatal(err) } + conf, err := DefaultConfig() + if err != nil { + t.Fatal(err) + } + agent := &Agent{ client: client, - config: DefaultConfig(), + config: conf, logger: log.New(LOGOUT, "", log.LstdFlags), knownNodeStatuses: make(map[string]lastKnownStatus), } @@ -173,9 +183,14 @@ func TestCoordinate_reapFailedNode(t *testing.T) { t.Fatal(err) } + conf, err := DefaultConfig() + if err != nil { + t.Fatal(err) + } + agent := &Agent{ client: client, - config: DefaultConfig(), + config: conf, logger: log.New(LOGOUT, "", log.LstdFlags), knownNodeStatuses: make(map[string]lastKnownStatus), } diff --git a/leader_test.go b/leader_test.go index 7383f6a..bebd018 100644 --- a/leader_test.go +++ b/leader_test.go @@ -109,7 +109,7 @@ func TestLeader_rebalanceHealthWatches(t *testing.T) { // Register one ESM agent to start. agent1 := testAgent(t, func(c *Config) { c.HTTPAddr = s.HTTPAddr - c.id = "agent1" + c.InstanceID = "agent1" }) defer agent1.Shutdown() @@ -119,7 +119,7 @@ func TestLeader_rebalanceHealthWatches(t *testing.T) { // Add a 2nd ESM agent. agent2 := testAgent(t, func(c *Config) { c.HTTPAddr = s.HTTPAddr - c.id = "agent2" + c.InstanceID = "agent2" }) defer agent2.Shutdown() @@ -130,7 +130,7 @@ func TestLeader_rebalanceHealthWatches(t *testing.T) { // Add a 3rd ESM agent. agent3 := testAgent(t, func(c *Config) { c.HTTPAddr = s.HTTPAddr - c.id = "agent3" + c.InstanceID = "agent3" }) defer agent3.Shutdown() @@ -200,13 +200,13 @@ func TestLeader_divideCoordinates(t *testing.T) { // Register two ESM agents. agent1 := testAgent(t, func(c *Config) { c.HTTPAddr = s.HTTPAddr - c.id = "agent1" + c.InstanceID = "agent1" }) defer agent1.Shutdown() agent2 := testAgent(t, func(c *Config) { c.HTTPAddr = s.HTTPAddr - c.id = "agent2" + c.InstanceID = "agent2" }) defer agent2.Shutdown() @@ -316,14 +316,14 @@ func TestLeader_divideHealthChecks(t *testing.T) { // Register two ESM agents. 
agent1 := testAgent(t, func(c *Config) { c.HTTPAddr = s.HTTPAddr - c.id = "agent1" + c.InstanceID = "agent1" c.CoordinateUpdateInterval = time.Second }) defer agent1.Shutdown() agent2 := testAgent(t, func(c *Config) { c.HTTPAddr = s.HTTPAddr - c.id = "agent2" + c.InstanceID = "agent2" c.CoordinateUpdateInterval = time.Second }) defer agent2.Shutdown()