Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the consul service when the task/alloc changes #766

Merged
merged 2 commits on Feb 7, 2016
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,6 @@ func (r *AllocRunner) Run() {
continue
}

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
task, r.consulService)
r.tasks[task.Name] = tr
Expand All @@ -392,22 +390,6 @@ OUTER:
r.taskLock.RLock()
for _, task := range tg.Tasks {
tr := r.tasks[task.Name]

// Merge in the task resources
task.Resources = update.TaskResources[task.Name]
FOUND:
for _, updateGroup := range update.Job.TaskGroups {
if tg.Name != updateGroup.Name {
continue
}
for _, updateTask := range updateGroup.Tasks {
if updateTask.Name != task.Name {
continue
}
task.Services = updateTask.Services
break FOUND
}
}
tr.Update(update)
}
r.taskLock.RUnlock()
Expand Down
31 changes: 22 additions & 9 deletions client/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type ConsulService struct {

trackedTasks map[string]*trackedTask
serviceStates map[string]string
allocToService map[string][]string
trackedTskLock sync.Mutex
}

Expand Down Expand Up @@ -130,12 +131,13 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {
}

consulService := ConsulService{
client: &consulApiClient{client: c},
logger: config.logger,
node: config.node,
trackedTasks: make(map[string]*trackedTask),
serviceStates: make(map[string]string),
shutdownCh: make(chan struct{}),
client: &consulApiClient{client: c},
logger: config.logger,
node: config.node,
trackedTasks: make(map[string]*trackedTask),
serviceStates: make(map[string]string),
allocToService: make(map[string][]string),
shutdownCh: make(chan struct{}),
}

return &consulService, nil
Expand All @@ -148,8 +150,18 @@ func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation)
c.trackedTskLock.Lock()
tt := &trackedTask{task: task, alloc: alloc}
c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt

// Delete any previously registered service as the same alloc is being
// re-registered.
for _, service := range c.allocToService[alloc.ID] {
delete(c.serviceStates, service)
}
c.trackedTskLock.Unlock()

for _, service := range task.Services {
// Track the services this alloc is registering.
c.allocToService[alloc.ID] = append(c.allocToService[alloc.ID], service.Name)

c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
if err := c.registerService(service, task, alloc); err != nil {
mErr.Errors = append(mErr.Errors, err)
Expand All @@ -165,6 +177,7 @@ func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation
var mErr multierror.Error
c.trackedTskLock.Lock()
delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name))
delete(c.allocToService, alloc.ID)
c.trackedTskLock.Unlock()
for _, service := range task.Services {
serviceID := alloc.Services[service.Name]
Expand Down Expand Up @@ -229,14 +242,14 @@ func (c *ConsulService) performSync() {
// Add new services which Consul agent isn't aware of
knownServices[serviceID] = struct{}{}
if _, ok := consulServices[serviceID]; !ok {
c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name)
c.printLogMessage("[INFO] consul: perform sync, registering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}

// If a service has changed, re-register it with Consul agent
if service.Hash() != c.serviceStates[serviceID] {
c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name)
c.printLogMessage("[INFO] consul: perform sync hash change, reregistering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}
Expand Down Expand Up @@ -268,7 +281,7 @@ func (c *ConsulService) performSync() {
for _, consulService := range consulServices {
if _, ok := knownServices[consulService.ID]; !ok {
delete(c.serviceStates, consulService.ID)
c.printLogMessage("[INFO] consul: deregistering service %v with consul", consulService.Service)
c.printLogMessage("[INFO] consul: perform sync, deregistering service %v with consul", consulService.Service)
c.deregisterService(consulService.ID)
}
}
Expand Down
46 changes: 33 additions & 13 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"sync"
"time"

"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/hashstructure"

cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
Expand Down Expand Up @@ -54,6 +56,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
alloc *structs.Allocation, task *structs.Task,
consulService *ConsulService) *TaskRunner {

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]

// Build the restart tracker.
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
Expand Down Expand Up @@ -319,21 +324,24 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
}

// Extract the task.
var task *structs.Task
var updatedTask *structs.Task
for _, t := range tg.Tasks {
if t.Name == r.task.Name {
task = t
updatedTask = t
}
}
if task == nil {
if updatedTask == nil {
return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name)
}
r.task = task

// Merge in the task resources
updatedTask.Resources = update.TaskResources[updatedTask.Name]

// Update will update resources and store the new kill timeout.
var mErr multierror.Error
if r.handle != nil {
if err := r.handle.Update(task); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
if err := r.handle.Update(updatedTask); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
}
}

Expand All @@ -342,14 +350,26 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
r.restartTracker.SetPolicy(tg.RestartPolicy)
}

/* TODO
// Re-register the task to consul and store the updated alloc.
r.consulService.Deregister(r.task, r.alloc)
r.alloc = update
r.consulService.Register(r.task, r.alloc)
*/
// hashServices returns the hash of the task's services.
hashServices := func(task *structs.Task) uint64 {
h, err := hashstructure.Hash(task.Services, nil)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("hashing services failed %#v: %v", task.Services, err))
}
return h
}

return nil
// Re-register the task with Consul if any of its services have changed.
if hashServices(updatedTask) != hashServices(r.task) {
if err := r.consulService.Register(updatedTask, update); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating services with consul failed: %v", err))
}
}

// Store the updated alloc.
r.alloc = update
r.task = updatedTask
return mErr.ErrorOrNil()
}

// Helper function for converting a WaitResult into a TaskTerminated event.
Expand Down