Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: enable configuring enable_tag_override for services #7106

Merged
merged 1 commit into from
Feb 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,18 @@ type ServiceCheck struct {
// Service represents a Consul service definition.
type Service struct {
//FIXME Id is unused. Remove?
Id string
Name string
Tags []string
CanaryTags []string `mapstructure:"canary_tags"`
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
CheckRestart *CheckRestart `mapstructure:"check_restart"`
Connect *ConsulConnect
Meta map[string]string
CanaryMeta map[string]string
Id string
Name string
Tags []string
CanaryTags []string `mapstructure:"canary_tags"`
EnableTagOverride bool `mapstructure:"enable_tag_override"`
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
CheckRestart *CheckRestart `mapstructure:"check_restart"`
Connect *ConsulConnect
Meta map[string]string
CanaryMeta map[string]string
}

// Canonicalize the Service by ensuring its name and address mode are set. Task
Expand Down
25 changes: 25 additions & 0 deletions api/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestService_CheckRestart asserts Service.CheckRestart settings are properly
// inherited by Checks.
func TestService_CheckRestart(t *testing.T) {
t.Parallel()

job := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
Expand Down Expand Up @@ -58,6 +61,8 @@ func TestService_CheckRestart(t *testing.T) {
// TestService_Connect asserts Service.Connect settings are properly
// inherited by Checks.
func TestService_Connect(t *testing.T) {
t.Parallel()

job := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
Expand All @@ -83,3 +88,23 @@ func TestService_Connect(t *testing.T) {
assert.Equal(t, proxy.Upstreams[0].DestinationName, "upstream")
assert.Equal(t, proxy.LocalServicePort, 8000)
}

func TestService_Tags(t *testing.T) {
t.Parallel()
r := require.New(t)

// canonicalize does not modify eto or tags
job := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
service := &Service{
Tags: []string{"a", "b"},
CanaryTags: []string{"c", "d"},
EnableTagOverride: true,
}

service.Canonicalize(task, tg, job)
r.True(service.EnableTagOverride)
r.Equal([]string{"a", "b"}, service.Tags)
r.Equal([]string{"c", "d"}, service.CanaryTags)
}
1 change: 1 addition & 0 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (h *groupServiceHook) Prerun() error {
func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.mu.Lock()
defer h.mu.Unlock()

oldWorkloadServices := h.getWorkloadServices()

// Store new updated values out of request
Expand Down
14 changes: 14 additions & 0 deletions command/agent/consul/catalog_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,17 @@ func (c *MockAgent) UpdateTTL(id string, output string, status string) error {
c.checkTTLs[id]++
return nil
}

// a convenience method for looking up a registered service by name
func (c *MockAgent) lookupService(name string) []*api.AgentServiceRegistration {
c.mu.Lock()
defer c.mu.Unlock()

var services []*api.AgentServiceRegistration
for _, service := range c.services {
if service.Name == name {
services = append(services, service)
}
}
return services
}
106 changes: 84 additions & 22 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,56 @@ type ACLsAPI interface {
TokenList(q *api.QueryOptions) ([]*api.ACLTokenListEntry, *api.QueryMeta, error)
}

func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool {
return !(reg.Kind == svc.Kind &&
reg.ID == svc.ID &&
reg.Port == svc.Port &&
reg.Address == svc.Address &&
reg.Name == svc.Service &&
reflect.DeepEqual(reg.Tags, svc.Tags) &&
reflect.DeepEqual(reg.Meta, svc.Meta))
// agentServiceUpdateRequired checks if any critical fields in Nomad's version
// of a service definition are different from the existing service definition as
// known by Consul.
func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService) bool {
switch reason {
case syncPeriodic:
// In a periodic sync with Consul, we need to respect the value of
// the enable_tag_override field so that we maintain the illusion that the
// user is in control of the Consul tags, as they may be externally edited
// via the Consul catalog API (e.g. a user manually sets them).
//
// As Consul does by disabling anti-entropy for the tags field, Nomad will
// ignore differences in the tags field during the periodic syncs with
// the Consul agent API.
//
// We do so by over-writing the nomad service registration by the value
// of the tags that Consul contains, if enable_tag_override = true.
maybeTweakTags(wanted, existing)
return different(wanted, existing)

default:
// A non-periodic sync with Consul indicates an operation has been set
// on the queue. This happens when service has been added / removed / modified
// and implies the Consul agent should be sync'd with nomad, because
// nomad is the ultimate source of truth for the service definition.
return different(wanted, existing)
}
}

// maybeTweakTags will override wanted.Tags with a copy of existing.Tags only if
// EnableTagOverride is true. Otherwise the wanted service registration is left
// unchanged.
func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService) {
if wanted.EnableTagOverride {
wanted.Tags = helper.CopySliceString(existing.Tags)
}
}

// different compares the wanted state of the service registration with the actual
// (cached) state of the service registration reported by Consul. If any of the
// critical fields are not deeply equal, they considered different.
func different(wanted *api.AgentServiceRegistration, existing *api.AgentService) bool {
return !(wanted.Kind == existing.Kind &&
wanted.ID == existing.ID &&
wanted.Port == existing.Port &&
wanted.Address == existing.Address &&
wanted.Name == existing.Service &&
wanted.EnableTagOverride == existing.EnableTagOverride &&
reflect.DeepEqual(wanted.Meta, existing.Meta) &&
reflect.DeepEqual(wanted.Tags, existing.Tags))
}

// operations are submitted to the main loop via commit() for synchronizing
Expand Down Expand Up @@ -320,6 +362,18 @@ func (c *ServiceClient) hasSeen() bool {
return atomic.LoadInt32(&c.seen) == seen
}

// syncReason indicates why a sync operation with consul is about to happen.
//
// The trigger for a sync may have implications on the behavior of the sync itself.
// In particular, if a service is defined with enable_tag_override=true
type syncReason byte

const (
syncPeriodic = iota
syncShutdown
syncNewOps
)

// Run the Consul main loop which retries operations against Consul. It should
// be called exactly once.
func (c *ServiceClient) Run() {
Expand Down Expand Up @@ -357,16 +411,23 @@ INIT:

failures := 0
for {
// On every iteration take note of what the trigger for the next sync
// was, so that it may be referenced during the sync itself.
var reasonForSync syncReason

select {
case <-retryTimer.C:
reasonForSync = syncPeriodic
case <-c.shutdownCh:
reasonForSync = syncShutdown
// Cancel check watcher but sync one last time
cancel()
case ops := <-c.opCh:
reasonForSync = syncNewOps
c.merge(ops)
}

if err := c.sync(); err != nil {
if err := c.sync(reasonForSync); err != nil {
if failures == 0 {
// Log on the first failure
c.logger.Warn("failed to update services in Consul", "error", err)
Expand Down Expand Up @@ -460,7 +521,7 @@ func (c *ServiceClient) merge(ops *operations) {
}

// sync enqueued operations.
func (c *ServiceClient) sync() error {
func (c *ServiceClient) sync(reason syncReason) error {
sreg, creg, sdereg, cdereg := 0, 0, 0, 0

consulServices, err := c.client.Services()
Expand Down Expand Up @@ -518,20 +579,20 @@ func (c *ServiceClient) sync() error {
}

// Add Nomad services missing from Consul, or where the service has been updated.
for id, locals := range c.services {
for id, local := range c.services {
existingSvc, ok := consulServices[id]

if ok {
// There is an existing registration of this service in Consul, so here
// we validate to see if the service has been invalidated to see if it
// should be updated.
if !agentServiceUpdateRequired(locals, existingSvc) {
if !agentServiceUpdateRequired(reason, local, existingSvc) {
// No Need to update services that have not changed
continue
}
}

if err = c.client.ServiceRegister(locals); err != nil {
if err = c.client.ServiceRegister(local); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
Expand Down Expand Up @@ -746,13 +807,14 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w

// Build the Consul Service registration request
serviceReg := &api.AgentServiceRegistration{
ID: id,
Name: service.Name,
Tags: tags,
Address: ip,
Port: port,
Meta: meta,
Connect: connect, // will be nil if no Connect stanza
ID: id,
Name: service.Name,
Tags: tags,
EnableTagOverride: service.EnableTagOverride,
Address: ip,
Port: port,
Meta: meta,
Connect: connect, // will be nil if no Connect stanza
}
ops.regServices = append(ops.regServices, serviceReg)

Expand Down Expand Up @@ -868,8 +930,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
//
// DriverNetwork must not change between invocations for the same allocation.
func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error {
ops := &operations{}

ops := new(operations)
regs := new(ServiceRegistrations)
regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services))

Expand Down Expand Up @@ -984,6 +1045,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
}
}
}

return nil
}

Expand Down
Loading