Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't deregister services and checks which are not managed by Nomad #568

Merged
merged 7 commits into from
Dec 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
conf.AllocDir = os.TempDir()
upd := &MockAllocStateUpdater{}
alloc := mock.Alloc()
consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{})
consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
if !restarts {
alloc.Job.Type = structs.JobTypeBatch
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
}

// Create a new alloc runner
consulClient, err := NewConsulService(ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{})
consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, consulClient)
err = ar2.RestoreState()
Expand Down
11 changes: 10 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,16 @@ func (c *Client) setupConsulService() error {
auth := c.config.Read("consul.auth")
enableSSL := c.config.ReadBoolDefault("consul.ssl", false)
verifySSL := c.config.ReadBoolDefault("consul.verifyssl", true)
if consulService, err = NewConsulService(c.logger, addr, token, auth, enableSSL, verifySSL, c.config.Node); err != nil {
consulServiceCfg := &consulServiceConfig{
logger: c.logger,
consulAddr: addr,
token: token,
auth: auth,
enableSSL: enableSSL,
verifySSL: verifySSL,
node: c.config.Node,
}
if consulService, err = NewConsulService(consulServiceCfg); err != nil {
return err
}
c.consulService = consulService
Expand Down
73 changes: 57 additions & 16 deletions client/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,45 @@ type ConsulService struct {
trackedTskLock sync.Mutex
}

type consulServiceConfig struct {
logger *log.Logger
consulAddr string
token string
auth string
enableSSL bool
verifySSL bool
node *structs.Node
}

// A factory method to create new consul service
func NewConsulService(logger *log.Logger, consulAddr string, token string,
auth string, enableSSL bool, verifySSL bool, node *structs.Node) (*ConsulService, error) {
func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {
var err error
var c *consul.Client
cfg := consul.DefaultConfig()
cfg.Address = consulAddr
if token != "" {
cfg.Token = token
cfg.Address = config.consulAddr
if config.token != "" {
cfg.Token = config.token
}

if auth != "" {
if config.auth != "" {
var username, password string
if strings.Contains(auth, ":") {
split := strings.SplitN(auth, ":", 2)
if strings.Contains(config.auth, ":") {
split := strings.SplitN(config.auth, ":", 2)
username = split[0]
password = split[1]
} else {
username = auth
username = config.auth
}

cfg.HttpAuth = &consul.HttpBasicAuth{
Username: username,
Password: password,
}
}
if enableSSL {
if config.enableSSL {
cfg.Scheme = "https"
}
if enableSSL && !verifySSL {
if config.enableSSL && !config.verifySSL {
cfg.HttpClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
Expand All @@ -122,8 +131,8 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string,

consulService := ConsulService{
client: &consulApiClient{client: c},
logger: logger,
node: node,
logger: config.logger,
node: config.node,
trackedTasks: make(map[string]*trackedTask),
serviceStates: make(map[string]string),
shutdownCh: make(chan struct{}),
Expand Down Expand Up @@ -195,15 +204,18 @@ func (c *ConsulService) SyncWithConsul() {
// services which are no longer present in tasks
func (c *ConsulService) performSync() {
// Get the list of the services and that Consul knows about
consulServices, err := c.client.Services()
srvcs, err := c.client.Services()
if err != nil {
return
}
consulChecks, err := c.client.Checks()
chks, err := c.client.Checks()
if err != nil {
return
}
delete(consulServices, "consul")

// Filter the services and checks that isn't managed by consul
consulServices := c.filterConsulServices(srvcs)
consulChecks := c.filterConsulChecks(chks)

knownChecks := make(map[string]struct{})
knownServices := make(map[string]struct{})
Expand Down Expand Up @@ -345,6 +357,35 @@ func (c *ConsulService) makeCheck(service *structs.Service, check *structs.Servi
return cr
}

// filterConsulServices prunes out all the service whose ids are not prefixed
// with nomad-
func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService {
nomadServices := make(map[string]*consul.AgentService)
delete(srvcs, "consul")
for _, srv := range srvcs {
if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) {
nomadServices[srv.ID] = srv
}
}
return nomadServices

}

// filterConsulChecks prunes out all the consul checks which do not have
// services with id prefixed with noamd-
func (c *ConsulService) filterConsulChecks(chks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck {
nomadChecks := make(map[string]*consul.AgentCheck)
for _, chk := range chks {
if strings.HasPrefix(chk.ServiceID, structs.NomadConsulPrefix) {
nomadChecks[chk.CheckID] = chk
}
}
return nomadChecks

}

// printLogMessage prints log messages only when the node attributes have consul
// related information
func (c *ConsulService) printLogMessage(message string, v ...interface{}) {
if _, ok := c.node.Attributes["consul.version"]; ok {
c.logger.Printf(message, v)
Expand Down
79 changes: 78 additions & 1 deletion client/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"log"
"os"
"reflect"
"testing"
"time"
)
Expand Down Expand Up @@ -46,7 +47,7 @@ func (a *mockConsulApiClient) Checks() (map[string]*consul.AgentCheck, error) {

func newConsulService() *ConsulService {
logger := log.New(os.Stdout, "logger: ", log.Lshortfile)
c, _ := NewConsulService(logger, "", "", "", false, false, &structs.Node{})
c, _ := NewConsulService(&consulServiceConfig{logger, "", "", "", false, false, &structs.Node{}})
c.client = &mockConsulApiClient{}
return c
}
Expand Down Expand Up @@ -278,3 +279,79 @@ func TestConsul_ModifyCheck(t *testing.T) {
t.Fatalf("Expected number of check registrations: %v, Actual: %v", 2, apiClient.checkRegisterCallCount)
}
}

func TestConsul_FilterNomadServicesAndChecks(t *testing.T) {
c := newConsulService()
srvs := map[string]*consul.AgentService{
"foo-bar": {
ID: "foo-bar",
Service: "http-frontend",
Tags: []string{"global"},
Port: 8080,
Address: "10.10.1.11",
},
"nomad-2121212": {
ID: "nomad-2121212",
Service: "identity-service",
Tags: []string{"global"},
Port: 8080,
Address: "10.10.1.11",
},
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The best practice is to create a new expected map and then use reflect.DeepEquals:

exp := map[string]*consul.AgentService{
   "nomad-2121212": {
            ID:      "nomad-2121212",
            Service: "identity-service",
            Tags:    []string{"global"},
            Port:    8080,
        Address: "10.10.1.11",
        },

   act :=  c.filterConsulServices(serves)
   if !reflect.DeepEquals(act, exp) {
       t.Fatalf(...)
   }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

expSrvcs := map[string]*consul.AgentService{
"nomad-2121212": {
ID: "nomad-2121212",
Service: "identity-service",
Tags: []string{"global"},
Port: 8080,
Address: "10.10.1.11",
},
}

nomadServices := c.filterConsulServices(srvs)
if !reflect.DeepEqual(expSrvcs, nomadServices) {
t.Fatalf("Expected: %v, Actual: %v", expSrvcs, nomadServices)
}

nomadServices = c.filterConsulServices(nil)
if len(nomadServices) != 0 {
t.Fatalf("Expected number of services: %v, Actual: %v", 0, len(nomadServices))
}

chks := map[string]*consul.AgentCheck{
"foo-bar-chk": {
CheckID: "foo-bar-chk",
ServiceID: "foo-bar",
Name: "alive",
},
"212121212": {
CheckID: "212121212",
ServiceID: "nomad-2121212",
Name: "ping",
},
}

expChks := map[string]*consul.AgentCheck{
"212121212": {
CheckID: "212121212",
ServiceID: "nomad-2121212",
Name: "ping",
},
}

nomadChecks := c.filterConsulChecks(chks)
if !reflect.DeepEqual(expChks, nomadChecks) {
t.Fatalf("Expected: %v, Actual: %v", expChks, nomadChecks)
}

if len(nomadChecks) != 1 {
t.Fatalf("Expected number of checks: %v, Actual: %v", 1, len(nomadChecks))
}

nomadChecks = c.filterConsulChecks(nil)
if len(nomadChecks) != 0 {
t.Fatalf("Expected number of checks: %v, Actual: %v", 0, len(nomadChecks))
}

}
4 changes: 2 additions & 2 deletions client/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
upd := &MockTaskStateUpdater{}
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{})
consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
// Initialize the port listing. This should be done by the offer process but
// we have a mock so that doesn't happen.
task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}}
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
}

// Create a new task runner
consulClient, _ := NewConsulService(tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{})
consulClient, _ := NewConsulService(&consulServiceConfig{tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update,
tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker,
consulClient)
Expand Down
9 changes: 8 additions & 1 deletion nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,10 @@ func (sc *ServiceCheck) Hash(serviceId string) string {
return fmt.Sprintf("%x", h.Sum(nil))
}

const (
NomadConsulPrefix = "nomad"
)

// The Service model represents a Consul service defintion
type Service struct {
Id string // Id of the service, this needs to be unique on a local machine
Expand All @@ -1157,7 +1161,10 @@ type Service struct {
// InitFields interpolates values of Job, Task Group and Task in the Service
// Name. This also generates check names, service id and check ids.
func (s *Service) InitFields(job string, taskGroup string, task string) {
s.Id = GenerateUUID()
// We add a prefix to the Service ID so that we can know that this service
// is managed by Consul since Consul can also have service which are not
// managed by Nomad
s.Id = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID())
s.Name = args.ReplaceEnv(s.Name, map[string]string{
"JOB": job,
"TASKGROUP": taskGroup,
Expand Down