Skip to content

Commit

Permalink
consul: do not re-register already registered services
Browse files Browse the repository at this point in the history
This PR updates Nomad's Consul service client to do map comparisons
using maps.Equal instead of reflect.DeepEqual. The bug fix is in how
DeepEqual treats nil slices different from empty slices, when actually
they should be treated the same.
  • Loading branch information
shoenig committed Oct 17, 2022
1 parent aa9790c commit 381874c
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .changelog/14917.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
consul: Fixed a bug where services continuously re-registered
```
98 changes: 68 additions & 30 deletions command/agent/consul/service_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/envoy"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

Expand All @@ -28,6 +30,12 @@ const (
// services (both agent and task entries).
nomadServicePrefix = "_nomad"

// nomadServerPrefix is the prefix that scopes Nomad registered Servers.
nomadServerPrefix = nomadServicePrefix + "-server-"

// nomadClientPrefix is the prefix that scopes Nomad registered Clients.
nomadClientPrefix = nomadServicePrefix + "-client-"

// nomadTaskPrefix is the prefix that scopes Nomad registered services
// for tasks.
nomadTaskPrefix = nomadServicePrefix + "-task-"
Expand Down Expand Up @@ -131,10 +139,7 @@ type ConfigAPI interface {
// ACL requirements
// - acl:write (server only)
type ACLsAPI interface {
// We are looking up by [operator token] SecretID, which implies we need
// to use this method instead of the normal TokenRead, which can only be
// used to lookup tokens by their AccessorID.
TokenReadSelf(q *api.QueryOptions) (*api.ACLToken, *api.QueryMeta, error)
TokenReadSelf(q *api.QueryOptions) (*api.ACLToken, *api.QueryMeta, error) // for lookup via operator token
PolicyRead(policyID string, q *api.QueryOptions) (*api.ACLPolicy, *api.QueryMeta, error)
RoleRead(roleID string, q *api.QueryOptions) (*api.ACLRole, *api.QueryMeta, error)
TokenCreate(partial *api.ACLToken, q *api.WriteOptions) (*api.ACLToken, *api.WriteMeta, error)
Expand Down Expand Up @@ -170,13 +175,23 @@ func agentServiceUpdateRequired(reason syncReason, wanted *api.AgentServiceRegis
// We do so by over-writing the nomad service registration by the value
// of the tags that Consul contains, if enable_tag_override = true.
maybeTweakTags(wanted, existing, sidecar)

// Also, purge tagged address fields of nomad agent services.
maybeTweakTaggedAddresses(wanted, existing)

// Okay now it is safe to compare.
return different(wanted, existing, sidecar)

default:
// A non-periodic sync with Consul indicates an operation has been set
// on the queue. This happens when service has been added / removed / modified
// and implies the Consul agent should be sync'd with nomad, because
// nomad is the ultimate source of truth for the service definition.

// But do purge tagged address fields of nomad agent services.
maybeTweakTaggedAddresses(wanted, existing)

// Okay now it is safe to compare.
return different(wanted, existing, sidecar)
}
}
Expand All @@ -197,6 +212,15 @@ func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentSer
}
}

// maybeTweakTaggedAddresses will remove the .TaggedAddresses fields from existing
// if wanted represents a Nomad agent (Client or Server). We do this because Consul
// sets the TaggedAddress on these legacy registrations for us
func maybeTweakTaggedAddresses(wanted *api.AgentServiceRegistration, existing *api.AgentService) {
if isNomadAgent(wanted.ID) && len(wanted.TaggedAddresses) == 0 {
existing.TaggedAddresses = nil
}
}

// different compares the wanted state of the service registration with the actual
// (cached) state of the service registration reported by Consul. If any of the
// critical fields are not deeply equal, they considered different.
Expand All @@ -214,9 +238,9 @@ func different(wanted *api.AgentServiceRegistration, existing *api.AgentService,
return true
case wanted.EnableTagOverride != existing.EnableTagOverride:
return true
case !reflect.DeepEqual(wanted.Meta, existing.Meta):
case !maps.Equal(wanted.Meta, existing.Meta):
return true
case !reflect.DeepEqual(wanted.TaggedAddresses, existing.TaggedAddresses):
case !maps.Equal(wanted.TaggedAddresses, existing.TaggedAddresses):
return true
case !helper.SliceSetEq(wanted.Tags, existing.Tags):
return true
Expand Down Expand Up @@ -384,8 +408,8 @@ type ServiceClient struct {
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration

explicitlyDeregisteredServices map[string]bool
explicitlyDeregisteredChecks map[string]bool
explicitlyDeregisteredServices *set.Set[string]
explicitlyDeregisteredChecks *set.Set[string]

// allocRegistrations stores the services and checks that are registered
// with Consul by allocation ID.
Expand All @@ -394,8 +418,8 @@ type ServiceClient struct {

// Nomad agent services and checks that are recorded so they can be removed
// on shutdown. Defers to consul namespace specified in client consul config.
agentServices map[string]struct{}
agentChecks map[string]struct{}
agentServices *set.Set[string]
agentChecks *set.Set[string]
agentLock sync.Mutex

// seen is 1 if Consul has ever been seen; otherwise 0. Accessed with
Expand Down Expand Up @@ -461,11 +485,11 @@ func NewServiceClient(agentAPI AgentAPI, namespacesClient *NamespacesClient, log
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
explicitlyDeregisteredServices: make(map[string]bool),
explicitlyDeregisteredChecks: make(map[string]bool),
explicitlyDeregisteredServices: set.New[string](0),
explicitlyDeregisteredChecks: set.New[string](0),
allocRegistrations: make(map[string]*serviceregistration.AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
agentServices: set.New[string](4),
agentChecks: set.New[string](0),
isClientAgent: isNomadClient,
deregisterProbationExpiry: time.Now().Add(deregisterProbationPeriod),
checkWatcher: serviceregistration.NewCheckWatcher(logger, &checkStatusGetter{
Expand Down Expand Up @@ -656,8 +680,8 @@ func (c *ServiceClient) commit(ops *operations) {
}

func (c *ServiceClient) clearExplicitlyDeregistered() {
c.explicitlyDeregisteredServices = make(map[string]bool)
c.explicitlyDeregisteredChecks = make(map[string]bool)
c.explicitlyDeregisteredServices = set.New[string](0)
c.explicitlyDeregisteredChecks = set.New[string](0)
}

// merge registrations into state map prior to sync'ing with Consul
Expand All @@ -670,11 +694,11 @@ func (c *ServiceClient) merge(ops *operations) {
}
for _, sid := range ops.deregServices {
delete(c.services, sid)
c.explicitlyDeregisteredServices[sid] = true
c.explicitlyDeregisteredServices.Insert(sid)
}
for _, cid := range ops.deregChecks {
delete(c.checks, cid)
c.explicitlyDeregisteredChecks[cid] = true
c.explicitlyDeregisteredChecks.Insert(cid)
}
metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services)))
metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks)))
Expand Down Expand Up @@ -728,7 +752,7 @@ func (c *ServiceClient) sync(reason syncReason) error {
}

// Ignore unknown services during probation
if inProbation && !c.explicitlyDeregisteredServices[id] {
if inProbation && !c.explicitlyDeregisteredServices.Contains(id) {
continue
}

Expand Down Expand Up @@ -767,9 +791,8 @@ func (c *ServiceClient) sync(reason syncReason) error {
metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1)
}

// Add Nomad services missing from Consul, or where the service has been updated.
// Add Nomad managed services missing in Consul, or updated via Nomad.
for id, serviceInNomad := range c.services {

serviceInConsul, exists := servicesInConsul[id]
sidecarInConsul := getNomadSidecar(id, servicesInConsul)

Expand Down Expand Up @@ -813,7 +836,7 @@ func (c *ServiceClient) sync(reason syncReason) error {
}

// Ignore unknown services during probation
if inProbation && !c.explicitlyDeregisteredChecks[id] {
if inProbation && !c.explicitlyDeregisteredChecks.Contains(id) {
continue
}

Expand Down Expand Up @@ -938,10 +961,10 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)

// Record IDs for deregistering on shutdown
for _, id := range ops.regServices {
c.agentServices[id.ID] = struct{}{}
c.agentServices.Insert(id.ID)
}
for _, id := range ops.regChecks {
c.agentChecks[id.ID] = struct{}{}
c.agentChecks.Insert(id.ID)
}
return nil
}
Expand Down Expand Up @@ -1432,7 +1455,7 @@ func (c *ServiceClient) Shutdown() error {

// Always attempt to deregister Nomad agent Consul entries, even if
// deadline was reached
for id := range c.agentServices {
for _, id := range c.agentServices.List() {
if err := c.agentAPI.ServiceDeregisterOpts(id, nil); err != nil {
c.logger.Error("failed deregistering agent service", "service_id", id, "error", err)
}
Expand Down Expand Up @@ -1463,7 +1486,7 @@ func (c *ServiceClient) Shutdown() error {
return false
}

for id := range c.agentChecks {
for _, id := range c.agentChecks.List() {
// if we couldn't populate remainingChecks it is unlikely that CheckDeregister will work, but try anyway
// if we could list the remaining checks, verify that the check we store still exists before removing it.
if remainingChecks == nil || checkRemains(id) {
Expand Down Expand Up @@ -1595,10 +1618,19 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host
return &chkReg, nil
}

// isNomadCheck returns true if the ID matches the pattern of a Nomad managed
// check.
func isNomadCheck(id string) bool {
return strings.HasPrefix(id, nomadCheckPrefix)
// isNomadClient returns true if id represents a Nomad Client registration.
func isNomadClient(id string) bool {
return strings.HasPrefix(id, nomadClientPrefix)
}

// isNomadServer returns true if id represents a Nomad Server registration.
func isNomadServer(id string) bool {
return strings.HasPrefix(id, nomadServerPrefix)
}

// isNomadAgent returns true if id represents a Nomad Client or Server registration.
func isNomadAgent(id string) bool {
return isNomadClient(id) || isNomadServer(id)
}

// isNomadService returns true if the ID matches the pattern of a Nomad managed
Expand All @@ -1608,6 +1640,12 @@ func isNomadService(id string) bool {
return strings.HasPrefix(id, nomadTaskPrefix) || isOldNomadService(id)
}

// isNomadCheck returns true if the ID matches the pattern of a Nomad managed
// check.
func isNomadCheck(id string) bool {
return strings.HasPrefix(id, nomadCheckPrefix)
}

// isOldNomadService returns true if the ID matches an old pattern managed by
// Nomad.
//
Expand Down

0 comments on commit 381874c

Please sign in to comment.