Skip to content

Commit

Permalink
Merge pull request #583 from hashicorp/f-consul-service
Browse files Browse the repository at this point in the history
Making the allocs hold service ids
  • Loading branch information
diptanu committed Dec 15, 2015
2 parents a57cc6f + ed911ca commit d8b39f1
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 100 deletions.
1 change: 1 addition & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Allocation struct {
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
Expand Down
4 changes: 2 additions & 2 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (r *AllocRunner) RestoreState() error {
task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.tasks[name] = tr

Expand Down Expand Up @@ -324,7 +324,7 @@ func (r *AllocRunner) Run() {
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
Expand Down
71 changes: 38 additions & 33 deletions client/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (a *consulApiClient) ServiceRegister(service *consul.AgentServiceRegistrati
return a.client.Agent().ServiceRegister(service)
}

func (a *consulApiClient) ServiceDeregister(serviceId string) error {
return a.client.Agent().ServiceDeregister(serviceId)
func (a *consulApiClient) ServiceDeregister(serviceID string) error {
return a.client.Agent().ServiceDeregister(serviceID)
}

func (a *consulApiClient) Services() (map[string]*consul.AgentService, error) {
Expand All @@ -62,8 +62,8 @@ func (a *consulApiClient) Checks() (map[string]*consul.AgentCheck, error) {
// trackedTask is a Task that we are tracking for changes in service and check
// definitions and keep them sycned with Consul Agent
type trackedTask struct {
allocID string
task *structs.Task
task *structs.Task
alloc *structs.Allocation
}

// ConsulService is the service which tracks tasks and syncs the services and
Expand Down Expand Up @@ -143,15 +143,15 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {

// Register starts tracking a task for changes to it's services and tasks and
// adds/removes services and checks associated with it.
func (c *ConsulService) Register(task *structs.Task, allocID string) error {
func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
tt := &trackedTask{allocID: allocID, task: task}
c.trackedTasks[fmt.Sprintf("%s-%s", allocID, task.Name)] = tt
tt := &trackedTask{task: task, alloc: alloc}
c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt
c.trackedTskLock.Unlock()
for _, service := range task.Services {
c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
if err := c.registerService(service, task, allocID); err != nil {
if err := c.registerService(service, task, alloc); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
Expand All @@ -161,17 +161,18 @@ func (c *ConsulService) Register(task *structs.Task, allocID string) error {

// Deregister stops tracking a task for changes to it's services and checks and
// removes all the services and checks associated with the Task
func (c *ConsulService) Deregister(task *structs.Task, allocID string) error {
func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
delete(c.trackedTasks, fmt.Sprintf("%s-%s", allocID, task.Name))
delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name))
c.trackedTskLock.Unlock()
for _, service := range task.Services {
if service.Id == "" {
serviceID := alloc.Services[service.Name]
if serviceID == "" {
continue
}
c.logger.Printf("[INFO] consul: deregistering service %v with consul", service.Name)
if err := c.deregisterService(service.Id); err != nil {
if err := c.deregisterService(serviceID); err != nil {
c.printLogMessage("[DEBUG] consul: error in deregistering service %v from consul", service.Name)
mErr.Errors = append(mErr.Errors, err)
}
Expand Down Expand Up @@ -223,38 +224,40 @@ func (c *ConsulService) performSync() {
// Add services and checks which Consul doesn't know about
for _, trackedTask := range c.trackedTasks {
for _, service := range trackedTask.task.Services {
serviceID := trackedTask.alloc.Services[service.Name]

// Add new services which Consul agent isn't aware of
knownServices[service.Id] = struct{}{}
if _, ok := consulServices[service.Id]; !ok {
knownServices[serviceID] = struct{}{}
if _, ok := consulServices[serviceID]; !ok {
c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.allocID)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}

// If a service has changed, re-register it with Consul agent
if service.Hash() != c.serviceStates[service.Id] {
if service.Hash() != c.serviceStates[serviceID] {
c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.allocID)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}

// Add new checks that Consul isn't aware of
for _, check := range service.Checks {
knownChecks[check.Id] = struct{}{}
if _, ok := consulChecks[check.Id]; !ok {
checkID := check.Hash(serviceID)
knownChecks[checkID] = struct{}{}
if _, ok := consulChecks[checkID]; !ok {
host, port := trackedTask.task.FindHostAndPortFor(service.PortLabel)
cr := c.makeCheck(service, check, host, port)
cr := c.makeCheck(serviceID, check, host, port)
c.registerCheck(cr)
}
}
}
}

// Remove services from the service tracker which no longer exists
for serviceId := range c.serviceStates {
if _, ok := knownServices[serviceId]; !ok {
delete(c.serviceStates, serviceId)
for serviceID := range c.serviceStates {
if _, ok := knownServices[serviceID]; !ok {
delete(c.serviceStates, serviceID)
}
}

Expand All @@ -276,16 +279,17 @@ func (c *ConsulService) performSync() {
}

// registerService registers a Service with Consul
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, allocID string) error {
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
host, port := task.FindHostAndPortFor(service.PortLabel)
if host == "" || port == 0 {
return fmt.Errorf("consul: the port:%q marked for registration of service: %q couldn't be found", service.PortLabel, service.Name)
}
c.serviceStates[service.Id] = service.Hash()
serviceID := alloc.Services[service.Name]
c.serviceStates[serviceID] = service.Hash()

asr := &consul.AgentServiceRegistration{
ID: service.Id,
ID: serviceID,
Name: service.Name,
Tags: service.Tags,
Port: port,
Expand All @@ -297,7 +301,7 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs.
mErr.Errors = append(mErr.Errors, err)
}
for _, check := range service.Checks {
cr := c.makeCheck(service, check, host, port)
cr := c.makeCheck(serviceID, check, host, port)
if err := c.registerCheck(cr); err != nil {
c.printLogMessage("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err)
mErr.Errors = append(mErr.Errors, err)
Expand All @@ -320,20 +324,21 @@ func (c *ConsulService) deregisterCheck(checkID string) error {
}

// deregisterService de-registers a Service with a specific id from Consul
func (c *ConsulService) deregisterService(serviceId string) error {
delete(c.serviceStates, serviceId)
if err := c.client.ServiceDeregister(serviceId); err != nil {
func (c *ConsulService) deregisterService(serviceID string) error {
delete(c.serviceStates, serviceID)
if err := c.client.ServiceDeregister(serviceID); err != nil {
return err
}
return nil
}

// makeCheck creates a Consul Check Registration struct
func (c *ConsulService) makeCheck(service *structs.Service, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
func (c *ConsulService) makeCheck(serviceID string, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
checkID := check.Hash(serviceID)
cr := &consul.AgentCheckRegistration{
ID: check.Id,
ID: checkID,
Name: check.Name,
ServiceID: service.Id,
ServiceID: serviceID,
}
cr.Interval = check.Interval.String()
cr.Timeout = check.Timeout.String()
Expand Down
32 changes: 16 additions & 16 deletions client/consul_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package client

import (
"fmt"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"log"
"os"
Expand Down Expand Up @@ -32,7 +34,7 @@ func (a *mockConsulApiClient) ServiceRegister(service *consul.AgentServiceRegist
return nil
}

func (a *mockConsulApiClient) ServiceDeregister(serviceId string) error {
func (a *mockConsulApiClient) ServiceDeregister(serviceID string) error {
a.serviceDeregisterCallCount += 1
return nil
}
Expand Down Expand Up @@ -70,7 +72,6 @@ func newTask() *structs.Task {

func TestConsul_MakeChecks(t *testing.T) {
service := &structs.Service{
Id: "Foo",
Name: "Bar",
Checks: []*structs.ServiceCheck{
{
Expand All @@ -95,10 +96,11 @@ func TestConsul_MakeChecks(t *testing.T) {
}

c := newConsulService()
serviceID := fmt.Sprintf("%s-1234", structs.NomadConsulPrefix)

check1 := c.makeCheck(service, service.Checks[0], "10.10.0.1", 8090)
check2 := c.makeCheck(service, service.Checks[1], "10.10.0.1", 8090)
check3 := c.makeCheck(service, service.Checks[2], "10.10.0.1", 8090)
check1 := c.makeCheck(serviceID, service.Checks[0], "10.10.0.1", 8090)
check2 := c.makeCheck(serviceID, service.Checks[1], "10.10.0.1", 8090)
check3 := c.makeCheck(serviceID, service.Checks[2], "10.10.0.1", 8090)

if check1.HTTP != "http://10.10.0.1:8090/foo/bar" {
t.Fatalf("Invalid http url for check: %v", check1.HTTP)
Expand Down Expand Up @@ -142,15 +144,14 @@ func TestConsul_InvalidPortLabelForService(t *testing.T) {
},
}
service := &structs.Service{
Id: "service-id",
Name: "foo",
Tags: []string{"a", "b"},
PortLabel: "https",
Checks: make([]*structs.ServiceCheck, 0),
}

c := newConsulService()
if err := c.registerService(service, task, "allocid"); err == nil {
if err := c.registerService(service, task, mock.Alloc()); err == nil {
t.Fatalf("Service should be invalid")
}
}
Expand All @@ -175,7 +176,7 @@ func TestConsul_Services_Deleted_From_Task(t *testing.T) {
},
},
}
c.Register(&task, "1")
c.Register(&task, mock.Alloc())
if len(c.serviceStates) != 1 {
t.Fatalf("Expected tracked services: %v, Actual: %v", 1, len(c.serviceStates))
}
Expand All @@ -191,13 +192,14 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) {
c := newConsulService()
task := newTask()
s1 := structs.Service{
Id: "1-example-cache-redis",
Name: "example-cache-redis",
Tags: []string{"global"},
PortLabel: "db",
}
task.Services = append(task.Services, &s1)
c.Register(task, "1")
alloc := mock.Alloc()
serviceID := alloc.Services[s1.Name]
c.Register(task, alloc)

s1.Tags = []string{"frontcache"}

Expand All @@ -207,8 +209,8 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) {
t.Fatal("We should be tracking one service")
}

if c.serviceStates[s1.Id] != s1.Hash() {
t.Fatalf("Hash is %v, expected %v", c.serviceStates[s1.Id], s1.Hash())
if c.serviceStates[serviceID] != s1.Hash() {
t.Fatalf("Hash is %v, expected %v", c.serviceStates[serviceID], s1.Hash())
}
}

Expand All @@ -219,14 +221,13 @@ func TestConsul_AddCheck_To_Service(t *testing.T) {
task := newTask()
var checks []*structs.ServiceCheck
s1 := structs.Service{
Id: "1-example-cache-redis",
Name: "example-cache-redis",
Tags: []string{"global"},
PortLabel: "db",
Checks: checks,
}
task.Services = append(task.Services, &s1)
c.Register(task, "1")
c.Register(task, mock.Alloc())

check1 := structs.ServiceCheck{
Name: "alive",
Expand All @@ -250,14 +251,13 @@ func TestConsul_ModifyCheck(t *testing.T) {
task := newTask()
var checks []*structs.ServiceCheck
s1 := structs.Service{
Id: "1-example-cache-redis",
Name: "example-cache-redis",
Tags: []string{"global"},
PortLabel: "db",
Checks: checks,
}
task.Services = append(task.Services, &s1)
c.Register(task, "1")
c.Register(task, mock.Alloc())

check1 := structs.ServiceCheck{
Name: "alive",
Expand Down
Loading

0 comments on commit d8b39f1

Please sign in to comment.