Skip to content

Commit

Permalink
client: enable configuring enable_tag_override for services
Browse files Browse the repository at this point in the history
Consul provides a feature of Service Definitions where the tags
associated with a service can be modified through the Catalog API,
overriding the value(s) configured in the agent's service configuration.

To enable this feature, the flag enable_tag_override must be configured
in the service definition.

Previously, Nomad did not allow configuring this flag, and thus the default
value of false was used. Now, it is configurable.

Because Nomad itself acts as a state machine around the the service definitions
of the tasks it manages, it's worth describing what happens when this feature
is enabled and why.

Consider the basic case where there is no Nomad, and your service is provided
to consul as a boring JSON file. The ultimate source of truth for the definition
of that service is the file, and is stored in the agent. Later, Consul performs
"anti-entropy" which synchronizes the Catalog (stored only the leaders). Then
with enable_tag_override=true, the tags field is available for "external"
modification through the Catalog API (rather than directly configuring the
service definition file, or using the Agent API). The important observation
is that if the service definition ever changes (i.e. the file is changed &
config reloaded OR the Agent API is used to modify the service), those
"external" tag values are thrown away, and the new service definition is
once again the source of truth.

In the Nomad case, Nomad itself is the source of truth over the Agent in
the same way the JSON file was the source of truth in the example above.
That means any time Nomad sets a new service definition, any externally
configured tags are going to be replaced. When does this happen? Only on
major lifecycle events, for example when a task is modified because of an
updated job spec from the 'nomad job run <existing>' command. Otherwise,
Nomad's periodic re-sync's with Consul will now no longer try to restore
the externally modified tag values (as long as enable_tag_override=true).

Fixes #2057
  • Loading branch information
shoenig committed Feb 10, 2020
1 parent 4757f87 commit 6bfd86b
Show file tree
Hide file tree
Showing 16 changed files with 654 additions and 183 deletions.
23 changes: 12 additions & 11 deletions api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,18 @@ type ServiceCheck struct {
// Service represents a Consul service definition.
type Service struct {
//FIXME Id is unused. Remove?
Id string
Name string
Tags []string
CanaryTags []string `mapstructure:"canary_tags"`
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
CheckRestart *CheckRestart `mapstructure:"check_restart"`
Connect *ConsulConnect
Meta map[string]string
CanaryMeta map[string]string
Id string
Name string
Tags []string
CanaryTags []string `mapstructure:"canary_tags"`
EnableTagOverride bool `mapstructure:"enable_tag_override"`
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
CheckRestart *CheckRestart `mapstructure:"check_restart"`
Connect *ConsulConnect
Meta map[string]string
CanaryMeta map[string]string
}

// Canonicalize the Service by ensuring its name and address mode are set. Task
Expand Down
25 changes: 25 additions & 0 deletions api/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestService_CheckRestart asserts Service.CheckRestart settings are properly
// inherited by Checks.
func TestService_CheckRestart(t *testing.T) {
t.Parallel()

job := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
Expand Down Expand Up @@ -58,6 +61,8 @@ func TestService_CheckRestart(t *testing.T) {
// TestService_Connect asserts Service.Connect settings are properly
// inherited by Checks.
func TestService_Connect(t *testing.T) {
t.Parallel()

job := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
Expand All @@ -83,3 +88,23 @@ func TestService_Connect(t *testing.T) {
assert.Equal(t, proxy.Upstreams[0].DestinationName, "upstream")
assert.Equal(t, proxy.LocalServicePort, 8000)
}

func TestService_Tags(t *testing.T) {
t.Parallel()
r := require.New(t)

// canonicalize does not modify eto or tags
job := &Job{Name: stringToPtr("job")}
tg := &TaskGroup{Name: stringToPtr("group")}
task := &Task{Name: "task"}
service := &Service{
Tags: []string{"a", "b"},
CanaryTags: []string{"c", "d"},
EnableTagOverride: true,
}

service.Canonicalize(task, tg, job)
r.True(service.EnableTagOverride)
r.Equal([]string{"a", "b"}, service.Tags)
r.Equal([]string{"c", "d"}, service.CanaryTags)
}
1 change: 1 addition & 0 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (h *groupServiceHook) Prerun() error {
func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.mu.Lock()
defer h.mu.Unlock()

oldWorkloadServices := h.getWorkloadServices()

// Store new updated values out of request
Expand Down
14 changes: 14 additions & 0 deletions command/agent/consul/catalog_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,17 @@ func (c *MockAgent) UpdateTTL(id string, output string, status string) error {
c.checkTTLs[id]++
return nil
}

// a convenience method for looking up a registered service by name
func (c *MockAgent) lookupService(name string) []*api.AgentServiceRegistration {
c.mu.Lock()
defer c.mu.Unlock()

var services []*api.AgentServiceRegistration
for _, service := range c.services {
if service.Name == name {
services = append(services, service)
}
}
return services
}
106 changes: 84 additions & 22 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,56 @@ type ACLsAPI interface {
TokenList(q *api.QueryOptions) ([]*api.ACLTokenListEntry, *api.QueryMeta, error)
}

func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool {
return !(reg.Kind == svc.Kind &&
reg.ID == svc.ID &&
reg.Port == svc.Port &&
reg.Address == svc.Address &&
reg.Name == svc.Service &&
reflect.DeepEqual(reg.Tags, svc.Tags) &&
reflect.DeepEqual(reg.Meta, svc.Meta))
// agentServiceUpdateRequired checks if any critical fields in Nomad's version
// of a service definition are different from the existing service definition as
// known by Consul.
func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegistration, existing *api.AgentService) bool {
switch reason {
case syncPeriodic:
// In a periodic sync with Consul, we need to respect the value of
// the enable_tag_override field so that we maintain the illusion that the
// user is in control of the Consul tags, as they may be externally edited
// via the Consul catalog API (e.g. a user manually sets them).
//
// As Consul does by disabling anti-entropy for the tags field, Nomad will
// ignore differences in the tags field during the periodic syncs with
// the Consul agent API.
//
// We do so by over-writing the nomad service registration by the value
// of the tags that Consul contains, if enable_tag_override = true.
maybeTweakTags(wanted, existing)
return different(wanted, existing)

default:
// A non-periodic sync with Consul indicates an operation has been set
// on the queue. This happens when service has been added / removed / modified
// and implies the Consul agent should be sync'd with nomad, because
// nomad is the ultimate source of truth for the service definition.
return different(wanted, existing)
}
}

// maybeTweakTags will override wanted.Tags with a copy of existing.Tags only if
// EnableTagOverride is true. Otherwise the wanted service registration is left
// unchanged.
func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentService) {
if wanted.EnableTagOverride {
wanted.Tags = helper.CopySliceString(existing.Tags)
}
}

// different compares the wanted state of the service registration with the actual
// (cached) state of the service registration reported by Consul. If any of the
// critical fields are not deeply equal, they considered different.
func different(wanted *api.AgentServiceRegistration, existing *api.AgentService) bool {
return !(wanted.Kind == existing.Kind &&
wanted.ID == existing.ID &&
wanted.Port == existing.Port &&
wanted.Address == existing.Address &&
wanted.Name == existing.Service &&
wanted.EnableTagOverride == existing.EnableTagOverride &&
reflect.DeepEqual(wanted.Meta, existing.Meta) &&
reflect.DeepEqual(wanted.Tags, existing.Tags))
}

// operations are submitted to the main loop via commit() for synchronizing
Expand Down Expand Up @@ -320,6 +362,18 @@ func (c *ServiceClient) hasSeen() bool {
return atomic.LoadInt32(&c.seen) == seen
}

// syncReason indicates why a sync operation with consul is about to happen.
//
// The trigger for a sync may have implications on the behavior of the sync itself.
// In particular, if a service is defined with enable_tag_override=true
type syncReason byte

const (
syncPeriodic = iota
syncShutdown
syncNewOps
)

// Run the Consul main loop which retries operations against Consul. It should
// be called exactly once.
func (c *ServiceClient) Run() {
Expand Down Expand Up @@ -357,16 +411,23 @@ INIT:

failures := 0
for {
// On every iteration take note of what the trigger for the next sync
// was, so that it may be referenced during the sync itself.
var reasonForSync syncReason

select {
case <-retryTimer.C:
reasonForSync = syncPeriodic
case <-c.shutdownCh:
reasonForSync = syncShutdown
// Cancel check watcher but sync one last time
cancel()
case ops := <-c.opCh:
reasonForSync = syncNewOps
c.merge(ops)
}

if err := c.sync(); err != nil {
if err := c.sync(reasonForSync); err != nil {
if failures == 0 {
// Log on the first failure
c.logger.Warn("failed to update services in Consul", "error", err)
Expand Down Expand Up @@ -460,7 +521,7 @@ func (c *ServiceClient) merge(ops *operations) {
}

// sync enqueued operations.
func (c *ServiceClient) sync() error {
func (c *ServiceClient) sync(reason syncReason) error {
sreg, creg, sdereg, cdereg := 0, 0, 0, 0

consulServices, err := c.client.Services()
Expand Down Expand Up @@ -518,20 +579,20 @@ func (c *ServiceClient) sync() error {
}

// Add Nomad services missing from Consul, or where the service has been updated.
for id, locals := range c.services {
for id, local := range c.services {
existingSvc, ok := consulServices[id]

if ok {
// There is an existing registration of this service in Consul, so here
// we validate to see if the service has been invalidated to see if it
// should be updated.
if !agentServiceUpdateRequired(locals, existingSvc) {
if !agentServiceUpdateRequired(reason, local, existingSvc) {
// No Need to update services that have not changed
continue
}
}

if err = c.client.ServiceRegister(locals); err != nil {
if err = c.client.ServiceRegister(local); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
Expand Down Expand Up @@ -746,13 +807,14 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w

// Build the Consul Service registration request
serviceReg := &api.AgentServiceRegistration{
ID: id,
Name: service.Name,
Tags: tags,
Address: ip,
Port: port,
Meta: meta,
Connect: connect, // will be nil if no Connect stanza
ID: id,
Name: service.Name,
Tags: tags,
EnableTagOverride: service.EnableTagOverride,
Address: ip,
Port: port,
Meta: meta,
Connect: connect, // will be nil if no Connect stanza
}
ops.regServices = append(ops.regServices, serviceReg)

Expand Down Expand Up @@ -868,8 +930,7 @@ func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
//
// DriverNetwork must not change between invocations for the same allocation.
func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error {
ops := &operations{}

ops := new(operations)
regs := new(ServiceRegistrations)
regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services))

Expand Down Expand Up @@ -984,6 +1045,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error
}
}
}

return nil
}

Expand Down
Loading

0 comments on commit 6bfd86b

Please sign in to comment.