Making the allocs hold service ids
diptanu committed Dec 14, 2015
1 parent 985d6db commit d4d7572
Showing 7 changed files with 74 additions and 50 deletions.
1 change: 1 addition & 0 deletions api/allocations.go
@@ -47,6 +47,7 @@ type Allocation struct {
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
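
The API's Allocation struct gains a Services map (service name to Consul service ID). As a rough illustration that is not part of this commit, a consumer of the HTTP API could resolve the registered Consul ID like this; the helper name and the example ID value are hypothetical:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

// lookupServiceID is a hypothetical helper: given an allocation fetched from
// the Nomad HTTP API, it returns the Consul service ID recorded for a named
// service in the new Services map.
func lookupServiceID(alloc *api.Allocation, serviceName string) (string, bool) {
	if alloc == nil || alloc.Services == nil {
		return "", false
	}
	id, ok := alloc.Services[serviceName]
	return id, ok
}

func main() {
	// The ID value here is a placeholder; real IDs are generated by the scheduler.
	alloc := &api.Allocation{Services: map[string]string{"web": "example-service-id"}}
	if id, ok := lookupServiceID(alloc, "web"); ok {
		fmt.Println("consul service id:", id)
	}
}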
4 changes: 2 additions & 2 deletions client/alloc_runner.go
@@ -112,7 +112,7 @@ func (r *AllocRunner) RestoreState() error {
task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.tasks[name] = tr

@@ -324,7 +324,7 @@ func (r *AllocRunner) Run() {
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
56 changes: 31 additions & 25 deletions client/consul.go
@@ -62,8 +62,8 @@ func (a *consulApiClient) Checks() (map[string]*consul.AgentCheck, error) {
// trackedTask is a Task that we are tracking for changes in service and check
// definitions and keep them synced with Consul Agent
type trackedTask struct {
allocID string
task *structs.Task
task *structs.Task
alloc *structs.Allocation
}

// ConsulService is the service which tracks tasks and syncs the services and
@@ -143,15 +143,16 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {

// Register starts tracking a task for changes to its services and tasks and
// adds/removes services and checks associated with it.
func (c *ConsulService) Register(task *structs.Task, allocID string) error {
func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
tt := &trackedTask{allocID: allocID, task: task}
c.trackedTasks[fmt.Sprintf("%s-%s", allocID, task.Name)] = tt
tt := &trackedTask{task: task, alloc: alloc}
c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt
c.trackedTskLock.Unlock()
for _, service := range task.Services {
c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
if err := c.registerService(service, task, allocID); err != nil {
if err := c.registerService(service, task, alloc); err != nil {
fmt.Printf("DIPTANU ERR %v\n", err)
mErr.Errors = append(mErr.Errors, err)
}
}
@@ -161,17 +162,18 @@ func (c *ConsulService) Register(task *structs.Task, allocID string) error {

// Deregister stops tracking a task for changes to its services and checks and
// removes all the services and checks associated with the Task
func (c *ConsulService) Deregister(task *structs.Task, allocID string) error {
func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
delete(c.trackedTasks, fmt.Sprintf("%s-%s", allocID, task.Name))
delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name))
c.trackedTskLock.Unlock()
for _, service := range task.Services {
if service.Id == "" {
serviceId := alloc.Services[service.Name]
if serviceId == "" {
continue
}
c.logger.Printf("[INFO] consul: deregistering service %v with consul", service.Name)
if err := c.deregisterService(service.Id); err != nil {
if err := c.deregisterService(serviceId); err != nil {
c.printLogMessage("[DEBUG] consul: error in deregistering service %v from consul", service.Name)
mErr.Errors = append(mErr.Errors, err)
}
@@ -223,28 +225,30 @@ func (c *ConsulService) performSync() {
// Add services and checks which Consul doesn't know about
for _, trackedTask := range c.trackedTasks {
for _, service := range trackedTask.task.Services {
serviceId := trackedTask.alloc.Services[service.Name]

// Add new services which Consul agent isn't aware of
knownServices[service.Id] = struct{}{}
if _, ok := consulServices[service.Id]; !ok {
knownServices[serviceId] = struct{}{}
if _, ok := consulServices[serviceId]; !ok {
c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.allocID)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}

// If a service has changed, re-register it with Consul agent
if service.Hash() != c.serviceStates[service.Id] {
if service.Hash() != c.serviceStates[serviceId] {
c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.allocID)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}

// Add new checks that Consul isn't aware of
for _, check := range service.Checks {
knownChecks[check.Id] = struct{}{}
if _, ok := consulChecks[check.Id]; !ok {
checkId := check.Hash(serviceId)
knownChecks[checkId] = struct{}{}
if _, ok := consulChecks[checkId]; !ok {
host, port := trackedTask.task.FindHostAndPortFor(service.PortLabel)
cr := c.makeCheck(service, check, host, port)
cr := c.makeCheck(serviceId, check, host, port)
c.registerCheck(cr)
}
}
@@ -276,16 +280,17 @@ }
}

// registerService registers a Service with Consul
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, allocID string) error {
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
host, port := task.FindHostAndPortFor(service.PortLabel)
if host == "" || port == 0 {
return fmt.Errorf("consul: the port:%q marked for registration of service: %q couldn't be found", service.PortLabel, service.Name)
}
c.serviceStates[service.Id] = service.Hash()
serviceId := alloc.Services[service.Name]
c.serviceStates[serviceId] = service.Hash()

asr := &consul.AgentServiceRegistration{
ID: service.Id,
ID: serviceId,
Name: service.Name,
Tags: service.Tags,
Port: port,
@@ -297,7 +302,7 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs.
mErr.Errors = append(mErr.Errors, err)
}
for _, check := range service.Checks {
cr := c.makeCheck(service, check, host, port)
cr := c.makeCheck(serviceId, check, host, port)
if err := c.registerCheck(cr); err != nil {
c.printLogMessage("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err)
mErr.Errors = append(mErr.Errors, err)
@@ -329,11 +334,12 @@ func (c *ConsulService) deregisterService(serviceId string) error {
}

// makeCheck creates a Consul Check Registration struct
func (c *ConsulService) makeCheck(service *structs.Service, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
func (c *ConsulService) makeCheck(serviceId string, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
checkId := check.Hash(serviceId)
cr := &consul.AgentCheckRegistration{
ID: check.Id,
ID: checkId,
Name: check.Name,
ServiceID: service.Id,
ServiceID: serviceId,
}
cr.Interval = check.Interval.String()
cr.Timeout = check.Timeout.String()
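
In summary, client/consul.go no longer reads IDs off structs.Service; it resolves the service ID from the allocation's Services map and derives check IDs from it with check.Hash. A condensed sketch of that lookup, assuming the structs package as changed in this commit (the helper itself is not part of the diff):

package client // illustrative placement alongside consul.go

import "github.com/hashicorp/nomad/nomad/structs"

// consulIDsFor shows how IDs are now derived for a task's service: the service
// ID comes from the alloc's Services map (filled in by the scheduler) and each
// check ID is hashed from that service ID.
func consulIDsFor(alloc *structs.Allocation, service *structs.Service) (string, []string) {
	serviceID := alloc.Services[service.Name] // empty if the scheduler never populated it
	checkIDs := make([]string, 0, len(service.Checks))
	for _, check := range service.Checks {
		checkIDs = append(checkIDs, check.Hash(serviceID))
	}
	return serviceID, checkIDs
}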
32 changes: 16 additions & 16 deletions client/task_runner.go
@@ -23,7 +23,7 @@ type TaskRunner struct {
updater TaskStateUpdater
logger *log.Logger
ctx *driver.ExecContext
allocID string
alloc *structs.Allocation
restartTracker restartTracker
consulService *ConsulService

@@ -52,7 +52,7 @@ type TaskStateUpdater func(taskName string)
// NewTaskRunner is used to create a new task context
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
allocID string, task *structs.Task, state *structs.TaskState,
alloc *structs.Allocation, task *structs.Task, state *structs.TaskState,
restartTracker restartTracker, consulService *ConsulService) *TaskRunner {

tc := &TaskRunner{
@@ -62,7 +62,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
restartTracker: restartTracker,
consulService: consulService,
ctx: ctx,
allocID: allocID,
alloc: alloc,
task: task,
state: state,
updateCh: make(chan *structs.Task, 8),
@@ -85,7 +85,7 @@ func (r *TaskRunner) stateFilePath() string {
dirName := fmt.Sprintf("task-%s", hashHex)

// Generate the path
path := filepath.Join(r.config.StateDir, "alloc", r.allocID,
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID,
dirName, "state.json")
return path
}
@@ -113,7 +113,7 @@ func (r *TaskRunner) RestoreState() error {
// In the case it fails, we relaunch the task in the Run() method.
if err != nil {
r.logger.Printf("[ERR] client: failed to open handle to task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
r.task.Name, r.alloc.ID, err)
return nil
}
r.handle = handle
@@ -176,7 +176,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
if err != nil {
err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
r.task.Driver, r.allocID, err)
r.task.Driver, r.alloc.ID, err)
r.logger.Printf("[ERR] client: %s", err)
}
return driver, err
@@ -196,7 +196,7 @@ func (r *TaskRunner) startTask() error {
handle, err := driver.Start(r.ctx, r.task)
if err != nil {
r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
r.task.Name, r.alloc.ID, err)
e := structs.NewTaskEvent(structs.TaskDriverFailure).
SetDriverError(fmt.Errorf("failed to start: %v", err))
r.setState(structs.TaskStateDead, e)
@@ -211,7 +211,7 @@ func (r *TaskRunner) Run() {
func (r *TaskRunner) Run() {
defer close(r.waitCh)
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')",
r.task.Name, r.allocID)
r.task.Name, r.alloc.ID)

r.run()
return
@@ -234,10 +234,10 @@ func (r *TaskRunner) run() {
destroyed := false

// Register the services defined by the task with Consul
r.consulService.Register(r.task, r.allocID)
r.consulService.Register(r.task, r.alloc)

// De-Register the services belonging to the task from consul
defer r.consulService.Deregister(r.task, r.allocID)
defer r.consulService.Deregister(r.task, r.alloc)

OUTER:
// Wait for updates
@@ -249,7 +249,7 @@
// Update
r.task = update
if err := r.handle.Update(update); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err)
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
}
case <-r.destroyCh:
// Avoid destroying twice
@@ -259,7 +259,7 @@

// Send the kill signal, and use the WaitCh to block until complete
if err := r.handle.Kill(); err != nil {
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err)
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
destroyErr = err
}
destroyed = true
@@ -274,16 +274,16 @@

// Log whether the task was successful or not.
if !waitRes.Successful() {
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.allocID, waitRes)
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, waitRes)
} else {
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID)
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.alloc.ID)
}

// Check if we should restart. If not mark task as dead and exit.
shouldRestart, when := r.restartTracker.nextRestart(waitRes.ExitCode)
waitEvent := r.waitErrorToEvent(waitRes)
if !shouldRestart {
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.allocID)
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
r.setState(structs.TaskStateDead, waitEvent)
return
}
@@ -329,7 +329,7 @@ func (r *TaskRunner) Update(update *structs.Task) {
case r.updateCh <- update:
default:
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')",
update.Name, r.allocID)
update.Name, r.alloc.ID)
}
}

23 changes: 16 additions & 7 deletions nomad/structs/structs.go
@@ -1106,7 +1106,6 @@ const (
// The ServiceCheck data model represents the consul health check that
// Nomad registers for a Task
type ServiceCheck struct {
Id string // Id of the check, must be unique and it is autogenrated
Name string // Name of the check, defaults to id
Type string // Type of the check - tcp, http, docker and script
Script string // Script to invoke for script check
@@ -1151,7 +1150,6 @@ const (

// The Service model represents a Consul service definition
type Service struct {
Id string // Id of the service, this needs to be unique on a local machine
Name string // Name of the service, defaults to id
Tags []string // List of tags for the service
PortLabel string `mapstructure:"port"` // port for the service
@@ -1161,10 +1159,6 @@ type Service struct {
// InitFields interpolates values of Job, Task Group and Task in the Service
// Name. This also generates check names, service id and check ids.
func (s *Service) InitFields(job string, taskGroup string, task string) {
// We add a prefix to the Service ID so that we can know that this service
// is managed by Consul since Consul can also have service which are not
// managed by Nomad
s.Id = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID())
s.Name = args.ReplaceEnv(s.Name, map[string]string{
"JOB": job,
"TASKGROUP": taskGroup,
@@ -1174,7 +1168,6 @@ func (s *Service) InitFields(job string, taskGroup string, task string) {
)

for _, check := range s.Checks {
check.Id = check.Hash(s.Id)
if check.Name == "" {
check.Name = fmt.Sprintf("service: %q check", s.Name)
}
@@ -1451,6 +1444,9 @@ type Allocation struct {
// task. These should sum to the total Resources.
TaskResources map[string]*Resources

// Services is a map of service names to service ids
Services map[string]string

// Metrics associated with this allocation
Metrics *AllocMetric

@@ -1504,6 +1500,19 @@ func (a *Allocation) Stub() *AllocListStub {
}
}

func (a *Allocation) PopulateServiceIds() {
a.Services = make(map[string]string)
tg := a.Job.LookupTaskGroup(a.TaskGroup)
for _, task := range tg.Tasks {
for _, service := range task.Services {
// We add a prefix to the Service ID so that we can know that this service
// is managed by Nomad since Consul can also have services which are not
// managed by Nomad
a.Services[service.Name] = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID())
}
}
}

// AllocListStub is used to return a subset of alloc information
type AllocListStub struct {
ID string
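
PopulateServiceIds builds the name-to-ID map once, at placement time, by walking the allocation's task group. A hedged usage sketch follows; the wrapper is hypothetical and only illustrates the intended lookup pattern, assuming alloc.Job and alloc.TaskGroup are already set (as they are in the schedulers below):

package structs // illustrative placement only

// ServiceIdFor demonstrates the intended use of the new Services map:
// populate it once, then resolve Consul service IDs by service name.
func ServiceIdFor(alloc *Allocation, serviceName string) string {
	if alloc.Services == nil {
		// Fills Services with one "<NomadConsulPrefix>-<uuid>" entry per service.
		alloc.PopulateServiceIds()
	}
	return alloc.Services[serviceName]
}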
4 changes: 4 additions & 0 deletions scheduler/generic_sched.go
@@ -279,6 +279,10 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
Metrics: s.ctx.Metrics(),
}

// Generate the service ids for the tasks which this allocation is going
// to run
alloc.PopulateServiceIds()

// Set fields based on if we found an allocation option
if option != nil {
alloc.NodeID = option.Node.ID
4 changes: 4 additions & 0 deletions scheduler/system_sched.go
@@ -246,6 +246,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
Metrics: s.ctx.Metrics(),
}

// Generate the service ids for the tasks that this allocation is going
// to run
alloc.PopulateServiceIds()

// Set fields based on if we found an allocation option
if option != nil {
alloc.NodeID = option.Node.ID
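
Taken together, the contract is: the schedulers assign Consul service IDs when they create an allocation, and the client registers and deregisters against those IDs by service name. A rough end-to-end sketch of the client-side half; the wrapper function is hypothetical, while Register's new signature is taken from the consul.go changes above:

package client // illustrative placement only

import (
	"github.com/hashicorp/go-multierror"

	"github.com/hashicorp/nomad/nomad/structs"
)

// registerAllocServices registers every task of the alloc's task group with
// Consul, using the service IDs the scheduler stored in alloc.Services.
func registerAllocServices(c *ConsulService, alloc *structs.Allocation) error {
	var mErr multierror.Error
	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	for _, task := range tg.Tasks {
		if err := c.Register(task, alloc); err != nil {
			mErr.Errors = append(mErr.Errors, err)
		}
	}
	return mErr.ErrorOrNil()
}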
