Skip to content

Commit

Permalink
Make ESM Instance ID Configurable
Browse files Browse the repository at this point in the history
Make ESM Instance ID Configurable

Feature request to allow practitioners to set instance ids in order for ESM
instances to terminate and reregister with Consul with the same id. Ungraceful
terminations currently take 30 minutes to be reaped. Without this feature, when
a new instance starts up, it has a new id and leads to multiple ESM instances
rather than 'replacing' the terminated one.

- Add instance_id in configuration file
- Replace existing `id` used for testing with instance_id
- Add a check to ensure instance_id is unique since we are no longer generating
a unique id on behalf of the practitioner
  • Loading branch information
lornasong authored Jun 23, 2020
2 parents ef82992 + 2e60fae commit 7face0b
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 38 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ enable_syslog = false
// The syslog facility to use, if enabled.
syslog_facility = ""
// The unique id for this agent to use when registering itself with Consul.
// If unconfigured, a UUID will be generated for the instance id.
// Note: do not reuse the same instance id value for other agents. This id
// must be unique to disambiguate different instances on the same host.
// Failure to maintain uniqueness will result in an already-exists error.
instance_id = ""
// The service name for this agent to use when registering itself with Consul.
consul_service = "consul-esm"
Expand Down
26 changes: 14 additions & 12 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/hashicorp/consul-esm/version"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-uuid"
)

const LeaderKey = "leader"
Expand Down Expand Up @@ -74,20 +73,10 @@ func NewAgent(config *Config, logger *log.Logger) (*Agent, error) {
return nil, err
}

// Generate a unique ID for this agent so we can disambiguate different
// instances on the same host.
id := config.id
if id == "" {
id, err = uuid.GenerateUUID()
if err != nil {
return nil, err
}
}

agent := Agent{
config: config,
client: client,
id: id,
id: config.InstanceID,
logger: logger,
shutdownCh: make(chan struct{}),
inflightPings: make(map[string]struct{}),
Expand Down Expand Up @@ -162,8 +151,21 @@ func (a *Agent) serviceID() string {
return fmt.Sprintf("%s:%s", a.config.Service, a.id)
}

type alreadyExistsError struct {
serviceID string
}

func (e *alreadyExistsError) Error() string {
return fmt.Sprintf("ESM instance with service id '%s' is already registered with Consul", e.serviceID)
}

// register is used to register this agent with Consul service discovery.
func (a *Agent) register() error {
// agent ids need to be unique to disambiguate different instances on same host
if existing, _, _ := a.client.Agent().Service(a.serviceID(), nil); existing != nil {
return &alreadyExistsError{a.serviceID()}
}

service := &api.AgentServiceRegistration{
ID: a.serviceID(),
Name: a.config.Service,
Expand Down
124 changes: 122 additions & 2 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (

func testAgent(t *testing.T, cb func(*Config)) *Agent {
logger := log.New(LOGOUT, "", log.LstdFlags)
conf := DefaultConfig()
conf, err := DefaultConfig()
if err != nil {
t.Fatal(err)
}
conf.CoordinateUpdateInterval = 200 * time.Millisecond
if cb != nil {
cb(conf)
Expand Down Expand Up @@ -167,10 +170,15 @@ func TestAgent_shouldUpdateNodeStatus(t *testing.T) {
},
}

conf, err := DefaultConfig()
if err != nil {
t.Fatal(err)
}

for _, tc := range cases {

agent := Agent{
config: DefaultConfig(),
config: conf,
knownNodeStatuses: make(map[string]lastKnownStatus),
}

Expand Down Expand Up @@ -242,3 +250,115 @@ func TestAgent_VerifyConsulCompatibility(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
}

func TestAgent_uniqueInstanceID(t *testing.T) {
t.Parallel()

s, err := NewTestServer()
if err != nil {
t.Fatal(err)
}
defer s.Stop()

// Register first ESM instance
agent1 := testAgent(t, func(c *Config) {
c.HTTPAddr = s.HTTPAddr
c.InstanceID = "unique-instance-id-1"
})
defer agent1.Shutdown()

// Make sure the first ESM service is registered
retry.Run(t, func(r *retry.R) {
services, _, err := agent1.client.Catalog().Service(agent1.config.Service, "", nil)
if err != nil {
r.Fatal(err)
}
if len(services) != 1 {
r.Fatalf("1 service should be registered: %v", services)
}
if got, want := services[0].ServiceID, agent1.serviceID(); got != want {
r.Fatalf("got service id %q, want %q", got, want)
}
})

// Register second ESM instance
agent2 := testAgent(t, func(c *Config) {
c.HTTPAddr = s.HTTPAddr
c.InstanceID = "unique-instance-id-2"
})
defer agent2.Shutdown()

// Make sure second ESM service is registered
retry.Run(t, func(r *retry.R) {
services, _, err := agent2.client.Catalog().Service(agent2.config.Service, "", nil)
if err != nil {
r.Fatal(err)
}
if len(services) != 2 {
r.Fatalf("2 service should be registered, got: %v", services)
}
if got, want := services[1].ServiceID, agent2.serviceID(); got != want {
r.Fatalf("got service id %q, want %q", got, want)
}
})
}

func TestAgent_notUniqueInstanceIDFails(t *testing.T) {
t.Parallel()
notUniqueInstanceID := "not-unique-instance-id"

s, err := NewTestServer()
if err != nil {
t.Fatal(err)
}
defer s.Stop()

// Register first ESM instance
agent := testAgent(t, func(c *Config) {
c.HTTPAddr = s.HTTPAddr
c.InstanceID = notUniqueInstanceID
})
defer agent.Shutdown()

// Make sure the ESM service is registered
ensureRegistered := func(r *retry.R) {
services, _, err := agent.client.Catalog().Service(agent.config.Service, "", nil)
if err != nil {
r.Fatal(err)
}
if len(services) != 1 {
r.Fatalf("1 service should be registered: %v", services)
}
if got, want := services[0].ServiceID, agent.serviceID(); got != want {
r.Fatalf("got service id %q, want %q", got, want)
}
}
retry.Run(t, ensureRegistered)

// Create second ESM service with same instance ID
logger := log.New(LOGOUT, "", log.LstdFlags)
conf, err := DefaultConfig()
if err != nil {
t.Fatal(err)
}
conf.InstanceID = notUniqueInstanceID
conf.HTTPAddr = s.HTTPAddr

duplicateAgent, err := NewAgent(conf, logger)
if err != nil {
t.Fatal(err)
}

err = duplicateAgent.Run()
defer duplicateAgent.Shutdown()

if err == nil {
t.Fatal("Failed to error when registering ESM service with same instance ID")
}

switch e := err.(type) {
case *alreadyExistsError:
default:
t.Fatalf("Unexpected error type. Wanted an alreadyExistsError type. Error: '%v'", e)
}
}
43 changes: 30 additions & 13 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/hcl"
"github.com/hashicorp/hcl/hcl/ast"
"github.com/mitchellh/mapstructure"
Expand All @@ -29,6 +30,7 @@ type Config struct {
Tag string
KVPath string

InstanceID string
NodeMeta map[string]string
Interval time.Duration
DeregisterAfter time.Duration
Expand All @@ -49,9 +51,6 @@ type Config struct {
PingType string

DisableCoordinateUpdates bool

// Test-only fields.
id string
}

func (c *Config) ClientConfig() *api.Config {
Expand Down Expand Up @@ -85,11 +84,19 @@ func (c *Config) ClientConfig() *api.Config {
return conf
}

func DefaultConfig() *Config {
// DefaultConfig generates esm config with default values
func DefaultConfig() (*Config, error) {
// if no ID is configured, generate a unique ID for this agent
instanceID, err := uuid.GenerateUUID()
if err != nil {
return nil, err
}

return &Config{
LogLevel: "INFO",
Service: "consul-esm",
KVPath: "consul-esm/",
InstanceID: instanceID,
LogLevel: "INFO",
Service: "consul-esm",
KVPath: "consul-esm/",
NodeMeta: map[string]string{
"external-node": "true",
},
Expand All @@ -101,18 +108,20 @@ func DefaultConfig() *Config {
NodeReconnectTimeout: 72 * time.Hour,
PingType: PingTypeUDP,
DisableCoordinateUpdates: false,
}
}, nil
}

// HumanConfig contains configuration that the practitioner can set
type HumanConfig struct {
LogLevel flags.StringValue `mapstructure:"log_level"`
EnableSyslog flags.BoolValue `mapstructure:"enable_syslog"`
SyslogFacility flags.StringValue `mapstructure:"syslog_facility"`

Service flags.StringValue `mapstructure:"consul_service"`
Tag flags.StringValue `mapstructure:"consul_service_tag"`
KVPath flags.StringValue `mapstructure:"consul_kv_path"`
NodeMeta []map[string]string `mapstructure:"external_node_meta"`
InstanceID flags.StringValue `mapstructure:"instance_id"`
Service flags.StringValue `mapstructure:"consul_service"`
Tag flags.StringValue `mapstructure:"consul_service_tag"`
KVPath flags.StringValue `mapstructure:"consul_kv_path"`
NodeMeta []map[string]string `mapstructure:"external_node_meta"`

NodeReconnectTimeout flags.DurationValue `mapstructure:"node_reconnect_timeout"`
NodeProbeInterval flags.DurationValue `mapstructure:"node_probe_interval"`
Expand All @@ -131,6 +140,8 @@ type HumanConfig struct {
DisableCoordinateUpdates flags.BoolValue `mapstructure:"disable_coordinate_updates"`
}

// DecodeConfig takes a reader containing config file and returns
// configuration struct
func DecodeConfig(r io.Reader) (*HumanConfig, error) {
// Parse the file (could be HCL or JSON)
bytes, err := ioutil.ReadAll(r)
Expand Down Expand Up @@ -178,7 +189,10 @@ func DecodeConfig(r io.Reader) (*HumanConfig, error) {
// BuildConfig builds a new Config object from the default configuration
// and the list of config files given and returns it after validation.
func BuildConfig(configFiles []string) (*Config, error) {
config := DefaultConfig()
config, err := DefaultConfig()
if err != nil {
return nil, err
}
if err := MergeConfigPaths(config, configFiles); err != nil {
return nil, fmt.Errorf("Error loading config: %v", err)
}
Expand Down Expand Up @@ -253,8 +267,11 @@ func MergeConfigPaths(dst *Config, paths []string) error {
return nil
}

// MergeConfig merges the default config with any configuration
// set by the practitioner
func MergeConfig(dst *Config, src *HumanConfig) {
src.LogLevel.Merge(&dst.LogLevel)
src.InstanceID.Merge(&dst.InstanceID)
src.Service.Merge(&dst.Service)
src.Tag.Merge(&dst.Tag)
src.KVPath.Merge(&dst.KVPath)
Expand Down
8 changes: 7 additions & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
func TestDecodeMergeConfig(t *testing.T) {
raw := bytes.NewBufferString(`
log_level = "INFO"
instance_id = "test-instance-id"
consul_service = "service"
consul_service_tag = "asdf"
consul_kv_path = "custom-esm/"
Expand All @@ -34,6 +35,7 @@ ping_type = "socket"

expected := &Config{
LogLevel: "INFO",
InstanceID: "test-instance-id",
Service: "service",
Tag: "asdf",
KVPath: "custom-esm/",
Expand Down Expand Up @@ -90,7 +92,11 @@ func TestValidateConfig(t *testing.T) {
for _, tc := range cases {
buf := bytes.NewBufferString(tc.raw)

result := DefaultConfig()
result, err := DefaultConfig()
if err != nil {
t.Fatal(err)
}

humanConfig, err := DecodeConfig(buf)
if err != nil {
t.Fatal(err)
Expand Down
21 changes: 18 additions & 3 deletions coordinate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ func TestCoordinate_updateNodeCoordinate(t *testing.T) {
t.Fatal(err)
}

conf, err := DefaultConfig()
if err != nil {
t.Fatal(err)
}

agent := &Agent{
client: client,
config: DefaultConfig(),
config: conf,
logger: log.New(LOGOUT, "", log.LstdFlags),
knownNodeStatuses: make(map[string]lastKnownStatus),
}
Expand Down Expand Up @@ -78,9 +83,14 @@ func TestCoordinate_updateNodeCheck(t *testing.T) {
t.Fatal(err)
}

conf, err := DefaultConfig()
if err != nil {
t.Fatal(err)
}

agent := &Agent{
client: client,
config: DefaultConfig(),
config: conf,
logger: log.New(LOGOUT, "", log.LstdFlags),
knownNodeStatuses: make(map[string]lastKnownStatus),
}
Expand Down Expand Up @@ -173,9 +183,14 @@ func TestCoordinate_reapFailedNode(t *testing.T) {
t.Fatal(err)
}

conf, err := DefaultConfig()
if err != nil {
t.Fatal(err)
}

agent := &Agent{
client: client,
config: DefaultConfig(),
config: conf,
logger: log.New(LOGOUT, "", log.LstdFlags),
knownNodeStatuses: make(map[string]lastKnownStatus),
}
Expand Down
Loading

0 comments on commit 7face0b

Please sign in to comment.