From 17c6eb8629ba946d4f090e7b313e8c35973ae6a0 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 23 Apr 2018 16:34:53 -0700 Subject: [PATCH] consul: support canary tags for services Also refactor Consul ServiceClient to take a struct instead of a massive set of arguments. Meant updating a lot of code but it should be far easier to extend in the future as you will only need to update a single struct instead of every single call site. Adds an e2e test for canary tags. --- client/consul.go | 9 +- client/consul_testing.go | 31 ++--- client/task_runner.go | 24 +++- command/agent/consul/client.go | 84 ++++++----- command/agent/consul/structs.go | 67 +++++++++ command/agent/consul/unit_test.go | 224 +++++++++++++++++------------- e2e/consul/canary_tags_test.go | 161 +++++++++++++++++++++ e2e/consul/input/canary_tags.hcl | 36 +++++ nomad/structs/structs.go | 7 +- 9 files changed, 477 insertions(+), 166 deletions(-) create mode 100644 command/agent/consul/structs.go create mode 100644 e2e/consul/canary_tags_test.go create mode 100644 e2e/consul/input/canary_tags.hcl diff --git a/client/consul.go b/client/consul.go index 02e40ef0f09..58f75e6f8c5 100644 --- a/client/consul.go +++ b/client/consul.go @@ -1,17 +1,14 @@ package client import ( - "github.com/hashicorp/nomad/client/driver" - cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/consul" - "github.com/hashicorp/nomad/nomad/structs" ) // ConsulServiceAPI is the interface the Nomad Client uses to register and // remove services and checks from Consul. type ConsulServiceAPI interface { - RegisterTask(allocID string, task *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error - RemoveTask(allocID string, task *structs.Task) - UpdateTask(allocID string, existing, newTask *structs.Task, restart consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error + RegisterTask(*consul.TaskServices) error + RemoveTask(*consul.TaskServices) + UpdateTask(old, newTask *consul.TaskServices) error AllocRegistrations(allocID string) (*consul.AllocRegistration, error) } diff --git a/client/consul_testing.go b/client/consul_testing.go index 4a2d2631bc6..1db5d4ccfa7 100644 --- a/client/consul_testing.go +++ b/client/consul_testing.go @@ -5,11 +5,8 @@ import ( "log" "sync" - "github.com/hashicorp/nomad/client/driver" - cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" - "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/go-testing-interface" ) @@ -17,12 +14,10 @@ import ( type mockConsulOp struct { op string // add, remove, or update allocID string - task *structs.Task - exec driver.ScriptExecutor - net *cstructs.DriverNetwork + task string } -func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) mockConsulOp { +func newMockConsulOp(op, allocID, task string) mockConsulOp { if op != "add" && op != "remove" && op != "update" && op != "alloc_registrations" { panic(fmt.Errorf("invalid consul op: %s", op)) } @@ -30,8 +25,6 @@ func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptE op: op, allocID: allocID, task: task, - exec: exec, - net: net, } } @@ -56,34 +49,34 @@ func newMockConsulServiceClient(t testing.T) *mockConsulServiceClient { return &m } -func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { +func (m *mockConsulServiceClient) UpdateTask(old, new *consul.TaskServices) error { m.mu.Lock() defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T, %x)", allocID, old, new, exec, net.Hash()) - m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec, net)) + m.logger.Printf("[TEST] mock_consul: UpdateTask(alloc: %s, task: %s)", new.AllocID[:6], new.Name) + m.ops = append(m.ops, newMockConsulOp("update", new.AllocID, new.Name)) return nil } -func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { +func (m *mockConsulServiceClient) RegisterTask(task *consul.TaskServices) error { m.mu.Lock() defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T, %x)", allocID, task.Name, exec, net.Hash()) - m.ops = append(m.ops, newMockConsulOp("add", allocID, task, exec, net)) + m.logger.Printf("[TEST] mock_consul: RegisterTask(alloc: %s, task: %s)", task.AllocID, task.Name) + m.ops = append(m.ops, newMockConsulOp("add", task.AllocID, task.Name)) return nil } -func (m *mockConsulServiceClient) RemoveTask(allocID string, task *structs.Task) { +func (m *mockConsulServiceClient) RemoveTask(task *consul.TaskServices) { m.mu.Lock() defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", allocID, task.Name) - m.ops = append(m.ops, newMockConsulOp("remove", allocID, task, nil, nil)) + m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", task.AllocID, task.Name) + m.ops = append(m.ops, newMockConsulOp("remove", task.AllocID, task.Name)) } func (m *mockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) { m.mu.Lock() defer m.mu.Unlock() m.logger.Printf("[TEST] mock_consul: AllocRegistrations(%q)", allocID) - m.ops = append(m.ops, newMockConsulOp("alloc_registrations", allocID, nil, nil, nil)) + m.ops = append(m.ops, newMockConsulOp("alloc_registrations", allocID, "")) if m.allocRegistrationsFn != nil { return m.allocRegistrationsFn(allocID) diff --git a/client/task_runner.go b/client/task_runner.go index 0e3af11e69b..0973c07ac2d 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" "github.com/ugorji/go/codec" @@ -1219,7 +1220,8 @@ func (r *TaskRunner) run() { // Remove from consul before killing the task so that traffic // can be rerouted interpTask := interpolateServices(r.envBuilder.Build(), r.task) - r.consul.RemoveTask(r.alloc.ID, interpTask) + taskServices := consul.NewTaskServices(r.alloc, interpTask, r, nil, nil) + r.consul.RemoveTask(taskServices) // Delay actually killing the task if configured. See #244 if r.task.ShutdownDelay > 0 { @@ -1275,7 +1277,8 @@ func (r *TaskRunner) run() { func (r *TaskRunner) cleanup() { // Remove from Consul interpTask := interpolateServices(r.envBuilder.Build(), r.task) - r.consul.RemoveTask(r.alloc.ID, interpTask) + taskServices := consul.NewTaskServices(r.alloc, interpTask, r, nil, nil) + r.consul.RemoveTask(taskServices) drv, err := r.createDriver() if err != nil { @@ -1339,7 +1342,8 @@ func (r *TaskRunner) shouldRestart() bool { // Unregister from Consul while waiting to restart. interpTask := interpolateServices(r.envBuilder.Build(), r.task) - r.consul.RemoveTask(r.alloc.ID, interpTask) + taskServices := consul.NewTaskServices(r.alloc, interpTask, r, nil, nil) + r.consul.RemoveTask(taskServices) // Sleep but watch for destroy events. select { @@ -1498,7 +1502,8 @@ func (r *TaskRunner) registerServices(d driver.Driver, h driver.DriverHandle, n exec = h } interpolatedTask := interpolateServices(r.envBuilder.Build(), r.task) - return r.consul.RegisterTask(r.alloc.ID, interpolatedTask, r, exec, n) + taskServices := consul.NewTaskServices(r.alloc, interpolatedTask, r, exec, n) + return r.consul.RegisterTask(taskServices) } // interpolateServices interpolates tags in a service and checks with values from the @@ -1680,7 +1685,7 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { // Update services in Consul newInterpolatedTask := interpolateServices(r.envBuilder.Build(), updatedTask) - if err := r.updateServices(drv, r.handle, oldInterpolatedTask, newInterpolatedTask); err != nil { + if err := r.updateServices(drv, r.handle, r.alloc, oldInterpolatedTask, update, newInterpolatedTask); err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err)) } } @@ -1698,7 +1703,10 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { } // updateServices and checks with Consul. Tasks must be interpolated! -func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, oldTask, newTask *structs.Task) error { +func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, + oldAlloc *structs.Allocation, oldTask *structs.Task, + newAlloc *structs.Allocation, newTask *structs.Task) error { + var exec driver.ScriptExecutor if d.Abilities().Exec { // Allow set the script executor if the driver supports it @@ -1707,7 +1715,9 @@ func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, ol r.driverNetLock.Lock() net := r.driverNet.Copy() r.driverNetLock.Unlock() - return r.consul.UpdateTask(r.alloc.ID, oldTask, newTask, r, exec, net) + oldTaskServices := consul.NewTaskServices(oldAlloc, oldTask, r, exec, net) + newTaskServices := consul.NewTaskServices(newAlloc, newTask, r, exec, net) + return r.consul.UpdateTask(oldTaskServices, newTaskServices) } // handleDestroy kills the task handle. In the case that killing fails, diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 660a2e876de..4c849812843 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -14,7 +14,6 @@ import ( metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/api" - "github.com/hashicorp/nomad/client/driver" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" @@ -603,11 +602,11 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) // serviceRegs creates service registrations, check registrations, and script // checks from a service. It returns a service registration object with the // service and check IDs populated. -func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *structs.Service, - task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) (*ServiceRegistration, error) { +func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, task *TaskServices) ( + *ServiceRegistration, error) { // Get the services ID - id := makeTaskServiceID(allocID, task.Name, service) + id := makeTaskServiceID(task.AllocID, task.Name, service, task.Canary) sreg := &ServiceRegistration{ serviceID: id, checkIDs: make(map[string]struct{}, len(service.Checks)), @@ -620,26 +619,33 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st } // Determine the address to advertise based on the mode - ip, port, err := getAddress(addrMode, service.PortLabel, task.Resources.Networks, net) + ip, port, err := getAddress(addrMode, service.PortLabel, task.Networks, task.DriverNetwork) if err != nil { return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err) } + // Determine whether to use tags or canary_tags + var tags []string + if task.Canary { + tags = make([]string, len(service.CanaryTags)) + copy(tags, service.CanaryTags) + } else { + tags = make([]string, len(service.Tags)) + copy(tags, service.Tags) + } + // Build the Consul Service registration request serviceReg := &api.AgentServiceRegistration{ ID: id, Name: service.Name, - Tags: make([]string, len(service.Tags)), + Tags: tags, Address: ip, Port: port, } - // copy isn't strictly necessary but can avoid bugs especially - // with tests that may reuse Tasks - copy(serviceReg.Tags, service.Tags) ops.regServices = append(ops.regServices, serviceReg) // Build the check registrations - checkIDs, err := c.checkRegs(ops, allocID, id, service, task, exec, net) + checkIDs, err := c.checkRegs(ops, id, service, task) if err != nil { return nil, err } @@ -651,8 +657,8 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st // checkRegs registers the checks for the given service and returns the // registered check ids. -func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, service *structs.Service, - task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) ([]string, error) { +func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *structs.Service, + task *TaskServices) ([]string, error) { // Fast path numChecks := len(service.Checks) @@ -665,11 +671,13 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se checkID := makeCheckID(serviceID, check) checkIDs = append(checkIDs, checkID) if check.Type == structs.ServiceCheckScript { - if exec == nil { + if task.DriverExec == nil { return nil, fmt.Errorf("driver doesn't support script checks") } - ops.scripts = append(ops.scripts, newScriptCheck( - allocID, task.Name, checkID, check, exec, c.client, c.logger, c.shutdownCh)) + + sc := newScriptCheck(task.AllocID, task.Name, checkID, check, task.DriverExec, + c.client, c.logger, c.shutdownCh) + ops.scripts = append(ops.scripts, sc) // Skip getAddress for script checks checkReg, err := createCheckReg(serviceID, checkID, check, "", 0) @@ -693,7 +701,7 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se addrMode = structs.AddressModeHost } - ip, port, err := getAddress(addrMode, portLabel, task.Resources.Networks, net) + ip, port, err := getAddress(addrMode, portLabel, task.Networks, task.DriverNetwork) if err != nil { return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err) } @@ -714,7 +722,7 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se // Checks will always use the IP from the Task struct (host's IP). // // Actual communication with Consul is done asynchronously (see Run). -func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, restarter TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { +func (c *ServiceClient) RegisterTask(task *TaskServices) error { // Fast path numServices := len(task.Services) if numServices == 0 { @@ -726,7 +734,7 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, restart ops := &operations{} for _, service := range task.Services { - sreg, err := c.serviceRegs(ops, allocID, service, task, exec, net) + sreg, err := c.serviceRegs(ops, service, task) if err != nil { return err } @@ -734,18 +742,18 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, restart } // Add the task to the allocation's registration - c.addTaskRegistration(allocID, task.Name, t) + c.addTaskRegistration(task.AllocID, task.Name, t) c.commit(ops) // Start watching checks. Done after service registrations are built // since an error building them could leak watches. for _, service := range task.Services { - serviceID := makeTaskServiceID(allocID, task.Name, service) + serviceID := makeTaskServiceID(task.AllocID, task.Name, service, task.Canary) for _, check := range service.Checks { if check.TriggersRestarts() { checkID := makeCheckID(serviceID, check) - c.checkWatcher.Watch(allocID, task.Name, checkID, check, restarter) + c.checkWatcher.Watch(task.AllocID, task.Name, checkID, check, task.Restarter) } } } @@ -756,19 +764,19 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, restart // changed. // // DriverNetwork must not change between invocations for the same allocation. -func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, restarter TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { +func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { ops := &operations{} taskReg := new(TaskRegistration) taskReg.Services = make(map[string]*ServiceRegistration, len(newTask.Services)) - existingIDs := make(map[string]*structs.Service, len(existing.Services)) - for _, s := range existing.Services { - existingIDs[makeTaskServiceID(allocID, existing.Name, s)] = s + existingIDs := make(map[string]*structs.Service, len(old.Services)) + for _, s := range old.Services { + existingIDs[makeTaskServiceID(old.AllocID, old.Name, s, old.Canary)] = s } newIDs := make(map[string]*structs.Service, len(newTask.Services)) for _, s := range newTask.Services { - newIDs[makeTaskServiceID(allocID, newTask.Name, s)] = s + newIDs[makeTaskServiceID(newTask.AllocID, newTask.Name, s, newTask.Canary)] = s } // Loop over existing Service IDs to see if they have been removed or @@ -816,7 +824,7 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta } // New check on an unchanged service; add them now - newCheckIDs, err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net) + newCheckIDs, err := c.checkRegs(ops, existingID, newSvc, newTask) if err != nil { return err } @@ -828,7 +836,7 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta // Update all watched checks as CheckRestart fields aren't part of ID if check.TriggersRestarts() { - c.checkWatcher.Watch(allocID, newTask.Name, checkID, check, restarter) + c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter) } } @@ -845,7 +853,7 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta // Any remaining services should just be enqueued directly for _, newSvc := range newIDs { - sreg, err := c.serviceRegs(ops, allocID, newSvc, newTask, exec, net) + sreg, err := c.serviceRegs(ops, newSvc, newTask) if err != nil { return err } @@ -854,18 +862,18 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta } // Add the task to the allocation's registration - c.addTaskRegistration(allocID, newTask.Name, taskReg) + c.addTaskRegistration(newTask.AllocID, newTask.Name, taskReg) c.commit(ops) // Start watching checks. Done after service registrations are built // since an error building them could leak watches. for _, service := range newIDs { - serviceID := makeTaskServiceID(allocID, newTask.Name, service) + serviceID := makeTaskServiceID(newTask.AllocID, newTask.Name, service, newTask.Canary) for _, check := range service.Checks { if check.TriggersRestarts() { checkID := makeCheckID(serviceID, check) - c.checkWatcher.Watch(allocID, newTask.Name, checkID, check, restarter) + c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter) } } } @@ -875,11 +883,11 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta // RemoveTask from Consul. Removes all service entries and checks. // // Actual communication with Consul is done asynchronously (see Run). -func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) { +func (c *ServiceClient) RemoveTask(task *TaskServices) { ops := operations{} for _, service := range task.Services { - id := makeTaskServiceID(allocID, task.Name, service) + id := makeTaskServiceID(task.AllocID, task.Name, service, task.Canary) ops.deregServices = append(ops.deregServices, id) for _, check := range service.Checks { @@ -893,7 +901,7 @@ func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) { } // Remove the task from the alloc's registrations - c.removeTaskRegistration(allocID, task.Name) + c.removeTaskRegistration(task.AllocID, task.Name) // Now add them to the deregistration fields; main Run loop will update c.commit(&ops) @@ -1037,7 +1045,7 @@ func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) { // Example Client ID: _nomad-client-ggnjpgl7yn7rgmvxzilmpvrzzvrszc7l // func makeAgentServiceID(role string, service *structs.Service) string { - return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "")) + return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "", false)) } // makeTaskServiceID creates a unique ID for identifying a task service in @@ -1045,8 +1053,8 @@ func makeAgentServiceID(role string, service *structs.Service) string { // Checks. This allows updates to merely compare IDs. // // Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH -func makeTaskServiceID(allocID, taskName string, service *structs.Service) string { - return nomadTaskPrefix + service.Hash(allocID, taskName) +func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string { + return nomadTaskPrefix + service.Hash(allocID, taskName, canary) } // makeCheckID creates a unique ID for a check. diff --git a/command/agent/consul/structs.go b/command/agent/consul/structs.go new file mode 100644 index 00000000000..2b61448ecf5 --- /dev/null +++ b/command/agent/consul/structs.go @@ -0,0 +1,67 @@ +package consul + +import ( + "github.com/hashicorp/nomad/client/driver" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" +) + +type TaskServices struct { + AllocID string + + // Name of the task + Name string + + // Canary indicates whether or not the allocation is a canary + Canary bool + + // Restarter allows restarting the task depending on the task's + // check_restart stanzas. + Restarter TaskRestarter + + // Services and checks to register for the task. + Services []*structs.Service + + // Networks from the task's resources stanza. + Networks structs.Networks + + // DriverExec is the script executor for the task's driver. + DriverExec driver.ScriptExecutor + + // DriverNetwork is the network specified by the driver and may be nil. + DriverNetwork *cstructs.DriverNetwork +} + +func NewTaskServices(alloc *structs.Allocation, task *structs.Task, restarter TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) *TaskServices { + ts := TaskServices{ + AllocID: alloc.ID, + Name: task.Name, + Restarter: restarter, + Services: task.Services, + DriverExec: exec, + DriverNetwork: net, + } + + if task.Resources != nil { + ts.Networks = task.Resources.Networks + } + + if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Canary { + ts.Canary = true + } + + return &ts +} + +// Copy method for easing tests +func (t *TaskServices) Copy() *TaskServices { + newTS := new(TaskServices) + *newTS = *t + + // Deep copy Services + newTS.Services = make([]*structs.Service, len(t.Services)) + for i := range t.Services { + newTS.Services[i] = t.Services[i].Copy() + } + return newTS +} diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 37157fcdcdd..b89d07c3dae 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -11,7 +11,9 @@ import ( "github.com/hashicorp/consul/api" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" @@ -25,19 +27,11 @@ const ( yPort = 1235 ) -func testTask() *structs.Task { - return &structs.Task{ - Name: "taskname", - Resources: &structs.Resources{ - Networks: []*structs.NetworkResource{ - { - DynamicPorts: []structs.Port{ - {Label: "x", Value: xPort}, - {Label: "y", Value: yPort}, - }, - }, - }, - }, +func testTask() *TaskServices { + return &TaskServices{ + AllocID: uuid.Generate(), + Name: "taskname", + Restarter: &restartRecorder{}, Services: []*structs.Service{ { Name: "taskname-service", @@ -45,27 +39,21 @@ func testTask() *structs.Task { Tags: []string{"tag1", "tag2"}, }, }, + Networks: []*structs.NetworkResource{ + { + DynamicPorts: []structs.Port{ + {Label: "x", Value: xPort}, + {Label: "y", Value: yPort}, + }, + }, + }, + DriverExec: newMockExec(), } } -// restartRecorder is a minimal TaskRestarter implementation that simply -// counts how many restarts were triggered. -type restartRecorder struct { - restarts int64 -} - -func (r *restartRecorder) Restart(source, reason string, failure bool) { - atomic.AddInt64(&r.restarts, 1) -} - -// testFakeCtx contains a fake Consul AgentAPI and implements the Exec -// interface to allow testing without running Consul. -type testFakeCtx struct { - ServiceClient *ServiceClient - FakeConsul *MockAgent - Task *structs.Task - Restarter *restartRecorder - +// mockExec implements the ScriptExecutor interface and will use an alternate +// implementation t.ExecFunc if non-nil. +type mockExec struct { // Ticked whenever a script is called execs chan int @@ -73,18 +61,40 @@ type testFakeCtx struct { ExecFunc func(ctx context.Context, cmd string, args []string) ([]byte, int, error) } -// Exec implements the ScriptExecutor interface and will use an alternate -// implementation t.ExecFunc if non-nil. -func (t *testFakeCtx) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) { +func newMockExec() *mockExec { + return &mockExec{ + execs: make(chan int, 100), + } +} + +func (m *mockExec) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) { select { - case t.execs <- 1: + case m.execs <- 1: default: } - if t.ExecFunc == nil { + if m.ExecFunc == nil { // Default impl is just "ok" return []byte("ok"), 0, nil } - return t.ExecFunc(ctx, cmd, args) + return m.ExecFunc(ctx, cmd, args) +} + +// restartRecorder is a minimal TaskRestarter implementation that simply +// counts how many restarts were triggered. +type restartRecorder struct { + restarts int64 +} + +func (r *restartRecorder) Restart(source, reason string, failure bool) { + atomic.AddInt64(&r.restarts, 1) +} + +// testFakeCtx contains a fake Consul AgentAPI +type testFakeCtx struct { + ServiceClient *ServiceClient + FakeConsul *MockAgent + Task *TaskServices + MockExec *mockExec } var errNoOps = fmt.Errorf("testing error: no pending operations") @@ -105,20 +115,19 @@ func (t *testFakeCtx) syncOnce() error { // A test Task is also provided. func setupFake(t *testing.T) *testFakeCtx { fc := NewMockAgent() + tt := testTask() return &testFakeCtx{ ServiceClient: NewServiceClient(fc, testlog.Logger(t)), FakeConsul: fc, - Task: testTask(), - Restarter: &restartRecorder{}, - execs: make(chan int, 100), + Task: tt, + MockExec: tt.DriverExec.(*mockExec), } } func TestConsul_ChangeTags(t *testing.T) { ctx := setupFake(t) - allocID := "allocid" - if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, ctx.Restarter, nil, nil); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -132,7 +141,7 @@ func TestConsul_ChangeTags(t *testing.T) { // Query the allocs registrations and then again when we update. The IDs // should change - reg1, err := ctx.ServiceClient.AllocRegistrations(allocID) + reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID) if err != nil { t.Fatalf("Looking up alloc registration failed: %v", err) } @@ -157,10 +166,9 @@ func TestConsul_ChangeTags(t *testing.T) { } } - origTask := ctx.Task - ctx.Task = testTask() + origTask := ctx.Task.Copy() ctx.Task.Services[0].Tags[0] = "newtag" - if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil, nil, nil); err != nil { + if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } if err := ctx.syncOnce(); err != nil { @@ -184,7 +192,7 @@ func TestConsul_ChangeTags(t *testing.T) { } // Check again and ensure the IDs changed - reg2, err := ctx.ServiceClient.AllocRegistrations(allocID) + reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID) if err != nil { t.Fatalf("Looking up alloc registration failed: %v", err) } @@ -242,7 +250,7 @@ func TestConsul_ChangePorts(t *testing.T) { }, } - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, nil); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -285,8 +293,8 @@ func TestConsul_ChangePorts(t *testing.T) { case "c2": origScriptKey = k select { - case <-ctx.execs: - if n := len(ctx.execs); n > 0 { + case <-ctx.MockExec.execs: + if n := len(ctx.MockExec.execs); n > 0 { t.Errorf("expected 1 exec but found: %d", n+1) } case <-time.After(3 * time.Second): @@ -303,8 +311,7 @@ func TestConsul_ChangePorts(t *testing.T) { } // Now update the PortLabel on the Service and Check c3 - origTask := ctx.Task - ctx.Task = testTask() + origTask := ctx.Task.Copy() ctx.Task.Services[0].PortLabel = "y" ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { @@ -330,7 +337,7 @@ func TestConsul_ChangePorts(t *testing.T) { // Removed PortLabel; should default to service's (y) }, } - if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil, ctx, nil); err != nil { + if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } if err := ctx.syncOnce(); err != nil { @@ -374,8 +381,8 @@ func TestConsul_ChangePorts(t *testing.T) { t.Errorf("expected key change for %s from %q", v.Name, origScriptKey) } select { - case <-ctx.execs: - if n := len(ctx.execs); n > 0 { + case <-ctx.MockExec.execs: + if n := len(ctx.MockExec.execs); n > 0 { t.Errorf("expected 1 exec but found: %d", n+1) } case <-time.After(3 * time.Second): @@ -411,8 +418,7 @@ func TestConsul_ChangeChecks(t *testing.T) { }, } - allocID := "allocid" - if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, ctx.Restarter, ctx, nil); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -433,7 +439,7 @@ func TestConsul_ChangeChecks(t *testing.T) { // Query the allocs registrations and then again when we update. The IDs // should change - reg1, err := ctx.ServiceClient.AllocRegistrations(allocID) + reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID) if err != nil { t.Fatalf("Looking up alloc registration failed: %v", err) } @@ -489,7 +495,7 @@ func TestConsul_ChangeChecks(t *testing.T) { PortLabel: "x", }, } - if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil, ctx, nil); err != nil { + if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -547,7 +553,7 @@ func TestConsul_ChangeChecks(t *testing.T) { } // Check again and ensure the IDs changed - reg2, err := ctx.ServiceClient.AllocRegistrations(allocID) + reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID) if err != nil { t.Fatalf("Looking up alloc registration failed: %v", err) } @@ -603,7 +609,7 @@ func TestConsul_ChangeChecks(t *testing.T) { PortLabel: "x", }, } - if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, nil, ctx, nil); err != nil { + if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } if err := ctx.syncOnce(); err != nil { @@ -646,7 +652,7 @@ func TestConsul_RegServices(t *testing.T) { }, } - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, nil, nil); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -677,7 +683,7 @@ func TestConsul_RegServices(t *testing.T) { // Assert the check update is properly formed checkUpd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh - if checkUpd.checkRestart.allocID != "allocid" { + if checkUpd.checkRestart.allocID != ctx.Task.AllocID { t.Fatalf("expected check's allocid to be %q but found %q", "allocid", checkUpd.checkRestart.allocID) } if expected := 200 * time.Millisecond; checkUpd.checkRestart.timeLimit != expected { @@ -687,7 +693,7 @@ func TestConsul_RegServices(t *testing.T) { // Make a change which will register a new service ctx.Task.Services[0].Name = "taskname-service2" ctx.Task.Services[0].Tags[0] = "tag3" - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, nil, nil); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -737,7 +743,7 @@ func TestConsul_RegServices(t *testing.T) { } // Remove the new task - ctx.ServiceClient.RemoveTask("allocid", ctx.Task) + ctx.ServiceClient.RemoveTask(ctx.Task) if err := ctx.syncOnce(); err != nil { t.Fatalf("unexpected error syncing task: %v", err) } @@ -787,7 +793,7 @@ func TestConsul_ShutdownOK(t *testing.T) { go ctx.ServiceClient.Run() // Register a task and agent - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, nil); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -849,7 +855,7 @@ func TestConsul_ShutdownSlow(t *testing.T) { // Make Exec slow, but not too slow waiter := make(chan struct{}) - ctx.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) { + ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) { select { case <-waiter: default: @@ -865,7 +871,7 @@ func TestConsul_ShutdownSlow(t *testing.T) { go ctx.ServiceClient.Run() // Register a task and agent - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, nil); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -924,7 +930,7 @@ func TestConsul_ShutdownBlocked(t *testing.T) { // Make Exec block forever waiter := make(chan struct{}) - ctx.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) { + ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) { close(waiter) <-block return []byte{}, 0, nil @@ -936,7 +942,7 @@ func TestConsul_ShutdownBlocked(t *testing.T) { go ctx.ServiceClient.Run() // Register a task and agent - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, nil); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -988,7 +994,7 @@ func TestConsul_CancelScript(t *testing.T) { }, } - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, nil); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -1007,7 +1013,7 @@ func TestConsul_CancelScript(t *testing.T) { for i := 0; i < 2; i++ { select { - case <-ctx.execs: + case <-ctx.MockExec.execs: // Script ran as expected! case <-time.After(3 * time.Second): t.Fatalf("timed out waiting for script check to run") @@ -1025,7 +1031,7 @@ func TestConsul_CancelScript(t *testing.T) { }, } - if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx.Restarter, ctx, nil); err != nil { + if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -1044,7 +1050,7 @@ func TestConsul_CancelScript(t *testing.T) { // Make sure exec wasn't called again select { - case <-ctx.execs: + case <-ctx.MockExec.execs: t.Errorf("unexpected execution of script; was goroutine not cancelled?") case <-time.After(100 * time.Millisecond): // No unexpected script execs @@ -1104,7 +1110,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) { }, } - net := &cstructs.DriverNetwork{ + ctx.Task.DriverNetwork = &cstructs.DriverNetwork{ PortMap: map[string]int{ "x": 8888, "y": 9999, @@ -1113,7 +1119,7 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) { AutoAdvertise: true, } - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, net); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -1129,9 +1135,9 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) { switch v.Name { case ctx.Task.Services[0].Name: // x // Since DriverNetwork.AutoAdvertise=true, driver ports should be used - if v.Port != net.PortMap["x"] { + if v.Port != ctx.Task.DriverNetwork.PortMap["x"] { t.Errorf("expected service %s's port to be %d but found %d", - v.Name, net.PortMap["x"], v.Port) + v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port) } // The order of checks in Consul is not guaranteed to // be the same as their order in the Task definition, @@ -1159,13 +1165,13 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) { } case ctx.Task.Services[1].Name: // y // Service should be container ip:port - if v.Address != net.IP { + if v.Address != ctx.Task.DriverNetwork.IP { t.Errorf("expected service %s's address to be %s but found %s", - v.Name, net.IP, v.Address) + v.Name, ctx.Task.DriverNetwork.IP, v.Address) } - if v.Port != net.PortMap["y"] { + if v.Port != ctx.Task.DriverNetwork.PortMap["y"] { t.Errorf("expected service %s's port to be %d but found %d", - v.Name, net.PortMap["x"], v.Port) + v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port) } // Check should be host ip:port if v.Checks[0].TCP != ":1235" { // yPort @@ -1208,7 +1214,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { }, } - net := &cstructs.DriverNetwork{ + ctx.Task.DriverNetwork = &cstructs.DriverNetwork{ PortMap: map[string]int{ "x": 8888, "y": 9999, @@ -1217,7 +1223,7 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { AutoAdvertise: false, } - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, net); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -1239,13 +1245,13 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { } case ctx.Task.Services[1].Name: // y + driver mode // Service should be container ip:port - if v.Address != net.IP { + if v.Address != ctx.Task.DriverNetwork.IP { t.Errorf("expected service %s's address to be %s but found %s", - v.Name, net.IP, v.Address) + v.Name, ctx.Task.DriverNetwork.IP, v.Address) } - if v.Port != net.PortMap["y"] { + if v.Port != ctx.Task.DriverNetwork.PortMap["y"] { t.Errorf("expected service %s's port to be %d but found %d", - v.Name, net.PortMap["x"], v.Port) + v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port) } case ctx.Task.Services[2].Name: // y + host mode if v.Port != yPort { @@ -1272,7 +1278,7 @@ func TestConsul_DriverNetwork_Change(t *testing.T) { }, } - net := &cstructs.DriverNetwork{ + ctx.Task.DriverNetwork = &cstructs.DriverNetwork{ PortMap: map[string]int{ "x": 8888, "y": 9999, @@ -1304,31 +1310,59 @@ func TestConsul_DriverNetwork_Change(t *testing.T) { } // Initial service should advertise host port x - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, ctx, net); err != nil { + if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { t.Fatalf("unexpected error registering task: %v", err) } syncAndAssertPort(xPort) // UpdateTask to use Host (shouldn't change anything) - orig := ctx.Task.Copy() + origTask := ctx.Task.Copy() ctx.Task.Services[0].AddressMode = structs.AddressModeHost - if err := ctx.ServiceClient.UpdateTask("allocid", orig, ctx.Task, ctx.Restarter, ctx, net); err != nil { + if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { t.Fatalf("unexpected error updating task: %v", err) } syncAndAssertPort(xPort) // UpdateTask to use Driver (*should* change IP and port) - orig = ctx.Task.Copy() + origTask = ctx.Task.Copy() ctx.Task.Services[0].AddressMode = structs.AddressModeDriver - if err := ctx.ServiceClient.UpdateTask("allocid", orig, ctx.Task, ctx.Restarter, ctx, net); err != nil { + if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { t.Fatalf("unexpected error updating task: %v", err) } - syncAndAssertPort(net.PortMap["x"]) + syncAndAssertPort(ctx.Task.DriverNetwork.PortMap["x"]) +} + +// TestConsul_CanaryTags asserts CanaryTags are used when Canary=true +func TestConsul_CanaryTags(t *testing.T) { + t.Parallel() + require := require.New(t) + ctx := setupFake(t) + + canaryTags := []string{"tag1", "canary"} + ctx.Task.Canary = true + ctx.Task.Services[0].CanaryTags = canaryTags + + require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Len(ctx.FakeConsul.services, 1) + for _, service := range ctx.FakeConsul.services { + require.Equal(canaryTags, service.Tags) + } + + // Disable canary and assert tags are not the canary tags + origTask := ctx.Task.Copy() + ctx.Task.Canary = false + require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Len(ctx.FakeConsul.services, 1) + for _, service := range ctx.FakeConsul.services { + require.NotEqual(canaryTags, service.Tags) + } } // TestConsul_PeriodicSync asserts that Nomad periodically reconciles with diff --git a/e2e/consul/canary_tags_test.go b/e2e/consul/canary_tags_test.go new file mode 100644 index 00000000000..3b6e953c52e --- /dev/null +++ b/e2e/consul/canary_tags_test.go @@ -0,0 +1,161 @@ +package consul_test + +import ( + "flag" + "testing" + "time" + + consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/jobspec" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var integration = flag.Bool("integration", false, "run integration tests") + +func TestConsul(t *testing.T) { + if !*integration { + t.Skip("skipping test in non-integration mode.") + } + RegisterFailHandler(Fail) + RunSpecs(t, "Consul Canary Tags Test") +} + +var _ = Describe("Consul Canary Tags Test", func() { + + var ( + agent *consulapi.Agent + allocations *api.Allocations + deployments *api.Deployments + jobs *api.Jobs + system *api.System + job *api.Job + specFile string + ) + + BeforeSuite(func() { + consulConf := consulapi.DefaultConfig() + consulClient, err := consulapi.NewClient(consulConf) + Expect(err).ShouldNot(HaveOccurred()) + agent = consulClient.Agent() + + conf := api.DefaultConfig() + client, err := api.NewClient(conf) + Expect(err).ShouldNot(HaveOccurred()) + allocations = client.Allocations() + deployments = client.Deployments() + jobs = client.Jobs() + system = client.System() + }) + + JustBeforeEach(func() { + var err error + job, err = jobspec.ParseFile(specFile) + Expect(err).ShouldNot(HaveOccurred()) + job.ID = helper.StringToPtr(*job.ID + uuid.Generate()[22:]) + resp, _, err := jobs.Register(job, nil) + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.EvalID).ShouldNot(BeEmpty()) + }) + + AfterEach(func() { + jobs.Deregister(*job.ID, true, nil) + system.GarbageCollect() + }) + + Describe("Consul Canary Tags Test", func() { + Context("Canary Tags", func() { + BeforeEach(func() { + specFile = "input/canary_tags.hcl" + }) + + It("Should set and unset canary tags", func() { + + // Eventually be running and healthy + Eventually(func() []string { + deploys, _, err := jobs.Deployments(*job.ID, nil) + Expect(err).ShouldNot(HaveOccurred()) + healthyDeploys := make([]string, 0, len(deploys)) + for _, d := range deploys { + if d.Status == "successful" { + healthyDeploys = append(healthyDeploys, d.ID) + } + } + return healthyDeploys + }, 5*time.Second, 20*time.Millisecond).Should(HaveLen(1)) + + // Start a deployment + job.Meta = map[string]string{"version": "2"} + resp, _, err := jobs.Register(job, nil) + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.EvalID).ShouldNot(BeEmpty()) + + // Eventually have a canary + var deploys []*api.Deployment + Eventually(func() []*api.Deployment { + deploys, _, err = jobs.Deployments(*job.ID, nil) + Expect(err).ShouldNot(HaveOccurred()) + return deploys + }, 2*time.Second, 20*time.Millisecond).Should(HaveLen(2)) + + var deploy *api.Deployment + Eventually(func() []string { + deploy, _, err = deployments.Info(deploys[0].ID, nil) + Expect(err).ShouldNot(HaveOccurred()) + return deploy.TaskGroups["consul_canary_test"].PlacedCanaries + }, 2*time.Second, 20*time.Millisecond).Should(HaveLen(1)) + + Eventually(func() bool { + allocID := deploy.TaskGroups["consul_canary_test"].PlacedCanaries[0] + alloc, _, err := allocations.Info(allocID, nil) + Expect(err).ShouldNot(HaveOccurred()) + return alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil && *alloc.DeploymentStatus.Healthy + }, 3*time.Second, 20*time.Millisecond).Should(BeTrue()) + + // Check Consul for canary tags + Eventually(func() []string { + services, err := agent.Services() + Expect(err).ShouldNot(HaveOccurred()) + for _, v := range services { + if v.Service == "canarytest" { + return v.Tags + } + } + return nil + }, 2*time.Second, 20*time.Millisecond).Should( + Equal([]string{"foo", "canary"})) + + // Manually promote + { + resp, _, err := deployments.PromoteAll(deploys[0].ID, nil) + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.EvalID).ShouldNot(BeEmpty()) + } + + // Eventually canary is removed + Eventually(func() bool { + allocID := deploy.TaskGroups["consul_canary_test"].PlacedCanaries[0] + alloc, _, err := allocations.Info(allocID, nil) + Expect(err).ShouldNot(HaveOccurred()) + return alloc.DeploymentStatus.Canary + }, 2*time.Second, 20*time.Millisecond).Should(BeFalse()) + + // Check Consul canary tags were removed + Eventually(func() []string { + services, err := agent.Services() + Expect(err).ShouldNot(HaveOccurred()) + for _, v := range services { + if v.Service == "canarytest" { + return v.Tags + } + } + return nil + }, 2*time.Second, 20*time.Millisecond).Should( + Equal([]string{"foo", "bar"})) + }) + }) + }) +}) diff --git a/e2e/consul/input/canary_tags.hcl b/e2e/consul/input/canary_tags.hcl new file mode 100644 index 00000000000..da98db49087 --- /dev/null +++ b/e2e/consul/input/canary_tags.hcl @@ -0,0 +1,36 @@ +job "consul_canary_test" { + datacenters = ["dc1"] + + group "consul_canary_test" { + count = 2 + + task "consul_canary_test" { + driver = "mock_driver" + + config { + run_for = "10m" + exit_code = 9 + } + + service { + name = "canarytest" + tags = ["foo", "bar"] + canary_tags = ["foo", "canary"] + } + } + + update { + max_parallel = 1 + canary = 1 + min_healthy_time = "1s" + health_check = "task_states" + auto_revert = false + } + + restart { + attempts = 0 + delay = "0s" + mode = "fail" + } + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5100ea20eba..bc105ac1e30 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3853,7 +3853,7 @@ func (s *Service) ValidateName(name string) error { // Hash returns a base32 encoded hash of a Service's contents excluding checks // as they're hashed independently. -func (s *Service) Hash(allocID, taskName string) string { +func (s *Service) Hash(allocID, taskName string, canary bool) string { h := sha1.New() io.WriteString(h, allocID) io.WriteString(h, taskName) @@ -3867,6 +3867,11 @@ func (s *Service) Hash(allocID, taskName string) string { io.WriteString(h, tag) } + // Vary ID on whether or not CanaryTags will be used + if canary { + h.Write([]byte{'1'}) + } + // Base32 is used for encoding the hash as sha1 hashes can always be // encoded without padding, only 4 bytes larger than base64, and saves // 8 bytes vs hex. Since these hashes are used in Consul URLs it's nice