From d4d7572604c2afcddd675d4f3410591e55db7e18 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 14 Dec 2015 14:53:49 -0800 Subject: [PATCH 01/11] Making the allocs hold service ids --- api/allocations.go | 1 + client/alloc_runner.go | 4 +-- client/consul.go | 56 +++++++++++++++++++++----------------- client/task_runner.go | 32 +++++++++++----------- nomad/structs/structs.go | 23 +++++++++++----- scheduler/generic_sched.go | 4 +++ scheduler/system_sched.go | 4 +++ 7 files changed, 74 insertions(+), 50 deletions(-) diff --git a/api/allocations.go b/api/allocations.go index 00ab6984dde..73f600d7e8a 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -47,6 +47,7 @@ type Allocation struct { TaskGroup string Resources *Resources TaskResources map[string]*Resources + Services map[string]string Metrics *AllocationMetric DesiredStatus string DesiredDescription string diff --git a/client/alloc_runner.go b/client/alloc_runner.go index d48aaf166fd..14b93dd0da0 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -112,7 +112,7 @@ func (r *AllocRunner) RestoreState() error { task := &structs.Task{Name: name} restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy) tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, - r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker, + r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker, r.consulService) r.tasks[name] = tr @@ -324,7 +324,7 @@ func (r *AllocRunner) Run() { task.Resources = alloc.TaskResources[task.Name] restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy) tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, - r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker, + r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker, r.consulService) r.tasks[task.Name] = tr go tr.Run() diff --git a/client/consul.go b/client/consul.go index e75c71f6a50..39f3985797a 100644 --- a/client/consul.go +++ b/client/consul.go @@ -62,8 +62,8 @@ func (a *consulApiClient) Checks() (map[string]*consul.AgentCheck, error) { // trackedTask is a Task that we are tracking for changes in service and check // definitions and keep them sycned with Consul Agent type trackedTask struct { - allocID string - task *structs.Task + task *structs.Task + alloc *structs.Allocation } // ConsulService is the service which tracks tasks and syncs the services and @@ -143,15 +143,16 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) { // Register starts tracking a task for changes to it's services and tasks and // adds/removes services and checks associated with it. -func (c *ConsulService) Register(task *structs.Task, allocID string) error { +func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation) error { var mErr multierror.Error c.trackedTskLock.Lock() - tt := &trackedTask{allocID: allocID, task: task} - c.trackedTasks[fmt.Sprintf("%s-%s", allocID, task.Name)] = tt + tt := &trackedTask{task: task, alloc: alloc} + c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt c.trackedTskLock.Unlock() for _, service := range task.Services { c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name) - if err := c.registerService(service, task, allocID); err != nil { + if err := c.registerService(service, task, alloc); err != nil { + fmt.Printf("DIPTANU ERR %v\n", err) mErr.Errors = append(mErr.Errors, err) } } @@ -161,17 +162,18 @@ func (c *ConsulService) Register(task *structs.Task, allocID string) error { // Deregister stops tracking a task for changes to it's services and checks and // removes all the services and checks associated with the Task -func (c *ConsulService) Deregister(task *structs.Task, allocID string) error { +func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation) error { var mErr multierror.Error c.trackedTskLock.Lock() - delete(c.trackedTasks, fmt.Sprintf("%s-%s", allocID, task.Name)) + delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name)) c.trackedTskLock.Unlock() for _, service := range task.Services { - if service.Id == "" { + serviceId := alloc.Services[service.Name] + if serviceId == "" { continue } c.logger.Printf("[INFO] consul: deregistering service %v with consul", service.Name) - if err := c.deregisterService(service.Id); err != nil { + if err := c.deregisterService(serviceId); err != nil { c.printLogMessage("[DEBUG] consul: error in deregistering service %v from consul", service.Name) mErr.Errors = append(mErr.Errors, err) } @@ -223,28 +225,30 @@ func (c *ConsulService) performSync() { // Add services and checks which Consul doesn't know about for _, trackedTask := range c.trackedTasks { for _, service := range trackedTask.task.Services { + serviceId := trackedTask.alloc.Services[service.Name] // Add new services which Consul agent isn't aware of - knownServices[service.Id] = struct{}{} - if _, ok := consulServices[service.Id]; !ok { + knownServices[serviceId] = struct{}{} + if _, ok := consulServices[serviceId]; !ok { c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name) - c.registerService(service, trackedTask.task, trackedTask.allocID) + c.registerService(service, trackedTask.task, trackedTask.alloc) continue } // If a service has changed, re-register it with Consul agent - if service.Hash() != c.serviceStates[service.Id] { + if service.Hash() != c.serviceStates[serviceId] { c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name) - c.registerService(service, trackedTask.task, trackedTask.allocID) + c.registerService(service, trackedTask.task, trackedTask.alloc) continue } // Add new checks that Consul isn't aware of for _, check := range service.Checks { - knownChecks[check.Id] = struct{}{} - if _, ok := consulChecks[check.Id]; !ok { + checkId := check.Hash(serviceId) + knownChecks[checkId] = struct{}{} + if _, ok := consulChecks[checkId]; !ok { host, port := trackedTask.task.FindHostAndPortFor(service.PortLabel) - cr := c.makeCheck(service, check, host, port) + cr := c.makeCheck(serviceId, check, host, port) c.registerCheck(cr) } } @@ -276,16 +280,17 @@ func (c *ConsulService) performSync() { } // registerService registers a Service with Consul -func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, allocID string) error { +func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, alloc *structs.Allocation) error { var mErr multierror.Error host, port := task.FindHostAndPortFor(service.PortLabel) if host == "" || port == 0 { return fmt.Errorf("consul: the port:%q marked for registration of service: %q couldn't be found", service.PortLabel, service.Name) } - c.serviceStates[service.Id] = service.Hash() + serviceId := alloc.Services[service.Name] + c.serviceStates[serviceId] = service.Hash() asr := &consul.AgentServiceRegistration{ - ID: service.Id, + ID: serviceId, Name: service.Name, Tags: service.Tags, Port: port, @@ -297,7 +302,7 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs. mErr.Errors = append(mErr.Errors, err) } for _, check := range service.Checks { - cr := c.makeCheck(service, check, host, port) + cr := c.makeCheck(serviceId, check, host, port) if err := c.registerCheck(cr); err != nil { c.printLogMessage("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err) mErr.Errors = append(mErr.Errors, err) @@ -329,11 +334,12 @@ func (c *ConsulService) deregisterService(serviceId string) error { } // makeCheck creates a Consul Check Registration struct -func (c *ConsulService) makeCheck(service *structs.Service, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration { +func (c *ConsulService) makeCheck(serviceId string, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration { + checkId := check.Hash(serviceId) cr := &consul.AgentCheckRegistration{ - ID: check.Id, + ID: checkId, Name: check.Name, - ServiceID: service.Id, + ServiceID: serviceId, } cr.Interval = check.Interval.String() cr.Timeout = check.Timeout.String() diff --git a/client/task_runner.go b/client/task_runner.go index 7f6cc40ff5c..cf446c39535 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -23,7 +23,7 @@ type TaskRunner struct { updater TaskStateUpdater logger *log.Logger ctx *driver.ExecContext - allocID string + alloc *structs.Allocation restartTracker restartTracker consulService *ConsulService @@ -52,7 +52,7 @@ type TaskStateUpdater func(taskName string) // NewTaskRunner is used to create a new task context func NewTaskRunner(logger *log.Logger, config *config.Config, updater TaskStateUpdater, ctx *driver.ExecContext, - allocID string, task *structs.Task, state *structs.TaskState, + alloc *structs.Allocation, task *structs.Task, state *structs.TaskState, restartTracker restartTracker, consulService *ConsulService) *TaskRunner { tc := &TaskRunner{ @@ -62,7 +62,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, restartTracker: restartTracker, consulService: consulService, ctx: ctx, - allocID: allocID, + alloc: alloc, task: task, state: state, updateCh: make(chan *structs.Task, 8), @@ -85,7 +85,7 @@ func (r *TaskRunner) stateFilePath() string { dirName := fmt.Sprintf("task-%s", hashHex) // Generate the path - path := filepath.Join(r.config.StateDir, "alloc", r.allocID, + path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, dirName, "state.json") return path } @@ -113,7 +113,7 @@ func (r *TaskRunner) RestoreState() error { // In the case it fails, we relaunch the task in the Run() method. if err != nil { r.logger.Printf("[ERR] client: failed to open handle to task '%s' for alloc '%s': %v", - r.task.Name, r.allocID, err) + r.task.Name, r.alloc.ID, err) return nil } r.handle = handle @@ -176,7 +176,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) { driver, err := driver.NewDriver(r.task.Driver, driverCtx) if err != nil { err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v", - r.task.Driver, r.allocID, err) + r.task.Driver, r.alloc.ID, err) r.logger.Printf("[ERR] client: %s", err) } return driver, err @@ -196,7 +196,7 @@ func (r *TaskRunner) startTask() error { handle, err := driver.Start(r.ctx, r.task) if err != nil { r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v", - r.task.Name, r.allocID, err) + r.task.Name, r.alloc.ID, err) e := structs.NewTaskEvent(structs.TaskDriverFailure). SetDriverError(fmt.Errorf("failed to start: %v", err)) r.setState(structs.TaskStateDead, e) @@ -211,7 +211,7 @@ func (r *TaskRunner) startTask() error { func (r *TaskRunner) Run() { defer close(r.waitCh) r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", - r.task.Name, r.allocID) + r.task.Name, r.alloc.ID) r.run() return @@ -234,10 +234,10 @@ func (r *TaskRunner) run() { destroyed := false // Register the services defined by the task with Consil - r.consulService.Register(r.task, r.allocID) + r.consulService.Register(r.task, r.alloc) // De-Register the services belonging to the task from consul - defer r.consulService.Deregister(r.task, r.allocID) + defer r.consulService.Deregister(r.task, r.alloc) OUTER: // Wait for updates @@ -249,7 +249,7 @@ func (r *TaskRunner) run() { // Update r.task = update if err := r.handle.Update(update); err != nil { - r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) + r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err) } case <-r.destroyCh: // Avoid destroying twice @@ -259,7 +259,7 @@ func (r *TaskRunner) run() { // Send the kill signal, and use the WaitCh to block until complete if err := r.handle.Kill(); err != nil { - r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) + r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err) destroyErr = err } destroyed = true @@ -274,16 +274,16 @@ func (r *TaskRunner) run() { // Log whether the task was successful or not. if !waitRes.Successful() { - r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.allocID, waitRes) + r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, waitRes) } else { - r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID) + r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.alloc.ID) } // Check if we should restart. If not mark task as dead and exit. shouldRestart, when := r.restartTracker.nextRestart(waitRes.ExitCode) waitEvent := r.waitErrorToEvent(waitRes) if !shouldRestart { - r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.allocID) + r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID) r.setState(structs.TaskStateDead, waitEvent) return } @@ -329,7 +329,7 @@ func (r *TaskRunner) Update(update *structs.Task) { case r.updateCh <- update: default: r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')", - update.Name, r.allocID) + update.Name, r.alloc.ID) } } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 82723965c53..6366c73bcb6 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1106,7 +1106,6 @@ const ( // The ServiceCheck data model represents the consul health check that // Nomad registers for a Task type ServiceCheck struct { - Id string // Id of the check, must be unique and it is autogenrated Name string // Name of the check, defaults to id Type string // Type of the check - tcp, http, docker and script Script string // Script to invoke for script check @@ -1151,7 +1150,6 @@ const ( // The Service model represents a Consul service defintion type Service struct { - Id string // Id of the service, this needs to be unique on a local machine Name string // Name of the service, defaults to id Tags []string // List of tags for the service PortLabel string `mapstructure:"port"` // port for the service @@ -1161,10 +1159,6 @@ type Service struct { // InitFields interpolates values of Job, Task Group and Task in the Service // Name. This also generates check names, service id and check ids. func (s *Service) InitFields(job string, taskGroup string, task string) { - // We add a prefix to the Service ID so that we can know that this service - // is managed by Consul since Consul can also have service which are not - // managed by Nomad - s.Id = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID()) s.Name = args.ReplaceEnv(s.Name, map[string]string{ "JOB": job, "TASKGROUP": taskGroup, @@ -1174,7 +1168,6 @@ func (s *Service) InitFields(job string, taskGroup string, task string) { ) for _, check := range s.Checks { - check.Id = check.Hash(s.Id) if check.Name == "" { check.Name = fmt.Sprintf("service: %q check", s.Name) } @@ -1451,6 +1444,9 @@ type Allocation struct { // task. These should sum to the total Resources. TaskResources map[string]*Resources + // Services is a map of service names and service ids + Services map[string]string + // Metrics associated with this allocation Metrics *AllocMetric @@ -1504,6 +1500,19 @@ func (a *Allocation) Stub() *AllocListStub { } } +func (a *Allocation) PopulateServiceIds() { + a.Services = make(map[string]string) + tg := a.Job.LookupTaskGroup(a.TaskGroup) + for _, task := range tg.Tasks { + for _, service := range task.Services { + // We add a prefix to the Service ID so that we can know that this service + // is managed by Consul since Consul can also have service which are not + // managed by Nomad + a.Services[service.Name] = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID()) + } + } +} + // AllocListStub is used to return a subset of alloc information type AllocListStub struct { ID string diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index b3b48665883..436e4991bcd 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -279,6 +279,10 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { Metrics: s.ctx.Metrics(), } + // Generate the service ids for the tasks which this allocation is going + // to run + alloc.PopulateServiceIds() + // Set fields based on if we found an allocation option if option != nil { alloc.NodeID = option.Node.ID diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index d448642ff53..b8b99bee66b 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -246,6 +246,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { Metrics: s.ctx.Metrics(), } + // Generate the service ids for the tasks that this allocation is going + // to run + alloc.PopulateServiceIds() + // Set fields based on if we found an allocation option if option != nil { alloc.NodeID = option.Node.ID From 76666998bbbe074fc71bfb0fe1e7803628ab9853 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 14 Dec 2015 15:09:57 -0800 Subject: [PATCH 02/11] Fixed the jobspec tests --- jobspec/parse_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 8c7d7919974..4497348eba5 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -96,13 +96,11 @@ func TestParse(t *testing.T) { }, Services: []*structs.Service{ { - Id: "", Name: "binstore-storagelocker-binsl-binstore", Tags: []string{"foo", "bar"}, PortLabel: "http", Checks: []*structs.ServiceCheck{ { - Id: "", Name: "check-name", Type: "tcp", Interval: 10 * time.Second, From ed04b11862edecee37154b80a95ee18cde29ce5d Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 14 Dec 2015 15:47:01 -0800 Subject: [PATCH 03/11] Fixed tests --- client/consul_test.go | 30 +++++++++++++++--------------- client/task_runner_test.go | 4 ++-- nomad/mock/mock.go | 1 + nomad/structs/structs_test.go | 7 +------ 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/client/consul_test.go b/client/consul_test.go index f2fb18ef880..ac16a123381 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -1,7 +1,9 @@ package client import ( + "fmt" consul "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "log" "os" @@ -70,7 +72,6 @@ func newTask() *structs.Task { func TestConsul_MakeChecks(t *testing.T) { service := &structs.Service{ - Id: "Foo", Name: "Bar", Checks: []*structs.ServiceCheck{ { @@ -95,10 +96,11 @@ func TestConsul_MakeChecks(t *testing.T) { } c := newConsulService() + serviceId := fmt.Sprintf("%s-1234", structs.NomadConsulPrefix) - check1 := c.makeCheck(service, service.Checks[0], "10.10.0.1", 8090) - check2 := c.makeCheck(service, service.Checks[1], "10.10.0.1", 8090) - check3 := c.makeCheck(service, service.Checks[2], "10.10.0.1", 8090) + check1 := c.makeCheck(serviceId, service.Checks[0], "10.10.0.1", 8090) + check2 := c.makeCheck(serviceId, service.Checks[1], "10.10.0.1", 8090) + check3 := c.makeCheck(serviceId, service.Checks[2], "10.10.0.1", 8090) if check1.HTTP != "http://10.10.0.1:8090/foo/bar" { t.Fatalf("Invalid http url for check: %v", check1.HTTP) @@ -142,7 +144,6 @@ func TestConsul_InvalidPortLabelForService(t *testing.T) { }, } service := &structs.Service{ - Id: "service-id", Name: "foo", Tags: []string{"a", "b"}, PortLabel: "https", @@ -150,7 +151,7 @@ func TestConsul_InvalidPortLabelForService(t *testing.T) { } c := newConsulService() - if err := c.registerService(service, task, "allocid"); err == nil { + if err := c.registerService(service, task, mock.Alloc()); err == nil { t.Fatalf("Service should be invalid") } } @@ -175,7 +176,7 @@ func TestConsul_Services_Deleted_From_Task(t *testing.T) { }, }, } - c.Register(&task, "1") + c.Register(&task, mock.Alloc()) if len(c.serviceStates) != 1 { t.Fatalf("Expected tracked services: %v, Actual: %v", 1, len(c.serviceStates)) } @@ -191,13 +192,14 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) { c := newConsulService() task := newTask() s1 := structs.Service{ - Id: "1-example-cache-redis", Name: "example-cache-redis", Tags: []string{"global"}, PortLabel: "db", } task.Services = append(task.Services, &s1) - c.Register(task, "1") + alloc := mock.Alloc() + serviceID := alloc.Services[s1.Name] + c.Register(task, alloc) s1.Tags = []string{"frontcache"} @@ -207,8 +209,8 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) { t.Fatal("We should be tracking one service") } - if c.serviceStates[s1.Id] != s1.Hash() { - t.Fatalf("Hash is %v, expected %v", c.serviceStates[s1.Id], s1.Hash()) + if c.serviceStates[serviceID] != s1.Hash() { + t.Fatalf("Hash is %v, expected %v", c.serviceStates[serviceID], s1.Hash()) } } @@ -219,14 +221,13 @@ func TestConsul_AddCheck_To_Service(t *testing.T) { task := newTask() var checks []*structs.ServiceCheck s1 := structs.Service{ - Id: "1-example-cache-redis", Name: "example-cache-redis", Tags: []string{"global"}, PortLabel: "db", Checks: checks, } task.Services = append(task.Services, &s1) - c.Register(task, "1") + c.Register(task, mock.Alloc()) check1 := structs.ServiceCheck{ Name: "alive", @@ -250,14 +251,13 @@ func TestConsul_ModifyCheck(t *testing.T) { task := newTask() var checks []*structs.ServiceCheck s1 := structs.Service{ - Id: "1-example-cache-redis", Name: "example-cache-redis", Tags: []string{"global"}, PortLabel: "db", Checks: checks, } task.Services = append(task.Services, &s1) - c.Register(task, "1") + c.Register(task, mock.Alloc()) check1 := structs.ServiceCheck{ Name: "alive", diff --git a/client/task_runner_test.go b/client/task_runner_test.go index dcdc845772f..f6b581eb7df 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -48,7 +48,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { } state := alloc.TaskStates[task.Name] - tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker, consulClient) + tr := NewTaskRunner(logger, conf, upd.Update, ctx, mock.Alloc(), task, state, restartTracker, consulClient) return upd, tr } @@ -166,7 +166,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { // Create a new task runner consulClient, _ := NewConsulService(&consulServiceConfig{tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, - tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker, + tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker, consulClient) if err := tr2.RestoreState(); err != nil { t.Fatalf("err: %v", err) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 0d2949d8045..8de856483f4 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -226,6 +226,7 @@ func Alloc() *structs.Allocation { }, }, }, + Services: map[string]string{"web-frontend": "nomad-registered-task-1234"}, TaskStates: map[string]*structs.TaskState{ "web": &structs.TaskState{ State: structs.TaskStatePending, diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index a94a2750c29..78c3828dd7b 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -391,13 +391,11 @@ func TestEncodeDecode(t *testing.T) { func TestInvalidServiceCheck(t *testing.T) { s := Service{ - Id: "service-id", Name: "service-name", PortLabel: "bar", Checks: []*ServiceCheck{ { - Id: "check-id", Name: "check-name", Type: "lol", }, @@ -442,7 +440,7 @@ func TestDistinctCheckId(t *testing.T) { } -func TestService_InitFiels(t *testing.T) { +func TestService_InitFields(t *testing.T) { job := "example" taskGroup := "cache" task := "redis" @@ -455,9 +453,6 @@ func TestService_InitFiels(t *testing.T) { if s.Name != "redis-db" { t.Fatalf("Expected name: %v, Actual: %v", "redis-db", s.Name) } - if s.Id == "" { - t.Fatalf("Expected a GUID for Service ID, Actual: %v", s.Id) - } s.Name = "db" s.InitFields(job, taskGroup, task) From f089e249c815c3b2fa78592efb7c442e415c77c0 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 14 Dec 2015 15:57:56 -0800 Subject: [PATCH 04/11] Renamed serviceId to serviceID --- client/consul.go | 54 +++++++++++++++++------------------ client/consul_test.go | 10 +++---- nomad/structs/structs.go | 6 ++-- nomad/structs/structs_test.go | 10 +++---- scheduler/generic_sched.go | 2 +- scheduler/system_sched.go | 2 +- 6 files changed, 42 insertions(+), 42 deletions(-) diff --git a/client/consul.go b/client/consul.go index 39f3985797a..4fff4fbb1db 100644 --- a/client/consul.go +++ b/client/consul.go @@ -47,8 +47,8 @@ func (a *consulApiClient) ServiceRegister(service *consul.AgentServiceRegistrati return a.client.Agent().ServiceRegister(service) } -func (a *consulApiClient) ServiceDeregister(serviceId string) error { - return a.client.Agent().ServiceDeregister(serviceId) +func (a *consulApiClient) ServiceDeregister(serviceID string) error { + return a.client.Agent().ServiceDeregister(serviceID) } func (a *consulApiClient) Services() (map[string]*consul.AgentService, error) { @@ -168,12 +168,12 @@ func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name)) c.trackedTskLock.Unlock() for _, service := range task.Services { - serviceId := alloc.Services[service.Name] - if serviceId == "" { + serviceID := alloc.Services[service.Name] + if serviceID == "" { continue } c.logger.Printf("[INFO] consul: deregistering service %v with consul", service.Name) - if err := c.deregisterService(serviceId); err != nil { + if err := c.deregisterService(serviceID); err != nil { c.printLogMessage("[DEBUG] consul: error in deregistering service %v from consul", service.Name) mErr.Errors = append(mErr.Errors, err) } @@ -225,18 +225,18 @@ func (c *ConsulService) performSync() { // Add services and checks which Consul doesn't know about for _, trackedTask := range c.trackedTasks { for _, service := range trackedTask.task.Services { - serviceId := trackedTask.alloc.Services[service.Name] + serviceID := trackedTask.alloc.Services[service.Name] // Add new services which Consul agent isn't aware of - knownServices[serviceId] = struct{}{} - if _, ok := consulServices[serviceId]; !ok { + knownServices[serviceID] = struct{}{} + if _, ok := consulServices[serviceID]; !ok { c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name) c.registerService(service, trackedTask.task, trackedTask.alloc) continue } // If a service has changed, re-register it with Consul agent - if service.Hash() != c.serviceStates[serviceId] { + if service.Hash() != c.serviceStates[serviceID] { c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name) c.registerService(service, trackedTask.task, trackedTask.alloc) continue @@ -244,11 +244,11 @@ func (c *ConsulService) performSync() { // Add new checks that Consul isn't aware of for _, check := range service.Checks { - checkId := check.Hash(serviceId) - knownChecks[checkId] = struct{}{} - if _, ok := consulChecks[checkId]; !ok { + checkID := check.Hash(serviceID) + knownChecks[checkID] = struct{}{} + if _, ok := consulChecks[checkID]; !ok { host, port := trackedTask.task.FindHostAndPortFor(service.PortLabel) - cr := c.makeCheck(serviceId, check, host, port) + cr := c.makeCheck(serviceID, check, host, port) c.registerCheck(cr) } } @@ -256,9 +256,9 @@ func (c *ConsulService) performSync() { } // Remove services from the service tracker which no longer exists - for serviceId := range c.serviceStates { - if _, ok := knownServices[serviceId]; !ok { - delete(c.serviceStates, serviceId) + for serviceID := range c.serviceStates { + if _, ok := knownServices[serviceID]; !ok { + delete(c.serviceStates, serviceID) } } @@ -286,11 +286,11 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs. if host == "" || port == 0 { return fmt.Errorf("consul: the port:%q marked for registration of service: %q couldn't be found", service.PortLabel, service.Name) } - serviceId := alloc.Services[service.Name] - c.serviceStates[serviceId] = service.Hash() + serviceID := alloc.Services[service.Name] + c.serviceStates[serviceID] = service.Hash() asr := &consul.AgentServiceRegistration{ - ID: serviceId, + ID: serviceID, Name: service.Name, Tags: service.Tags, Port: port, @@ -302,7 +302,7 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs. mErr.Errors = append(mErr.Errors, err) } for _, check := range service.Checks { - cr := c.makeCheck(serviceId, check, host, port) + cr := c.makeCheck(serviceID, check, host, port) if err := c.registerCheck(cr); err != nil { c.printLogMessage("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err) mErr.Errors = append(mErr.Errors, err) @@ -325,21 +325,21 @@ func (c *ConsulService) deregisterCheck(checkID string) error { } // deregisterService de-registers a Service with a specific id from Consul -func (c *ConsulService) deregisterService(serviceId string) error { - delete(c.serviceStates, serviceId) - if err := c.client.ServiceDeregister(serviceId); err != nil { +func (c *ConsulService) deregisterService(serviceID string) error { + delete(c.serviceStates, serviceID) + if err := c.client.ServiceDeregister(serviceID); err != nil { return err } return nil } // makeCheck creates a Consul Check Registration struct -func (c *ConsulService) makeCheck(serviceId string, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration { - checkId := check.Hash(serviceId) +func (c *ConsulService) makeCheck(serviceID string, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration { + checkID := check.Hash(serviceID) cr := &consul.AgentCheckRegistration{ - ID: checkId, + ID: checkID, Name: check.Name, - ServiceID: serviceId, + ServiceID: serviceID, } cr.Interval = check.Interval.String() cr.Timeout = check.Timeout.String() diff --git a/client/consul_test.go b/client/consul_test.go index ac16a123381..1b111382aba 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -34,7 +34,7 @@ func (a *mockConsulApiClient) ServiceRegister(service *consul.AgentServiceRegist return nil } -func (a *mockConsulApiClient) ServiceDeregister(serviceId string) error { +func (a *mockConsulApiClient) ServiceDeregister(serviceID string) error { a.serviceDeregisterCallCount += 1 return nil } @@ -96,11 +96,11 @@ func TestConsul_MakeChecks(t *testing.T) { } c := newConsulService() - serviceId := fmt.Sprintf("%s-1234", structs.NomadConsulPrefix) + serviceID := fmt.Sprintf("%s-1234", structs.NomadConsulPrefix) - check1 := c.makeCheck(serviceId, service.Checks[0], "10.10.0.1", 8090) - check2 := c.makeCheck(serviceId, service.Checks[1], "10.10.0.1", 8090) - check3 := c.makeCheck(serviceId, service.Checks[2], "10.10.0.1", 8090) + check1 := c.makeCheck(serviceID, service.Checks[0], "10.10.0.1", 8090) + check2 := c.makeCheck(serviceID, service.Checks[1], "10.10.0.1", 8090) + check3 := c.makeCheck(serviceID, service.Checks[2], "10.10.0.1", 8090) if check1.HTTP != "http://10.10.0.1:8090/foo/bar" { t.Fatalf("Invalid http url for check: %v", check1.HTTP) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6366c73bcb6..5303dd64ab6 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1130,9 +1130,9 @@ func (sc *ServiceCheck) Validate() error { return nil } -func (sc *ServiceCheck) Hash(serviceId string) string { +func (sc *ServiceCheck) Hash(serviceID string) string { h := sha1.New() - io.WriteString(h, serviceId) + io.WriteString(h, serviceID) io.WriteString(h, sc.Name) io.WriteString(h, sc.Type) io.WriteString(h, sc.Script) @@ -1500,7 +1500,7 @@ func (a *Allocation) Stub() *AllocListStub { } } -func (a *Allocation) PopulateServiceIds() { +func (a *Allocation) PopulateServiceIDs() { a.Services = make(map[string]string) tg := a.Job.LookupTaskGroup(a.TaskGroup) for _, task := range tg.Tasks { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 78c3828dd7b..5111ea4a5e0 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -406,7 +406,7 @@ func TestInvalidServiceCheck(t *testing.T) { } } -func TestDistinctCheckId(t *testing.T) { +func TestDistinctCheckID(t *testing.T) { c1 := ServiceCheck{ Name: "web-health", Type: "http", @@ -429,10 +429,10 @@ func TestDistinctCheckId(t *testing.T) { Interval: 4 * time.Second, Timeout: 3 * time.Second, } - serviceId := "123" - c1Hash := c1.Hash(serviceId) - c2Hash := c2.Hash(serviceId) - c3Hash := c3.Hash(serviceId) + serviceID := "123" + c1Hash := c1.Hash(serviceID) + c2Hash := c2.Hash(serviceID) + c3Hash := c3.Hash(serviceID) if c1Hash == c2Hash || c1Hash == c3Hash || c3Hash == c2Hash { t.Fatalf("Checks need to be uniq c1: %s, c2: %s, c3: %s", c1Hash, c2Hash, c3Hash) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 436e4991bcd..26bb737bd25 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -281,7 +281,7 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { // Generate the service ids for the tasks which this allocation is going // to run - alloc.PopulateServiceIds() + alloc.PopulateServiceIDs() // Set fields based on if we found an allocation option if option != nil { diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index b8b99bee66b..42370b8fd79 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -248,7 +248,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { // Generate the service ids for the tasks that this allocation is going // to run - alloc.PopulateServiceIds() + alloc.PopulateServiceIDs() // Set fields based on if we found an allocation option if option != nil { From f4526cd7e32471b9e26215add018b800dd488e36 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 14 Dec 2015 17:06:58 -0800 Subject: [PATCH 05/11] Re-initializing the service map for in place updates --- scheduler/util.go | 1 + scheduler/util_test.go | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/scheduler/util.go b/scheduler/util.go index 44bd2ae0851..27a714a0bc8 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -383,6 +383,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, newAlloc.Metrics = ctx.Metrics() newAlloc.DesiredStatus = structs.AllocDesiredStatusRun newAlloc.ClientStatus = structs.AllocClientStatusPending + newAlloc.PopulateServiceIDs() ctx.Plan().AppendAlloc(newAlloc) // Remove this allocation from the slice diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 747161e51c1..d1e610c8d41 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -533,17 +533,19 @@ func TestInplaceUpdate_Success(t *testing.T) { state, ctx := testContext(t) eval := mock.Eval() job := mock.Job() + job.InitAllServiceFields() node := mock.Node() noErr(t, state.UpsertNode(1000, node)) // Register an alloc alloc := &structs.Allocation{ - ID: structs.GenerateUUID(), - EvalID: eval.ID, - NodeID: node.ID, - JobID: job.ID, - Job: job, + ID: structs.GenerateUUID(), + EvalID: eval.ID, + NodeID: node.ID, + JobID: job.ID, + Job: job, + TaskGroup: job.TaskGroups[0].Name, Resources: &structs.Resources{ CPU: 2048, MemoryMB: 2048, @@ -551,13 +553,19 @@ func TestInplaceUpdate_Success(t *testing.T) { DesiredStatus: structs.AllocDesiredStatusRun, } alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources} + alloc.PopulateServiceIDs() noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc})) + if alloc.Services["web-frontend"] == "" { + t.Fatal("Service ID needs to be generated for service") + } + // Create a new task group that updates the resources. tg := &structs.TaskGroup{} *tg = *job.TaskGroups[0] resource := &structs.Resources{CPU: 737} tg.Tasks[0].Resources = resource + tg.Tasks[0].Services = []*structs.Service{} updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}} stack := NewGenericStack(false, ctx) From c52a5865c0c59ebc8bbfb8bb1e57e0ac183480db Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 14 Dec 2015 17:17:29 -0800 Subject: [PATCH 06/11] Fixed the job endpoints test --- nomad/job_endpoint_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 53934bf215a..986ebc102c0 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -359,15 +359,12 @@ func TestJobEndpoint_GetJob(t *testing.T) { for tgix, tg := range j.TaskGroups { for tidx, t := range tg.Tasks { for sidx, service := range t.Services { - service.Id = resp2.Job.TaskGroups[tgix].Tasks[tidx].Services[sidx].Id for cidx, check := range service.Checks { check.Name = resp2.Job.TaskGroups[tgix].Tasks[tidx].Services[sidx].Checks[cidx].Name - check.Id = resp2.Job.TaskGroups[tgix].Tasks[tidx].Services[sidx].Checks[cidx].Id } } } } - j.TaskGroups[0].Tasks[0].Services[0].Id = resp2.Job.TaskGroups[0].Tasks[0].Services[0].Id if !reflect.DeepEqual(j, resp2.Job) { t.Fatalf("bad: %#v %#v", job, resp2.Job) From cbc70a54656e4f720a975b30b7878f81ee3c10f7 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 14 Dec 2015 18:05:58 -0800 Subject: [PATCH 07/11] Changed some comments --- client/consul.go | 1 - nomad/mock/mock.go | 1 + nomad/structs/structs.go | 6 ++++-- scheduler/generic_sched.go | 8 ++++---- scheduler/util_test.go | 1 - 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/client/consul.go b/client/consul.go index 4fff4fbb1db..0340f9bed8c 100644 --- a/client/consul.go +++ b/client/consul.go @@ -152,7 +152,6 @@ func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation) for _, service := range task.Services { c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name) if err := c.registerService(service, task, alloc); err != nil { - fmt.Printf("DIPTANU ERR %v\n", err) mErr.Errors = append(mErr.Errors, err) } } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 8de856483f4..d645f58ee35 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -122,6 +122,7 @@ func Job() *structs.Job { CreateIndex: 42, ModifyIndex: 99, } + job.InitAllServiceFields() return job } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5303dd64ab6..b8f3ddaae6c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1444,7 +1444,7 @@ type Allocation struct { // task. These should sum to the total Resources. TaskResources map[string]*Resources - // Services is a map of service names and service ids + // Services is a map of service names to service ids Services map[string]string // Metrics associated with this allocation @@ -1500,13 +1500,15 @@ func (a *Allocation) Stub() *AllocListStub { } } +// PopulateServiceIDs generates the service IDs for all the service definitions +// in that Allocation func (a *Allocation) PopulateServiceIDs() { a.Services = make(map[string]string) tg := a.Job.LookupTaskGroup(a.TaskGroup) for _, task := range tg.Tasks { for _, service := range task.Services { // We add a prefix to the Service ID so that we can know that this service - // is managed by Consul since Consul can also have service which are not + // is managed by Nomad Consul since Consul can also have service which are not // managed by Nomad a.Services[service.Name] = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID()) } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 26bb737bd25..edaeaf94c4d 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -279,12 +279,12 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { Metrics: s.ctx.Metrics(), } - // Generate the service ids for the tasks which this allocation is going - // to run - alloc.PopulateServiceIDs() - // Set fields based on if we found an allocation option if option != nil { + // Generate the service ids for the tasks which this allocation is going + // to run + alloc.PopulateServiceIDs() + alloc.NodeID = option.Node.ID alloc.TaskResources = option.TaskResources alloc.DesiredStatus = structs.AllocDesiredStatusRun diff --git a/scheduler/util_test.go b/scheduler/util_test.go index d1e610c8d41..cb5acb3accf 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -533,7 +533,6 @@ func TestInplaceUpdate_Success(t *testing.T) { state, ctx := testContext(t) eval := mock.Eval() job := mock.Job() - job.InitAllServiceFields() node := mock.Node() noErr(t, state.UpsertNode(1000, node)) From 962bdf744d54f114dc2903e8b8114a29d55eb97e Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 15 Dec 2015 08:35:26 -0800 Subject: [PATCH 08/11] Added a test to prove services are removed from the map in Alloc if they are removed from the Tasks --- scheduler/util_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/scheduler/util_test.go b/scheduler/util_test.go index cb5acb3accf..2b287b83a8a 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -580,6 +580,12 @@ func TestInplaceUpdate_Success(t *testing.T) { if len(ctx.plan.NodeAllocation) != 1 { t.Fatal("inplaceUpdate did not do an inplace update") } + + // Get the alloc we inserted. + a := ctx.plan.NodeAllocation[alloc.NodeID][0] + if len(a.Services) != 0 { + t.Fatalf("Expected number of services: %v, Actual: %v", 0, len(a.Services)) + } } func TestEvictAndPlace_LimitGreaterThanAllocs(t *testing.T) { From fa5beb7fe5e7d2b34b86dec616129cac94572e0a Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 15 Dec 2015 08:38:18 -0800 Subject: [PATCH 09/11] Populating service ids only if allocations can be placed for system jobs --- nomad/structs/structs.go | 2 +- scheduler/system_sched.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b8f3ddaae6c..7223199014b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1508,7 +1508,7 @@ func (a *Allocation) PopulateServiceIDs() { for _, task := range tg.Tasks { for _, service := range task.Services { // We add a prefix to the Service ID so that we can know that this service - // is managed by Nomad Consul since Consul can also have service which are not + // is managed by Nomad since Consul can also have service which are not // managed by Nomad a.Services[service.Name] = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID()) } diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 42370b8fd79..80a87fe94f8 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -246,12 +246,12 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { Metrics: s.ctx.Metrics(), } - // Generate the service ids for the tasks that this allocation is going - // to run - alloc.PopulateServiceIDs() - // Set fields based on if we found an allocation option if option != nil { + // Generate the service ids for the tasks that this allocation is going + // to run + alloc.PopulateServiceIDs() + alloc.NodeID = option.Node.ID alloc.TaskResources = option.TaskResources alloc.DesiredStatus = structs.AllocDesiredStatusRun From 9686f6e26f8f6de7522e1845250a117fcac74de9 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 15 Dec 2015 09:14:32 -0800 Subject: [PATCH 10/11] Making sure existing ids for services are not re-generated --- nomad/structs/structs.go | 22 ++++++++++++++++++---- scheduler/util_test.go | 25 +++++++++++++++++++++---- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7223199014b..a86fae84334 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1503,14 +1503,28 @@ func (a *Allocation) Stub() *AllocListStub { // PopulateServiceIDs generates the service IDs for all the service definitions // in that Allocation func (a *Allocation) PopulateServiceIDs() { + // Make a copy of the old map which contains the service names and their + // generated IDs + oldIDs := make(map[string]string) + for k, v := range a.Services { + oldIDs[k] = v + } + a.Services = make(map[string]string) tg := a.Job.LookupTaskGroup(a.TaskGroup) for _, task := range tg.Tasks { for _, service := range task.Services { - // We add a prefix to the Service ID so that we can know that this service - // is managed by Nomad since Consul can also have service which are not - // managed by Nomad - a.Services[service.Name] = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID()) + // If the ID for a service name is already generated then we re-use + // it + if ID, ok := oldIDs[service.Name]; ok { + a.Services[service.Name] = ID + } else { + // If the service hasn't been generated an ID, we generate one. + // We add a prefix to the Service ID so that we can know that this service + // is managed by Nomad since Consul can also have service which are not + // managed by Nomad + a.Services[service.Name] = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID()) + } } } } diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 2b287b83a8a..421ca8f1fe9 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -555,7 +555,9 @@ func TestInplaceUpdate_Success(t *testing.T) { alloc.PopulateServiceIDs() noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc})) - if alloc.Services["web-frontend"] == "" { + webFeSrvID := alloc.Services["web-frontend"] + + if webFeSrvID == "" { t.Fatal("Service ID needs to be generated for service") } @@ -564,7 +566,17 @@ func TestInplaceUpdate_Success(t *testing.T) { *tg = *job.TaskGroups[0] resource := &structs.Resources{CPU: 737} tg.Tasks[0].Resources = resource - tg.Tasks[0].Services = []*structs.Service{} + newServices := []*structs.Service{ + { + Name: "dummy-service", + PortLabel: "http", + }, + { + Name: "dummy-service2", + PortLabel: "http", + }, + } + tg.Tasks[0].Services = append(tg.Tasks[0].Services, newServices...) updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}} stack := NewGenericStack(false, ctx) @@ -583,8 +595,13 @@ func TestInplaceUpdate_Success(t *testing.T) { // Get the alloc we inserted. a := ctx.plan.NodeAllocation[alloc.NodeID][0] - if len(a.Services) != 0 { - t.Fatalf("Expected number of services: %v, Actual: %v", 0, len(a.Services)) + if len(a.Services) != 3 { + t.Fatalf("Expected number of services: %v, Actual: %v", 3, len(a.Services)) + } + + // Test that the service id for the old service is still the same + if a.Services["web-frontend"] != webFeSrvID { + t.Fatalf("Expected service ID: %v, Actual: %v", webFeSrvID, a.Services["web-frontend"]) } } From ed911ca0116aab028e879ce72659c7449e63c848 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 15 Dec 2015 10:43:56 -0800 Subject: [PATCH 11/11] Added a test to make sure services no longer present are being removed --- nomad/mock/mock.go | 6 +++++- scheduler/util_test.go | 14 +++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index d645f58ee35..012677f18b8 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -95,6 +95,10 @@ func Job() *structs.Job { Name: "${TASK}-frontend", PortLabel: "http", }, + { + Name: "${TASK}-admin", + PortLabel: "admin", + }, }, Resources: &structs.Resources{ CPU: 500, @@ -102,7 +106,7 @@ func Job() *structs.Job { Networks: []*structs.NetworkResource{ &structs.NetworkResource{ MBits: 50, - DynamicPorts: []structs.Port{{Label: "http"}}, + DynamicPorts: []structs.Port{{Label: "http"}, {Label: "admin"}}, }, }, }, diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 421ca8f1fe9..4d6d21db85f 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -556,8 +556,9 @@ func TestInplaceUpdate_Success(t *testing.T) { noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc})) webFeSrvID := alloc.Services["web-frontend"] + adminSrvID := alloc.Services["web-admin"] - if webFeSrvID == "" { + if webFeSrvID == "" || adminSrvID == "" { t.Fatal("Service ID needs to be generated for service") } @@ -576,6 +577,11 @@ func TestInplaceUpdate_Success(t *testing.T) { PortLabel: "http", }, } + + // Delete service 2 + tg.Tasks[0].Services = tg.Tasks[0].Services[:1] + + // Add the new services tg.Tasks[0].Services = append(tg.Tasks[0].Services, newServices...) updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}} @@ -603,6 +609,12 @@ func TestInplaceUpdate_Success(t *testing.T) { if a.Services["web-frontend"] != webFeSrvID { t.Fatalf("Expected service ID: %v, Actual: %v", webFeSrvID, a.Services["web-frontend"]) } + + // Test that the map doesn't contain the service ID of the admin Service + // anymore + if _, ok := a.Services["web-admin"]; ok { + t.Fatal("Service shouldn't be present") + } } func TestEvictAndPlace_LimitGreaterThanAllocs(t *testing.T) {