From b290b8ef79a4c14928255689256872af7539de93 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 5 Feb 2016 12:07:56 -0800 Subject: [PATCH 1/2] Update the consul service when the task/alloc changes --- client/alloc_runner.go | 18 ------------------ client/consul.go | 31 ++++++++++++++++++++++--------- client/task_runner.go | 13 +++++++------ 3 files changed, 29 insertions(+), 33 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 928b0105be9..f1c33fb7237 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -364,8 +364,6 @@ func (r *AllocRunner) Run() { continue } - // Merge in the task resources - task.Resources = alloc.TaskResources[task.Name] tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc, task, r.consulService) r.tasks[task.Name] = tr @@ -392,22 +390,6 @@ OUTER: r.taskLock.RLock() for _, task := range tg.Tasks { tr := r.tasks[task.Name] - - // Merge in the task resources - task.Resources = update.TaskResources[task.Name] - FOUND: - for _, updateGroup := range update.Job.TaskGroups { - if tg.Name != updateGroup.Name { - continue - } - for _, updateTask := range updateGroup.Tasks { - if updateTask.Name != task.Name { - continue - } - task.Services = updateTask.Services - break FOUND - } - } tr.Update(update) } r.taskLock.RUnlock() diff --git a/client/consul.go b/client/consul.go index a5a0155040a..f0462486f86 100644 --- a/client/consul.go +++ b/client/consul.go @@ -76,6 +76,7 @@ type ConsulService struct { trackedTasks map[string]*trackedTask serviceStates map[string]string + allocToService map[string][]string trackedTskLock sync.Mutex } @@ -130,12 +131,13 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) { } consulService := ConsulService{ - client: &consulApiClient{client: c}, - logger: config.logger, - node: config.node, - trackedTasks: make(map[string]*trackedTask), - serviceStates: make(map[string]string), - shutdownCh: make(chan struct{}), + client: &consulApiClient{client: c}, + logger: config.logger, + node: config.node, + trackedTasks: make(map[string]*trackedTask), + serviceStates: make(map[string]string), + allocToService: make(map[string][]string), + shutdownCh: make(chan struct{}), } return &consulService, nil @@ -148,8 +150,18 @@ func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation) c.trackedTskLock.Lock() tt := &trackedTask{task: task, alloc: alloc} c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt + + // Delete any previously registered service as the same alloc is being + // re-registered. + for _, service := range c.allocToService[alloc.ID] { + delete(c.serviceStates, service) + } c.trackedTskLock.Unlock() + for _, service := range task.Services { + // Track the services this alloc is registering. + c.allocToService[alloc.ID] = append(c.allocToService[alloc.ID], service.Name) + c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name) if err := c.registerService(service, task, alloc); err != nil { mErr.Errors = append(mErr.Errors, err) @@ -165,6 +177,7 @@ func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation var mErr multierror.Error c.trackedTskLock.Lock() delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name)) + delete(c.allocToService, alloc.ID) c.trackedTskLock.Unlock() for _, service := range task.Services { serviceID := alloc.Services[service.Name] @@ -229,14 +242,14 @@ func (c *ConsulService) performSync() { // Add new services which Consul agent isn't aware of knownServices[serviceID] = struct{}{} if _, ok := consulServices[serviceID]; !ok { - c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name) + c.printLogMessage("[INFO] consul: perform sync, registering service %s with consul.", service.Name) c.registerService(service, trackedTask.task, trackedTask.alloc) continue } // If a service has changed, re-register it with Consul agent if service.Hash() != c.serviceStates[serviceID] { - c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name) + c.printLogMessage("[INFO] consul: perform sync hash change, reregistering service %s with consul.", service.Name) c.registerService(service, trackedTask.task, trackedTask.alloc) continue } @@ -268,7 +281,7 @@ func (c *ConsulService) performSync() { for _, consulService := range consulServices { if _, ok := knownServices[consulService.ID]; !ok { delete(c.serviceStates, consulService.ID) - c.printLogMessage("[INFO] consul: deregistering service %v with consul", consulService.Service) + c.printLogMessage("[INFO] consul: perform sync, deregistering service %v with consul", consulService.Service) c.deregisterService(consulService.ID) } } diff --git a/client/task_runner.go b/client/task_runner.go index 3d2b5c7c228..b32b9d310e0 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -54,6 +54,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, alloc *structs.Allocation, task *structs.Task, consulService *ConsulService) *TaskRunner { + // Merge in the task resources + task.Resources = alloc.TaskResources[task.Name] + // Build the restart tracker. tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg == nil { @@ -328,6 +331,9 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { if task == nil { return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name) } + + // Merge in the task resources + task.Resources = update.TaskResources[task.Name] r.task = task // Update will update resources and store the new kill timeout. @@ -342,14 +348,9 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { r.restartTracker.SetPolicy(tg.RestartPolicy) } - /* TODO // Re-register the task to consul and store the updated alloc. - r.consulService.Deregister(r.task, r.alloc) r.alloc = update - r.consulService.Register(r.task, r.alloc) - */ - - return nil + return r.consulService.Register(r.task, r.alloc) } // Helper function for converting a WaitResult into a TaskTerminated event. From e8f88d332a7edc6c8dcefe57a453671108b9a2b5 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 6 Feb 2016 17:08:12 -0800 Subject: [PATCH 2/2] Precise registration --- client/task_runner.go | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/client/task_runner.go b/client/task_runner.go index b32b9d310e0..83a29bb65d3 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -10,9 +10,11 @@ import ( "sync" "time" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" + "github.com/mitchellh/hashstructure" cstructs "github.com/hashicorp/nomad/client/driver/structs" ) @@ -322,24 +324,24 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { } // Extract the task. - var task *structs.Task + var updatedTask *structs.Task for _, t := range tg.Tasks { if t.Name == r.task.Name { - task = t + updatedTask = t } } - if task == nil { + if updatedTask == nil { return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name) } // Merge in the task resources - task.Resources = update.TaskResources[task.Name] - r.task = task + updatedTask.Resources = update.TaskResources[updatedTask.Name] // Update will update resources and store the new kill timeout. + var mErr multierror.Error if r.handle != nil { - if err := r.handle.Update(task); err != nil { - r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err) + if err := r.handle.Update(updatedTask); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err)) } } @@ -348,9 +350,26 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { r.restartTracker.SetPolicy(tg.RestartPolicy) } - // Re-register the task to consul and store the updated alloc. + // Hash services returns the hash of the task's services + hashServices := func(task *structs.Task) uint64 { + h, err := hashstructure.Hash(task.Services, nil) + if err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("hashing services failed %#v: %v", task.Services, err)) + } + return h + } + + // Re-register the task to consul if any of the services have changed. + if hashServices(updatedTask) != hashServices(r.task) { + if err := r.consulService.Register(updatedTask, update); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("updating services with consul failed: %v", err)) + } + } + + // Store the updated alloc. r.alloc = update - return r.consulService.Register(r.task, r.alloc) + r.task = updatedTask + return mErr.ErrorOrNil() } // Helper function for converting a WaitResult into a TaskTerminated event.