Skip to content

Commit

Permalink
consul: support canary tags for services
Browse files Browse the repository at this point in the history
Also refactor Consul ServiceClient to take a struct instead of a massive
set of arguments. Meant updating a lot of code but it should be far
easier to extend in the future as you will only need to update a single
struct instead of every single call site.

Adds an e2e test for canary tags.
  • Loading branch information
schmichael authored and Preetha Appan committed May 7, 2018
1 parent 1154ccc commit 17c6eb8
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 166 deletions.
9 changes: 3 additions & 6 deletions client/consul.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package client

import (
"github.com/hashicorp/nomad/client/driver"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)

// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
type ConsulServiceAPI interface {
RegisterTask(allocID string, task *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
RemoveTask(allocID string, task *structs.Task)
UpdateTask(allocID string, existing, newTask *structs.Task, restart consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
RegisterTask(*consul.TaskServices) error
RemoveTask(*consul.TaskServices)
UpdateTask(old, newTask *consul.TaskServices) error
AllocRegistrations(allocID string) (*consul.AllocRegistration, error)
}
31 changes: 12 additions & 19 deletions client/consul_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,26 @@ import (
"log"
"sync"

"github.com/hashicorp/nomad/client/driver"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/go-testing-interface"
)

// mockConsulOp represents the register/deregister operations.
type mockConsulOp struct {
op string // add, remove, or update
allocID string
task *structs.Task
exec driver.ScriptExecutor
net *cstructs.DriverNetwork
task string
}

func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) mockConsulOp {
func newMockConsulOp(op, allocID, task string) mockConsulOp {
if op != "add" && op != "remove" && op != "update" && op != "alloc_registrations" {
panic(fmt.Errorf("invalid consul op: %s", op))
}
return mockConsulOp{
op: op,
allocID: allocID,
task: task,
exec: exec,
net: net,
}
}

Expand All @@ -56,34 +49,34 @@ func newMockConsulServiceClient(t testing.T) *mockConsulServiceClient {
return &m
}

func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
func (m *mockConsulServiceClient) UpdateTask(old, new *consul.TaskServices) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T, %x)", allocID, old, new, exec, net.Hash())
m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec, net))
m.logger.Printf("[TEST] mock_consul: UpdateTask(alloc: %s, task: %s)", new.AllocID[:6], new.Name)
m.ops = append(m.ops, newMockConsulOp("update", new.AllocID, new.Name))
return nil
}

func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
func (m *mockConsulServiceClient) RegisterTask(task *consul.TaskServices) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T, %x)", allocID, task.Name, exec, net.Hash())
m.ops = append(m.ops, newMockConsulOp("add", allocID, task, exec, net))
m.logger.Printf("[TEST] mock_consul: RegisterTask(alloc: %s, task: %s)", task.AllocID, task.Name)
m.ops = append(m.ops, newMockConsulOp("add", task.AllocID, task.Name))
return nil
}

func (m *mockConsulServiceClient) RemoveTask(allocID string, task *structs.Task) {
func (m *mockConsulServiceClient) RemoveTask(task *consul.TaskServices) {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", allocID, task.Name)
m.ops = append(m.ops, newMockConsulOp("remove", allocID, task, nil, nil))
m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", task.AllocID, task.Name)
m.ops = append(m.ops, newMockConsulOp("remove", task.AllocID, task.Name))
}

func (m *mockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: AllocRegistrations(%q)", allocID)
m.ops = append(m.ops, newMockConsulOp("alloc_registrations", allocID, nil, nil, nil))
m.ops = append(m.ops, newMockConsulOp("alloc_registrations", allocID, ""))

if m.allocRegistrationsFn != nil {
return m.allocRegistrationsFn(allocID)
Expand Down
24 changes: 17 additions & 7 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"

Expand Down Expand Up @@ -1219,7 +1220,8 @@ func (r *TaskRunner) run() {
// Remove from consul before killing the task so that traffic
// can be rerouted
interpTask := interpolateServices(r.envBuilder.Build(), r.task)
r.consul.RemoveTask(r.alloc.ID, interpTask)
taskServices := consul.NewTaskServices(r.alloc, interpTask, r, nil, nil)
r.consul.RemoveTask(taskServices)

// Delay actually killing the task if configured. See #244
if r.task.ShutdownDelay > 0 {
Expand Down Expand Up @@ -1275,7 +1277,8 @@ func (r *TaskRunner) run() {
func (r *TaskRunner) cleanup() {
// Remove from Consul
interpTask := interpolateServices(r.envBuilder.Build(), r.task)
r.consul.RemoveTask(r.alloc.ID, interpTask)
taskServices := consul.NewTaskServices(r.alloc, interpTask, r, nil, nil)
r.consul.RemoveTask(taskServices)

drv, err := r.createDriver()
if err != nil {
Expand Down Expand Up @@ -1339,7 +1342,8 @@ func (r *TaskRunner) shouldRestart() bool {

// Unregister from Consul while waiting to restart.
interpTask := interpolateServices(r.envBuilder.Build(), r.task)
r.consul.RemoveTask(r.alloc.ID, interpTask)
taskServices := consul.NewTaskServices(r.alloc, interpTask, r, nil, nil)
r.consul.RemoveTask(taskServices)

// Sleep but watch for destroy events.
select {
Expand Down Expand Up @@ -1498,7 +1502,8 @@ func (r *TaskRunner) registerServices(d driver.Driver, h driver.DriverHandle, n
exec = h
}
interpolatedTask := interpolateServices(r.envBuilder.Build(), r.task)
return r.consul.RegisterTask(r.alloc.ID, interpolatedTask, r, exec, n)
taskServices := consul.NewTaskServices(r.alloc, interpolatedTask, r, exec, n)
return r.consul.RegisterTask(taskServices)
}

// interpolateServices interpolates tags in a service and checks with values from the
Expand Down Expand Up @@ -1680,7 +1685,7 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {

// Update services in Consul
newInterpolatedTask := interpolateServices(r.envBuilder.Build(), updatedTask)
if err := r.updateServices(drv, r.handle, oldInterpolatedTask, newInterpolatedTask); err != nil {
if err := r.updateServices(drv, r.handle, r.alloc, oldInterpolatedTask, update, newInterpolatedTask); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err))
}
}
Expand All @@ -1698,7 +1703,10 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
}

// updateServices and checks with Consul. Tasks must be interpolated!
func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, oldTask, newTask *structs.Task) error {
func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor,
oldAlloc *structs.Allocation, oldTask *structs.Task,
newAlloc *structs.Allocation, newTask *structs.Task) error {

var exec driver.ScriptExecutor
if d.Abilities().Exec {
// Allow set the script executor if the driver supports it
Expand All @@ -1707,7 +1715,9 @@ func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, ol
r.driverNetLock.Lock()
net := r.driverNet.Copy()
r.driverNetLock.Unlock()
return r.consul.UpdateTask(r.alloc.ID, oldTask, newTask, r, exec, net)
oldTaskServices := consul.NewTaskServices(oldAlloc, oldTask, r, exec, net)
newTaskServices := consul.NewTaskServices(newAlloc, newTask, r, exec, net)
return r.consul.UpdateTask(oldTaskServices, newTaskServices)
}

// handleDestroy kills the task handle. In the case that killing fails,
Expand Down
Loading

0 comments on commit 17c6eb8

Please sign in to comment.