diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 8e0c2c33cab..c0bb369a1c1 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -31,7 +31,7 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) { conf.AllocDir = os.TempDir() upd := &MockAllocStateUpdater{} alloc := mock.Alloc() - consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) + consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) if !restarts { alloc.Job.Type = structs.JobTypeBatch *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} @@ -142,7 +142,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { } // Create a new alloc runner - consulClient, err := NewConsulService(ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) + consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, consulClient) err = ar2.RestoreState() diff --git a/client/client.go b/client/client.go index 6f88f714470..b371b5df90d 100644 --- a/client/client.go +++ b/client/client.go @@ -157,7 +157,16 @@ func (c *Client) setupConsulService() error { auth := c.config.Read("consul.auth") enableSSL := c.config.ReadBoolDefault("consul.ssl", false) verifySSL := c.config.ReadBoolDefault("consul.verifyssl", true) - if consulService, err = NewConsulService(c.logger, addr, token, auth, enableSSL, verifySSL, c.config.Node); err != nil { + consulServiceCfg := &consulServiceConfig{ + logger: c.logger, + consulAddr: addr, + token: token, + auth: auth, + enableSSL: enableSSL, + verifySSL: verifySSL, + node: c.config.Node, + } + if consulService, err = NewConsulService(consulServiceCfg); err != nil { return err } c.consulService = consulService diff --git a/client/consul.go b/client/consul.go index 3440064c7b7..9ac123ebc24 100644 --- a/client/consul.go +++ b/client/consul.go @@ -79,25 +79,34 @@ type ConsulService struct { trackedTskLock sync.Mutex } +type consulServiceConfig struct { + logger *log.Logger + consulAddr string + token string + auth string + enableSSL bool + verifySSL bool + node *structs.Node +} + // A factory method to create new consul service -func NewConsulService(logger *log.Logger, consulAddr string, token string, - auth string, enableSSL bool, verifySSL bool, node *structs.Node) (*ConsulService, error) { +func NewConsulService(config *consulServiceConfig) (*ConsulService, error) { var err error var c *consul.Client cfg := consul.DefaultConfig() - cfg.Address = consulAddr - if token != "" { - cfg.Token = token + cfg.Address = config.consulAddr + if config.token != "" { + cfg.Token = config.token } - if auth != "" { + if config.auth != "" { var username, password string - if strings.Contains(auth, ":") { - split := strings.SplitN(auth, ":", 2) + if strings.Contains(config.auth, ":") { + split := strings.SplitN(config.auth, ":", 2) username = split[0] password = split[1] } else { - username = auth + username = config.auth } cfg.HttpAuth = &consul.HttpBasicAuth{ @@ -105,10 +114,10 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string, Password: password, } } - if enableSSL { + if config.enableSSL { cfg.Scheme = "https" } - if enableSSL && !verifySSL { + if config.enableSSL && !config.verifySSL { cfg.HttpClient.Transport = &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, @@ -122,8 +131,8 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string, consulService := ConsulService{ client: &consulApiClient{client: c}, - logger: logger, - node: node, + logger: config.logger, + node: config.node, trackedTasks: make(map[string]*trackedTask), serviceStates: make(map[string]string), shutdownCh: make(chan struct{}), @@ -195,15 +204,18 @@ func (c *ConsulService) SyncWithConsul() { // services which are no longer present in tasks func (c *ConsulService) performSync() { // Get the list of the services and that Consul knows about - consulServices, err := c.client.Services() + srvcs, err := c.client.Services() if err != nil { return } - consulChecks, err := c.client.Checks() + chks, err := c.client.Checks() if err != nil { return } - delete(consulServices, "consul") + + // Filter the services and checks that isn't managed by consul + consulServices := c.filterConsulServices(srvcs) + consulChecks := c.filterConsulChecks(chks) knownChecks := make(map[string]struct{}) knownServices := make(map[string]struct{}) @@ -345,6 +357,35 @@ func (c *ConsulService) makeCheck(service *structs.Service, check *structs.Servi return cr } +// filterConsulServices prunes out all the service whose ids are not prefixed +// with nomad- +func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService { + nomadServices := make(map[string]*consul.AgentService) + delete(srvcs, "consul") + for _, srv := range srvcs { + if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) { + nomadServices[srv.ID] = srv + } + } + return nomadServices + +} + +// filterConsulChecks prunes out all the consul checks which do not have +// services with id prefixed with noamd- +func (c *ConsulService) filterConsulChecks(chks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck { + nomadChecks := make(map[string]*consul.AgentCheck) + for _, chk := range chks { + if strings.HasPrefix(chk.ServiceID, structs.NomadConsulPrefix) { + nomadChecks[chk.CheckID] = chk + } + } + return nomadChecks + +} + +// printLogMessage prints log messages only when the node attributes have consul +// related information func (c *ConsulService) printLogMessage(message string, v ...interface{}) { if _, ok := c.node.Attributes["consul.version"]; ok { c.logger.Printf(message, v) diff --git a/client/consul_test.go b/client/consul_test.go index 9cb38ede702..e7bb8012ee4 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "log" "os" + "reflect" "testing" "time" ) @@ -46,7 +47,7 @@ func (a *mockConsulApiClient) Checks() (map[string]*consul.AgentCheck, error) { func newConsulService() *ConsulService { logger := log.New(os.Stdout, "logger: ", log.Lshortfile) - c, _ := NewConsulService(logger, "", "", "", false, false, &structs.Node{}) + c, _ := NewConsulService(&consulServiceConfig{logger, "", "", "", false, false, &structs.Node{}}) c.client = &mockConsulApiClient{} return c } @@ -278,3 +279,79 @@ func TestConsul_ModifyCheck(t *testing.T) { t.Fatalf("Expected number of check registrations: %v, Actual: %v", 2, apiClient.checkRegisterCallCount) } } + +func TestConsul_FilterNomadServicesAndChecks(t *testing.T) { + c := newConsulService() + srvs := map[string]*consul.AgentService{ + "foo-bar": { + ID: "foo-bar", + Service: "http-frontend", + Tags: []string{"global"}, + Port: 8080, + Address: "10.10.1.11", + }, + "nomad-2121212": { + ID: "nomad-2121212", + Service: "identity-service", + Tags: []string{"global"}, + Port: 8080, + Address: "10.10.1.11", + }, + } + + expSrvcs := map[string]*consul.AgentService{ + "nomad-2121212": { + ID: "nomad-2121212", + Service: "identity-service", + Tags: []string{"global"}, + Port: 8080, + Address: "10.10.1.11", + }, + } + + nomadServices := c.filterConsulServices(srvs) + if !reflect.DeepEqual(expSrvcs, nomadServices) { + t.Fatalf("Expected: %v, Actual: %v", expSrvcs, nomadServices) + } + + nomadServices = c.filterConsulServices(nil) + if len(nomadServices) != 0 { + t.Fatalf("Expected number of services: %v, Actual: %v", 0, len(nomadServices)) + } + + chks := map[string]*consul.AgentCheck{ + "foo-bar-chk": { + CheckID: "foo-bar-chk", + ServiceID: "foo-bar", + Name: "alive", + }, + "212121212": { + CheckID: "212121212", + ServiceID: "nomad-2121212", + Name: "ping", + }, + } + + expChks := map[string]*consul.AgentCheck{ + "212121212": { + CheckID: "212121212", + ServiceID: "nomad-2121212", + Name: "ping", + }, + } + + nomadChecks := c.filterConsulChecks(chks) + if !reflect.DeepEqual(expChks, nomadChecks) { + t.Fatalf("Expected: %v, Actual: %v", expChks, nomadChecks) + } + + if len(nomadChecks) != 1 { + t.Fatalf("Expected number of checks: %v, Actual: %v", 1, len(nomadChecks)) + } + + nomadChecks = c.filterConsulChecks(nil) + if len(nomadChecks) != 0 { + t.Fatalf("Expected number of checks: %v, Actual: %v", 0, len(nomadChecks)) + } + +} diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 96da69d4e55..dcdc845772f 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { upd := &MockTaskStateUpdater{} alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] - consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) + consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) // Initialize the port listing. This should be done by the offer process but // we have a mock so that doesn't happen. task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}} @@ -164,7 +164,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { } // Create a new task runner - consulClient, _ := NewConsulService(tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}) + consulClient, _ := NewConsulService(&consulServiceConfig{tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker, consulClient) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c817b6672da..84a171e678f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1145,6 +1145,10 @@ func (sc *ServiceCheck) Hash(serviceId string) string { return fmt.Sprintf("%x", h.Sum(nil)) } +const ( + NomadConsulPrefix = "nomad" +) + // The Service model represents a Consul service defintion type Service struct { Id string // Id of the service, this needs to be unique on a local machine @@ -1157,7 +1161,10 @@ type Service struct { // InitFields interpolates values of Job, Task Group and Task in the Service // Name. This also generates check names, service id and check ids. func (s *Service) InitFields(job string, taskGroup string, task string) { - s.Id = GenerateUUID() + // We add a prefix to the Service ID so that we can know that this service + // is managed by Consul since Consul can also have service which are not + // managed by Nomad + s.Id = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID()) s.Name = args.ReplaceEnv(s.Name, map[string]string{ "JOB": job, "TASKGROUP": taskGroup,