Skip to content

Commit

Permalink
Merge pull request #7106 from hashicorp/f-ctag-override
Browse files Browse the repository at this point in the history
client: enable configuring enable_tag_override for services
  • Loading branch information
shoenig authored Feb 13, 2020
2 parents 3a01ad4 + 6bfd86b commit 1ced8ba
Show file tree
Hide file tree
Showing 16 changed files with 654 additions and 183 deletions.
23 changes: 12 additions & 11 deletions api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,18 @@ type ServiceCheck struct {
// Service represents a Consul service definition.
type Service struct {
//FIXME Id is unused. Remove?
Id string
Name string
Tags []string
CanaryTags []string `mapstructure:"canary_tags"`
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
CheckRestart *CheckRestart `mapstructure:"check_restart"`
Connect *ConsulConnect
Meta map[string]string
CanaryMeta map[string]string
Id string
Name string
Tags []string
CanaryTags []string `mapstructure:"canary_tags"`
EnableTagOverride bool `mapstructure:"enable_tag_override"`
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
CheckRestart *CheckRestart `mapstructure:"check_restart"`
Connect *ConsulConnect
Meta map[string]string
CanaryMeta map[string]string
}

// Canonicalize the Service by ensuring its name and address mode are set. Task
Expand Down
25 changes: 25 additions & 0 deletions api/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestService_CheckRestart asserts Service.CheckRestart settings are properly
// inherited by Checks.
func TestService_CheckRestart(t *testing.T) {
t.Parallel()

job := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
Expand Down Expand Up @@ -58,6 +61,8 @@ func TestService_CheckRestart(t *testing.T) {
// TestService_Connect asserts Service.Connect settings are properly
// inherited by Checks.
func TestService_Connect(t *testing.T) {
t.Parallel()

job := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
Expand All @@ -83,3 +88,23 @@ func TestService_Connect(t *testing.T) {
assert.Equal(t, proxy.Upstreams[0].DestinationName, "upstream")
assert.Equal(t, proxy.LocalServicePort, 8000)
}

func TestService_Tags(t *testing.T) {
t.Parallel()
r := require.New(t)

// canonicalize does not modify eto or tags
job := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
service := &Service{
Tags: []string{"a", "b"},
CanaryTags: []string{"c", "d"},
EnableTagOverride: true,
}

service.Canonicalize(task, tg, job)
r.True(service.EnableTagOverride)
r.Equal([]string{"a", "b"}, service.Tags)
r.Equal([]string{"c", "d"}, service.CanaryTags)
}
1 change: 1 addition & 0 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (h *groupServiceHook) Prerun() error {
func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.mu.Lock()
defer h.mu.Unlock()

oldWorkloadServices := h.getWorkloadServices()

// Store new updated values out of request
Expand Down
14 changes: 14 additions & 0 deletions command/agent/consul/catalog_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,17 @@ func (c *MockAgent) UpdateTTL(id string, output string, status string) error {
c.checkTTLs[id]++
return nil
}

// a convenience method for looking up a registered service by name
func (c *MockAgent) lookupService(name string) []*api.AgentServiceRegistration {
c.mu.Lock()
defer c.mu.Unlock()

var services []*api.AgentServiceRegistration
for _, service := range c.services {
if service.Name == name {
services = append(services, service)
}
}
return services
}
106 changes: 84 additions & 22 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,56 @@ type ACLsAPI interface {
TokenList(q *api.QueryOptions) ([]*api.ACLTokenListEntry, *api.QueryMeta, error)
}

func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool {
return !(reg.Kind == svc.Kind &&
reg.ID == svc.ID &&
reg.Port == svc.Port &&
reg.Address == svc.Address &&
reg.Name == svc.Service &&
reflect.DeepEqual(reg.Tags, svc.Tags) &&
reflect.DeepEqual(reg.Meta, svc.Meta))
// agentServiceUpdateRequired checks if any critical fields in Nomad's version
// of a service definition are different from the existing service definition as
// known by Consul.
func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService) bool {
switch reason {
case syncPeriodic:
// In a periodic sync with Consul, we need to respect the value of
// the enable_tag_override field so that we maintain the illusion that the
// user is in control of the Consul tags, as they may be externally edited
// via the Consul catalog API (e.g. a user manually sets them).
//
// As Consul does by disabling anti-entropy for the tags field, Nomad will
// ignore differences in the tags field during the periodic syncs with
// the Consul agent API.
//
// We do so by over-writing the nomad service registration by the value
// of the tags that Consul contains, if enable_tag_override = true.
maybeTweakTags(wanted, existing)
return different(wanted, existing)

default:
// A non-periodic sync with Consul indicates an operation has been set
// on the queue. This happens when service has been added / removed / modified
// and implies the Consul agent should be sync'd with nomad, because
// nomad is the ultimate source of truth for the service definition.
return different(wanted, existing)
}
}

// maybeTweakTags will override wanted.Tags with a copy of existing.Tags only if
// EnableTagOverride is true. Otherwise the wanted service registration is left
// unchanged.
func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService) {
if wanted.EnableTagOverride {
wanted.Tags = helper.CopySliceString(existing.Tags)
}
}

// different compares the wanted state of the service registration with the actual
// (cached) state of the service registration reported by Consul. If any of the
// critical fields are not deeply equal, they considered different.
func different(wanted *api.AgentServiceRegistration, existing *api.AgentService) bool {
return !(wanted.Kind == existing.Kind &&
wanted.ID == existing.ID &&
wanted.Port == existing.Port &&
wanted.Address == existing.Address &&
wanted.Name == existing.Service &&
wanted.EnableTagOverride == existing.EnableTagOverride &&
reflect.DeepEqual(wanted.Meta, existing.Meta) &&
reflect.DeepEqual(wanted.Tags, existing.Tags))
}

// operations are submitted to the main loop via commit() for synchronizing
Expand Down Expand Up @@ -320,6 +362,18 @@ func (c *ServiceClient) hasSeen() bool {
return atomic.LoadInt32(&c.seen) == seen
}

// syncReason indicates why a sync operation with consul is about to happen.
//
// The trigger for a sync may have implications on the behavior of the sync itself.
// In particular, if a service is defined with enable_tag_override=true
type syncReason byte

const (
syncPeriodic = iota
syncShutdown
syncNewOps
)

// Run the Consul main loop which retries operations against Consul. It should
// be called exactly once.
func (c *ServiceClient) Run() {
Expand Down Expand Up @@ -357,16 +411,23 @@ INIT:

failures := 0
for {
// On every iteration take note of what the trigger for the next sync
// was, so that it may be referenced during the sync itself.
var reasonForSync syncReason

select {
case <-retryTimer.C:
reasonForSync = syncPeriodic
case <-c.shutdownCh:
reasonForSync = syncShutdown
// Cancel check watcher but sync one last time
cancel()
case ops := <-c.opCh:
reasonForSync = syncNewOps
c.merge(ops)
}

if err := c.sync(); err != nil {
if err := c.sync(reasonForSync); err != nil {
if failures == 0 {
// Log on the first failure
c.logger.Warn("failed to update services in Consul", "error", err)
Expand Down Expand Up @@ -460,7 +521,7 @@ func (c *ServiceClient) merge(ops *operations) {
}

// sync enqueued operations.
func (c *ServiceClient) sync() error {
func (c *ServiceClient) sync(reason syncReason) error {
sreg, creg, sdereg, cdereg := 0, 0, 0, 0

consulServices, err := c.client.Services()
Expand Down Expand Up @@ -518,20 +579,20 @@ func (c *ServiceClient) sync() error {
}

// Add Nomad services missing from Consul, or where the service has been updated.
for id, locals := range c.services {
for id, local := range c.services {
existingSvc, ok := consulServices[id]

if ok {
// There is an existing registration of this service in Consul, so here
// we validate to see if the service has been invalidated to see if it
// should be updated.
if !agentServiceUpdateRequired(locals, existingSvc) {
if !agentServiceUpdateRequired(reason, local, existingSvc) {
// No Need to update services that have not changed
continue
}
}

if err = c.client.ServiceRegister(locals); err != nil {
if err = c.client.ServiceRegister(local); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
Expand Down Expand Up @@ -746,13 +807,14 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w

// Build the Consul Service registration request
serviceReg := &api.AgentServiceRegistration{
ID: id,
Name: service.Name,
Tags: tags,
Address: ip,
Port: port,
Meta: meta,
Connect: connect, // will be nil if no Connect stanza
ID: id,
Name: service.Name,
Tags: tags,
EnableTagOverride: service.EnableTagOverride,
Address: ip,
Port: port,
Meta: meta,
Connect: connect, // will be nil if no Connect stanza
}
ops.regServices = append(ops.regServices, serviceReg)

Expand Down Expand Up @@ -868,8 +930,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
//
// DriverNetwork must not change between invocations for the same allocation.
func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error {
ops := &operations{}

ops := new(operations)
regs := new(ServiceRegistrations)
regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services))

Expand Down Expand Up @@ -984,6 +1045,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
}
}
}

return nil
}

Expand Down
Loading

0 comments on commit 1ced8ba

Please sign in to comment.