diff --git a/api/allocations.go b/api/allocations.go
index 00ab6984dde..73f600d7e8a 100644
--- a/api/allocations.go
+++ b/api/allocations.go
@@ -47,6 +47,7 @@ type Allocation struct {
 	TaskGroup          string
 	Resources          *Resources
 	TaskResources      map[string]*Resources
+	Services           map[string]string
 	Metrics            *AllocationMetric
 	DesiredStatus      string
 	DesiredDescription string
diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index d48aaf166fd..14b93dd0da0 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -112,7 +112,7 @@ func (r *AllocRunner) RestoreState() error {
 		task := &structs.Task{Name: name}
 		restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
 		tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
-			r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
+			r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
 			r.consulService)
 		r.tasks[name] = tr
 
@@ -324,7 +324,7 @@ func (r *AllocRunner) Run() {
 		task.Resources = alloc.TaskResources[task.Name]
 		restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
 		tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
-			r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
+			r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
 			r.consulService)
 		r.tasks[task.Name] = tr
 		go tr.Run()
diff --git a/client/consul.go b/client/consul.go
index e75c71f6a50..39f3985797a 100644
--- a/client/consul.go
+++ b/client/consul.go
@@ -62,8 +62,8 @@ func (a *consulApiClient) Checks() (map[string]*consul.AgentCheck, error) {
 // trackedTask is a Task that we are tracking for changes in service and check
 // definitions and keep them sycned with Consul Agent
 type trackedTask struct {
-	allocID string
-	task    *structs.Task
+	task  *structs.Task
+	alloc *structs.Allocation
 }
 
 // ConsulService is the service which tracks tasks and syncs the services and
@@ -143,15 +143,15 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {
 
 // Register starts tracking a task for changes to it's services and tasks and
 // adds/removes services and checks associated with it.
-func (c *ConsulService) Register(task *structs.Task, allocID string) error {
+func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation) error {
 	var mErr multierror.Error
 	c.trackedTskLock.Lock()
-	tt := &trackedTask{allocID: allocID, task: task}
-	c.trackedTasks[fmt.Sprintf("%s-%s", allocID, task.Name)] = tt
+	tt := &trackedTask{task: task, alloc: alloc}
+	c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt
 	c.trackedTskLock.Unlock()
 	for _, service := range task.Services {
 		c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
-		if err := c.registerService(service, task, allocID); err != nil {
+		if err := c.registerService(service, task, alloc); err != nil {
 			mErr.Errors = append(mErr.Errors, err)
 		}
 	}
@@ -161,17 +161,18 @@
 
 // Deregister stops tracking a task for changes to it's services and checks and
 // removes all the services and checks associated with the Task
-func (c *ConsulService) Deregister(task *structs.Task, allocID string) error {
+func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation) error {
 	var mErr multierror.Error
 	c.trackedTskLock.Lock()
-	delete(c.trackedTasks, fmt.Sprintf("%s-%s", allocID, task.Name))
+	delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name))
 	c.trackedTskLock.Unlock()
 	for _, service := range task.Services {
-		if service.Id == "" {
+		serviceId := alloc.Services[service.Name]
+		if serviceId == "" {
 			continue
 		}
 		c.logger.Printf("[INFO] consul: deregistering service %v with consul", service.Name)
-		if err := c.deregisterService(service.Id); err != nil {
+		if err := c.deregisterService(serviceId); err != nil {
 			c.printLogMessage("[DEBUG] consul: error in deregistering service %v from consul", service.Name)
 			mErr.Errors = append(mErr.Errors, err)
 		}
@@ -223,28 +224,30 @@ func (c *ConsulService) performSync() {
 	// Add services and checks which Consul doesn't know about
 	for _, trackedTask := range c.trackedTasks {
 		for _, service := range trackedTask.task.Services {
+			serviceId := trackedTask.alloc.Services[service.Name]
 
 			// Add new services which Consul agent isn't aware of
-			knownServices[service.Id] = struct{}{}
-			if _, ok := consulServices[service.Id]; !ok {
+			knownServices[serviceId] = struct{}{}
+			if _, ok := consulServices[serviceId]; !ok {
 				c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name)
-				c.registerService(service, trackedTask.task, trackedTask.allocID)
+				c.registerService(service, trackedTask.task, trackedTask.alloc)
 				continue
 			}
 
 			// If a service has changed, re-register it with Consul agent
-			if service.Hash() != c.serviceStates[service.Id] {
+			if service.Hash() != c.serviceStates[serviceId] {
 				c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name)
-				c.registerService(service, trackedTask.task, trackedTask.allocID)
+				c.registerService(service, trackedTask.task, trackedTask.alloc)
 				continue
 			}
 
 			// Add new checks that Consul isn't aware of
 			for _, check := range service.Checks {
-				knownChecks[check.Id] = struct{}{}
-				if _, ok := consulChecks[check.Id]; !ok {
+				checkId := check.Hash(serviceId)
+				knownChecks[checkId] = struct{}{}
+				if _, ok := consulChecks[checkId]; !ok {
 					host, port := trackedTask.task.FindHostAndPortFor(service.PortLabel)
-					cr := c.makeCheck(service, check, host, port)
+					cr := c.makeCheck(serviceId, check, host, port)
 					c.registerCheck(cr)
 				}
 			}
@@ -276,16 +279,17 @@ func (c *ConsulService) performSync() {
 }
 
 // registerService registers a Service with Consul
-func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, allocID string) error {
+func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, alloc *structs.Allocation) error {
 	var mErr multierror.Error
 	host, port := task.FindHostAndPortFor(service.PortLabel)
 	if host == "" || port == 0 {
 		return fmt.Errorf("consul: the port:%q marked for registration of service: %q couldn't be found", service.PortLabel, service.Name)
 	}
-	c.serviceStates[service.Id] = service.Hash()
+	serviceId := alloc.Services[service.Name]
+	c.serviceStates[serviceId] = service.Hash()
 
 	asr := &consul.AgentServiceRegistration{
-		ID:   service.Id,
+		ID:   serviceId,
 		Name: service.Name,
 		Tags: service.Tags,
 		Port: port,
@@ -297,7 +301,7 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, allocID string) error {
 		mErr.Errors = append(mErr.Errors, err)
 	}
 	for _, check := range service.Checks {
-		cr := c.makeCheck(service, check, host, port)
+		cr := c.makeCheck(serviceId, check, host, port)
 		if err := c.registerCheck(cr); err != nil {
 			c.printLogMessage("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err)
 			mErr.Errors = append(mErr.Errors, err)
@@ -329,11 +333,12 @@ func (c *ConsulService) deregisterService(serviceId string) error {
 }
 
 // makeCheck creates a Consul Check Registration struct
-func (c *ConsulService) makeCheck(service *structs.Service, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
+func (c *ConsulService) makeCheck(serviceId string, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
+	checkId := check.Hash(serviceId)
	cr := &consul.AgentCheckRegistration{
-		ID:        check.Id,
+		ID:        checkId,
 		Name:      check.Name,
-		ServiceID: service.Id,
+		ServiceID: serviceId,
 	}
 	cr.Interval = check.Interval.String()
 	cr.Timeout = check.Timeout.String()
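The client/consul.go hunks above flip the identity model: a service's Consul ID is no longer a field on the Service itself but is looked up from the owning allocation's Services map, and check IDs are recomputed on demand via check.Hash(serviceId). A minimal sketch of that derivation model, assuming a SHA-1 based Hash (the exact field set nomad/structs hashes is not shown in this diff, so the ServiceCheck fields here are stand-ins):

```go
package main

import (
	"crypto/sha1"
	"fmt"
)

// ServiceCheck mirrors the shape used above: the Id field is gone, so a
// check's identity must be derived from its owning service's ID.
type ServiceCheck struct {
	Name string
	Type string
	Path string
}

// Hash derives a stable check ID from the service ID plus the check's own
// definition. Equal definitions always yield equal IDs.
func (sc *ServiceCheck) Hash(serviceId string) string {
	h := sha1.New()
	fmt.Fprintf(h, "%s%s%s%s", serviceId, sc.Name, sc.Type, sc.Path)
	return fmt.Sprintf("%x", h.Sum(nil))
}

func main() {
	// The alloc-level map written by PopulateServiceIds (see the
	// nomad/structs hunk below) is the single source of service IDs.
	allocServices := map[string]string{"web": "nomad-registered-service-1234"}

	check := &ServiceCheck{Name: "web-alive", Type: "http", Path: "/health"}
	serviceId := allocServices["web"]

	// performSync can recompute the same check ID on every pass and
	// compare it against what the Consul agent reports.
	fmt.Println(check.Hash(serviceId))
}
```

Because the check ID is a pure function of the service ID and the check definition, performSync never needs to persist check IDs anywhere: it recomputes them each pass and diffs against the agent's view.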
diff --git a/client/task_runner.go b/client/task_runner.go
index 7f6cc40ff5c..cf446c39535 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -23,7 +23,7 @@ type TaskRunner struct {
 	updater        TaskStateUpdater
 	logger         *log.Logger
 	ctx            *driver.ExecContext
-	allocID        string
+	alloc          *structs.Allocation
 	restartTracker restartTracker
 	consulService  *ConsulService
 
@@ -52,7 +52,7 @@ type TaskStateUpdater func(taskName string)
 // NewTaskRunner is used to create a new task context
 func NewTaskRunner(logger *log.Logger, config *config.Config,
 	updater TaskStateUpdater, ctx *driver.ExecContext,
-	allocID string, task *structs.Task, state *structs.TaskState,
+	alloc *structs.Allocation, task *structs.Task, state *structs.TaskState,
 	restartTracker restartTracker, consulService *ConsulService) *TaskRunner {
 
 	tc := &TaskRunner{
@@ -62,7 +62,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 		restartTracker: restartTracker,
 		consulService:  consulService,
 		ctx:            ctx,
-		allocID:        allocID,
+		alloc:          alloc,
 		task:           task,
 		state:          state,
 		updateCh:       make(chan *structs.Task, 8),
@@ -85,7 +85,7 @@ func (r *TaskRunner) stateFilePath() string {
 	dirName := fmt.Sprintf("task-%s", hashHex)
 
 	// Generate the path
-	path := filepath.Join(r.config.StateDir, "alloc", r.allocID,
+	path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID,
 		dirName, "state.json")
 	return path
 }
@@ -113,7 +113,7 @@ func (r *TaskRunner) RestoreState() error {
 	// In the case it fails, we relaunch the task in the Run() method.
 	if err != nil {
 		r.logger.Printf("[ERR] client: failed to open handle to task '%s' for alloc '%s': %v",
-			r.task.Name, r.allocID, err)
+			r.task.Name, r.alloc.ID, err)
 		return nil
 	}
 	r.handle = handle
@@ -176,7 +176,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
 	driver, err := driver.NewDriver(r.task.Driver, driverCtx)
 	if err != nil {
 		err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
-			r.task.Driver, r.allocID, err)
+			r.task.Driver, r.alloc.ID, err)
 		r.logger.Printf("[ERR] client: %s", err)
 	}
 	return driver, err
@@ -196,7 +196,7 @@ func (r *TaskRunner) startTask() error {
 	handle, err := driver.Start(r.ctx, r.task)
 	if err != nil {
 		r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v",
-			r.task.Name, r.allocID, err)
+			r.task.Name, r.alloc.ID, err)
 		e := structs.NewTaskEvent(structs.TaskDriverFailure).
 			SetDriverError(fmt.Errorf("failed to start: %v", err))
 		r.setState(structs.TaskStateDead, e)
@@ -211,7 +211,7 @@ func (r *TaskRunner) startTask() error {
 func (r *TaskRunner) Run() {
 	defer close(r.waitCh)
 	r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')",
-		r.task.Name, r.allocID)
+		r.task.Name, r.alloc.ID)
 
 	r.run()
 	return
@@ -234,10 +234,10 @@ func (r *TaskRunner) run() {
 	destroyed := false
 
 	// Register the services defined by the task with Consil
-	r.consulService.Register(r.task, r.allocID)
+	r.consulService.Register(r.task, r.alloc)
 
 	// De-Register the services belonging to the task from consul
-	defer r.consulService.Deregister(r.task, r.allocID)
+	defer r.consulService.Deregister(r.task, r.alloc)
 
 OUTER:
 	// Wait for updates
@@ -249,7 +249,7 @@ func (r *TaskRunner) run() {
 			// Update
 			r.task = update
 			if err := r.handle.Update(update); err != nil {
-				r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err)
+				r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
 			}
 		case <-r.destroyCh:
 			// Avoid destroying twice
@@ -259,7 +259,7 @@ func (r *TaskRunner) run() {
 
 			// Send the kill signal, and use the WaitCh to block until complete
 			if err := r.handle.Kill(); err != nil {
-				r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err)
+				r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
 				destroyErr = err
 			}
 			destroyed = true
@@ -274,16 +274,16 @@ func (r *TaskRunner) run() {
 
 		// Log whether the task was successful or not.
 		if !waitRes.Successful() {
-			r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.allocID, waitRes)
+			r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, waitRes)
 		} else {
-			r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID)
+			r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.alloc.ID)
 		}
 
 		// Check if we should restart. If not mark task as dead and exit.
 		shouldRestart, when := r.restartTracker.nextRestart(waitRes.ExitCode)
 		waitEvent := r.waitErrorToEvent(waitRes)
 		if !shouldRestart {
-			r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.allocID)
+			r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
 			r.setState(structs.TaskStateDead, waitEvent)
 			return
 		}
@@ -329,7 +329,7 @@ func (r *TaskRunner) Update(update *structs.Task) {
 	case r.updateCh <- update:
 	default:
 		r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')",
-			update.Name, r.allocID)
+			update.Name, r.alloc.ID)
 	}
 }
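The task_runner.go changes are mostly a mechanical rename from r.allocID to r.alloc.ID, but the reason the runner now carries the whole *structs.Allocation is that it needs two things from it: the ID for log lines and state paths, and the Services map that ConsulService reads on Register/Deregister. A rough stand-in illustrating the shape (the Allocation here is trimmed to just those two fields; paths and names are made up):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// Allocation is a pared-down stand-in for structs.Allocation with the two
// fields the TaskRunner reaches for after this change.
type Allocation struct {
	ID       string
	Services map[string]string // service name -> scheduler-minted Consul ID
}

// stateFilePath mirrors the updated TaskRunner.stateFilePath: the only
// difference from before is reading the ID off the allocation instead of
// a bare allocID string.
func stateFilePath(stateDir string, alloc *Allocation, dirName string) string {
	return filepath.Join(stateDir, "alloc", alloc.ID, dirName, "state.json")
}

func main() {
	alloc := &Allocation{
		ID:       "alloc-1234",
		Services: map[string]string{"web": "nomad-registered-service-5678"},
	}
	fmt.Println(stateFilePath("/var/nomad", alloc, "task-abcd"))
	fmt.Println(alloc.Services["web"]) // what Register/Deregister look up
}
```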
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 82723965c53..6366c73bcb6 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -1106,7 +1106,6 @@ const (
 // The ServiceCheck data model represents the consul health check that
 // Nomad registers for a Task
 type ServiceCheck struct {
-	Id     string // Id of the check, must be unique and it is autogenrated
 	Name   string // Name of the check, defaults to id
 	Type   string // Type of the check - tcp, http, docker and script
 	Script string // Script to invoke for script check
@@ -1151,7 +1150,6 @@ const (
 
 // The Service model represents a Consul service defintion
 type Service struct {
-	Id        string   // Id of the service, this needs to be unique on a local machine
 	Name      string   // Name of the service, defaults to id
 	Tags      []string // List of tags for the service
 	PortLabel string   `mapstructure:"port"` // port for the service
@@ -1161,10 +1159,6 @@ type Service struct {
 // InitFields interpolates values of Job, Task Group and Task in the Service
 // Name. This also generates check names, service id and check ids.
 func (s *Service) InitFields(job string, taskGroup string, task string) {
-	// We add a prefix to the Service ID so that we can know that this service
-	// is managed by Consul since Consul can also have service which are not
-	// managed by Nomad
-	s.Id = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID())
 	s.Name = args.ReplaceEnv(s.Name, map[string]string{
 		"JOB":       job,
 		"TASKGROUP": taskGroup,
@@ -1174,7 +1168,6 @@ func (s *Service) InitFields(job string, taskGroup string, task string) {
 	)
 
 	for _, check := range s.Checks {
-		check.Id = check.Hash(s.Id)
 		if check.Name == "" {
 			check.Name = fmt.Sprintf("service: %q check", s.Name)
 		}
@@ -1451,6 +1444,9 @@ type Allocation struct {
 	// task. These should sum to the total Resources.
 	TaskResources map[string]*Resources
 
+	// Services is a map of service names to service IDs
+	Services map[string]string
+
 	// Metrics associated with this allocation
 	Metrics *AllocMetric
 
@@ -1504,6 +1500,19 @@ func (a *Allocation) Stub() *AllocListStub {
 	}
 }
 
+func (a *Allocation) PopulateServiceIds() {
+	a.Services = make(map[string]string)
+	tg := a.Job.LookupTaskGroup(a.TaskGroup)
+	for _, task := range tg.Tasks {
+		for _, service := range task.Services {
+			// We add a prefix to the Service ID so that we know that this
+			// service is managed by Consul since Consul can also have
+			// services which are not managed by Nomad
+			a.Services[service.Name] = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID())
+		}
+	}
+}
+
 // AllocListStub is used to return a subset of alloc information
 type AllocListStub struct {
 	ID string
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index b3b48665883..436e4991bcd 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -279,6 +279,10 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
 			Metrics:   s.ctx.Metrics(),
 		}
 
+		// Generate the service IDs for the tasks which this allocation is
+		// going to run
+		alloc.PopulateServiceIds()
+
 		// Set fields based on if we found an allocation option
 		if option != nil {
 			alloc.NodeID = option.Node.ID
diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go
index d448642ff53..b8b99bee66b 100644
--- a/scheduler/system_sched.go
+++ b/scheduler/system_sched.go
@@ -246,6 +246,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			Metrics:   s.ctx.Metrics(),
 		}
 
+		// Generate the service IDs for the tasks that this allocation is
+		// going to run
+		alloc.PopulateServiceIds()
+
 		// Set fields based on if we found an allocation option
 		if option != nil {
 			alloc.NodeID = option.Node.ID
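With both schedulers calling it from computePlacements, PopulateServiceIds mints the Consul service IDs once at placement time, so every component that later handles the allocation (server, client, Consul syncer) sees the same IDs. A condensed, self-contained sketch of that flow, with pared-down stand-ins for the nomad/structs types (the struct shapes and the UUID helper here are assumptions; only the method body mirrors the diff):

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// Minimal stand-ins for the nomad/structs types that
// PopulateServiceIds touches.
type Service struct{ Name string }
type Task struct{ Services []*Service }
type TaskGroup struct {
	Name  string
	Tasks []*Task
}
type Job struct{ TaskGroups []*TaskGroup }

func (j *Job) LookupTaskGroup(name string) *TaskGroup {
	for _, tg := range j.TaskGroups {
		if tg.Name == name {
			return tg
		}
	}
	return nil
}

type Allocation struct {
	Job       *Job
	TaskGroup string
	Services  map[string]string
}

const NomadConsulPrefix = "nomad-registered-service"

// generateUUID is a stand-in for structs.GenerateUUID.
func generateUUID() string {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}

// PopulateServiceIds mirrors the method added above: one prefixed,
// random ID per service name, minted once at scheduling time.
func (a *Allocation) PopulateServiceIds() {
	a.Services = make(map[string]string)
	tg := a.Job.LookupTaskGroup(a.TaskGroup)
	for _, task := range tg.Tasks {
		for _, service := range task.Services {
			a.Services[service.Name] = fmt.Sprintf("%s-%s", NomadConsulPrefix, generateUUID())
		}
	}
}

func main() {
	job := &Job{TaskGroups: []*TaskGroup{{
		Name:  "web",
		Tasks: []*Task{{Services: []*Service{{Name: "web"}}}},
	}}}
	alloc := &Allocation{Job: job, TaskGroup: "web"}
	alloc.PopulateServiceIds() // what computePlacements now calls
	fmt.Println(alloc.Services["web"])
}
```

This is also why Deregister can treat an empty lookup in alloc.Services as "nothing to do": a service name missing from the map was never assigned an ID by the scheduler.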