Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid de-registering slowly restored services #5837

Merged
merged 2 commits into from
Jul 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 56 additions & 18 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ const (

// ServiceTagSerf is the tag assigned to Serf services
ServiceTagSerf = "serf"

// deregisterProbationPeriod is the initialization period where
// services registered in Consul but not in Nomad don't get deregistered,
// to allow for nomad restoring tasks
deregisterProbationPeriod = time.Minute
)

// CatalogAPI is the consul/api.Catalog API used by Nomad.
Expand Down Expand Up @@ -230,6 +235,9 @@ type ServiceClient struct {
scripts map[string]*scriptCheck
runningScripts map[string]*scriptHandle

explicitlyDeregisteredServices map[string]bool
explicitlyDeregisteredChecks map[string]bool

// allocRegistrations stores the services and checks that are registered
// with Consul by allocation ID.
allocRegistrations map[string]*AllocRegistration
Expand All @@ -245,6 +253,11 @@ type ServiceClient struct {
// atomics.
seen int32

// deregisterProbationExpiry is the time before which consul sync shouldn't deregister
// unknown services.
// Used to mitigate risk of deleting restored services upon client restart.
deregisterProbationExpiry time.Time

// checkWatcher restarts checks that are unhealthy.
checkWatcher *checkWatcher

Expand All @@ -260,24 +273,27 @@ type ServiceClient struct {
func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient {
logger = logger.ResetNamed("consul.sync")
return &ServiceClient{
client: consulClient,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
allocRegistrations: make(map[string]*AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
isClientAgent: isNomadClient,
client: consulClient,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
explicitlyDeregisteredServices: make(map[string]bool),
explicitlyDeregisteredChecks: make(map[string]bool),
allocRegistrations: make(map[string]*AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
isClientAgent: isNomadClient,
deregisterProbationExpiry: time.Now().Add(deregisterProbationPeriod),
}
}

Expand Down Expand Up @@ -372,6 +388,9 @@ INIT:
failures = 0
}

// on successful sync, clear deregistered consul entities
c.clearExplicitlyDeregistered()

// Reset timer to periodic interval to periodically
// reconile with Consul
if !retryTimer.Stop() {
Expand Down Expand Up @@ -407,6 +426,11 @@ func (c *ServiceClient) commit(ops *operations) {
}
}

func (c *ServiceClient) clearExplicitlyDeregistered() {
c.explicitlyDeregisteredServices = map[string]bool{}
c.explicitlyDeregisteredChecks = map[string]bool{}
}

// merge registrations into state map prior to sync'ing with Consul
func (c *ServiceClient) merge(ops *operations) {
for _, s := range ops.regServices {
Expand All @@ -420,6 +444,7 @@ func (c *ServiceClient) merge(ops *operations) {
}
for _, sid := range ops.deregServices {
delete(c.services, sid)
c.explicitlyDeregisteredServices[sid] = true
}
for _, cid := range ops.deregChecks {
if script, ok := c.runningScripts[cid]; ok {
Expand All @@ -428,6 +453,7 @@ func (c *ServiceClient) merge(ops *operations) {
delete(c.runningScripts, cid)
}
delete(c.checks, cid)
c.explicitlyDeregisteredChecks[cid] = true
}
metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services)))
metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks)))
Expand All @@ -450,6 +476,8 @@ func (c *ServiceClient) sync() error {
return fmt.Errorf("error querying Consul checks: %v", err)
}

inProbation := time.Now().Before(c.deregisterProbationExpiry)

// Remove Nomad services in Consul but unknown locally
for id := range consulServices {
if _, ok := c.services[id]; ok {
Expand All @@ -466,6 +494,11 @@ func (c *ServiceClient) sync() error {
continue
}

// Ignore unknown services during probation
if inProbation && !c.explicitlyDeregisteredServices[id] {
continue
}

// Unknown Nomad managed service; kill
if err := c.client.ServiceDeregister(id); err != nil {
if isOldNomadService(id) {
Expand Down Expand Up @@ -518,6 +551,11 @@ func (c *ServiceClient) sync() error {
continue
}

// Ignore unknown services during probation
if inProbation && !c.explicitlyDeregisteredChecks[id] {
continue
}

// Unknown Nomad managed check; remove
if err := c.client.CheckDeregister(id); err != nil {
if isOldNomadService(check.ServiceID) {
Expand Down
Loading