Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid de-registering slowly restored services - attempt 2 #5838

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 80 additions & 22 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ const (

// ServiceTagSerf is the tag assigned to Serf services
ServiceTagSerf = "serf"

// unknownDeregistrationDelay is a delay imposed on services registered in Consul but not locally.
// This mitigates the risk of deregistering restored local Nomad services before we track them.
unknownDeregistrationDelay = 5 * time.Minute
)

// CatalogAPI is the consul/api.Catalog API used by Nomad.
Expand Down Expand Up @@ -225,10 +229,12 @@ type ServiceClient struct {

opCh chan *operations

services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
scripts map[string]*scriptCheck
runningScripts map[string]*scriptHandle
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
scripts map[string]*scriptCheck
runningScripts map[string]*scriptHandle
servicesPendingRemoval map[string]time.Time
checksPendingRemoval map[string]time.Time

// allocRegistrations stores the services and checks that are registered
// with Consul by allocation ID.
Expand Down Expand Up @@ -260,24 +266,26 @@ type ServiceClient struct {
// NewServiceClient builds a ServiceClient around the given Consul agent API
// client. The logger is re-scoped via ResetNamed("consul.sync"), the retry,
// periodic and shutdown-wait settings are taken from the default* constants,
// the operations channel is buffered with capacity 8, and every map field is
// allocated with make. isNomadClient is stored in the isClientAgent field.
//
// NOTE(review): this listing appears to be a rendered diff, so the field list
// in the literal shows up twice: first the lines before the change, then the
// lines after it. Confirm against the real file before treating it as code.
func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient {
logger = logger.ResetNamed("consul.sync")
return &ServiceClient{
// NOTE(review): presumably the pre-change field list (removed diff lines).
client: consulClient,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
allocRegistrations: make(map[string]*AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
isClientAgent: isNomadClient,
// NOTE(review): presumably the post-change field list (added diff lines);
// it differs from the list above only by the two *PendingRemoval maps.
client: consulClient,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
allocRegistrations: make(map[string]*AllocRegistration),
// Timestamp maps keyed by service ID and check ID respectively.
servicesPendingRemoval: make(map[string]time.Time),
checksPendingRemoval: make(map[string]time.Time),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
isClientAgent: isNomadClient,
}
}

Expand Down Expand Up @@ -396,6 +404,9 @@ INIT:
default:
}

// prune stale pending-removal entries here, in between sleeps
c.removeStalePendingRemovals()

}
}

Expand All @@ -409,17 +420,21 @@ func (c *ServiceClient) commit(ops *operations) {

// merge registrations into state map prior to sync'ing with Consul
func (c *ServiceClient) merge(ops *operations) {
now := time.Now()
for _, s := range ops.regServices {
c.services[s.ID] = s
delete(c.servicesPendingRemoval, s.ID)
}
for _, check := range ops.regChecks {
c.checks[check.ID] = check
delete(c.checksPendingRemoval, check.ID)
}
for _, s := range ops.scripts {
c.scripts[s.id] = s
}
for _, sid := range ops.deregServices {
delete(c.services, sid)
c.servicesPendingRemoval[sid] = now
}
for _, cid := range ops.deregChecks {
if script, ok := c.runningScripts[cid]; ok {
Expand All @@ -428,12 +443,31 @@ func (c *ServiceClient) merge(ops *operations) {
delete(c.runningScripts, cid)
}
delete(c.checks, cid)
c.checksPendingRemoval[cid] = now
}
metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services)))
metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks)))
metrics.SetGauge([]string{"client", "consul", "script_checks"}, float32(len(c.runningScripts)))
}

// removeStalePendingRemovals drops every entry of servicesPendingRemoval and
// checksPendingRemoval whose recorded time lies more than 30 minutes in the
// past. It guards against edge cases in which the pending-removal maps would
// otherwise keep growing because an entry is never cleared.
func (c *ServiceClient) removeStalePendingRemovals() {
	const unknownDelayTimeout = 30 * time.Minute

	// Take one timestamp so both maps are judged against the same instant.
	now := time.Now()

	// prune deletes the entries of pending that are older than the timeout.
	prune := func(pending map[string]time.Time) {
		for key, stamp := range pending {
			if age := now.Sub(stamp); age > unknownDelayTimeout {
				delete(pending, key)
			}
		}
	}

	prune(c.servicesPendingRemoval)
	prune(c.checksPendingRemoval)
}

// sync enqueued operations.
func (c *ServiceClient) sync() error {
sreg, creg, sdereg, cdereg := 0, 0, 0, 0
Expand All @@ -450,6 +484,8 @@ func (c *ServiceClient) sync() error {
return fmt.Errorf("error querying Consul checks: %v", err)
}

now := time.Now()

// Remove Nomad services in Consul but unknown locally
for id := range consulServices {
if _, ok := c.services[id]; ok {
Expand All @@ -466,6 +502,15 @@ func (c *ServiceClient) sync() error {
continue
}

if t, ok := c.servicesPendingRemoval[id]; !ok {
// if this is the first time we see this service, allow a grace period before removing it
c.servicesPendingRemoval[id] = now.Add(unknownDeregistrationDelay)
continue
} else if now.Before(t) {
// not time for deletion yet
continue
}

// Unknown Nomad managed service; kill
if err := c.client.ServiceDeregister(id); err != nil {
if isOldNomadService(id) {
Expand All @@ -476,6 +521,8 @@ func (c *ServiceClient) sync() error {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}

delete(c.servicesPendingRemoval, id)
sdereg++
metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1)
}
Expand Down Expand Up @@ -518,6 +565,15 @@ func (c *ServiceClient) sync() error {
continue
}

if t, ok := c.checksPendingRemoval[id]; !ok {
// if this is the first time we see this check, allow a grace period before removing it
c.checksPendingRemoval[id] = now.Add(unknownDeregistrationDelay)
continue
} else if now.Before(t) {
// not time for deletion yet
continue
}

// Unknown Nomad managed check; remove
if err := c.client.CheckDeregister(id); err != nil {
if isOldNomadService(check.ServiceID) {
Expand All @@ -528,6 +584,8 @@ func (c *ServiceClient) sync() error {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}

delete(c.checksPendingRemoval, id)
cdereg++
metrics.IncrCounter([]string{"client", "consul", "check_deregistrations"}, 1)
}
Expand Down
28 changes: 28 additions & 0 deletions command/agent/consul/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package consul

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestConsul_RemoveStalePendingRemovals checks that removeStalePendingRemovals
// discards entries stamped 24 hours ago and keeps entries stamped 5 seconds
// ago, for both the services map and the checks map.
func TestConsul_RemoveStalePendingRemovals(t *testing.T) {
	reference := time.Now()
	stale := reference.Add(-24 * time.Hour)
	fresh := reference.Add(-5 * time.Second)

	client := &ServiceClient{
		servicesPendingRemoval: map[string]time.Time{
			"service_superold": stale,
			"service_recent":   fresh,
		},
		checksPendingRemoval: map[string]time.Time{
			"checks_superold": stale,
			"checks_recent":   fresh,
		},
	}

	client.removeStalePendingRemovals()

	// Only the recent entry of each map is expected to survive.
	wantServices := map[string]time.Time{"service_recent": fresh}
	wantChecks := map[string]time.Time{"checks_recent": fresh}
	require.Equal(t, wantServices, client.servicesPendingRemoval)
	require.Equal(t, wantChecks, client.checksPendingRemoval)
}