Skip to content

Commit

Permalink
Merge pull request #5837 from hashicorp/b-consul-restore-sync-2
Browse files Browse the repository at this point in the history
Avoid de-registering slowly restored services
  • Loading branch information
Mahmood Ali authored Jul 17, 2019
2 parents cacf79e + 121c974 commit 15caf5c
Show file tree
Hide file tree
Showing 2 changed files with 299 additions and 20 deletions.
74 changes: 56 additions & 18 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ const (

// ServiceTagSerf is the tag assigned to Serf services
ServiceTagSerf = "serf"

// deregisterProbationPeriod is the initialization period where
// services registered in Consul but not in Nomad don't get deregistered,
// to allow for nomad restoring tasks
deregisterProbationPeriod = time.Minute
)

// CatalogAPI is the consul/api.Catalog API used by Nomad.
Expand Down Expand Up @@ -230,6 +235,9 @@ type ServiceClient struct {
scripts map[string]*scriptCheck
runningScripts map[string]*scriptHandle

explicitlyDeregisteredServices map[string]bool
explicitlyDeregisteredChecks map[string]bool

// allocRegistrations stores the services and checks that are registered
// with Consul by allocation ID.
allocRegistrations map[string]*AllocRegistration
Expand All @@ -245,6 +253,11 @@ type ServiceClient struct {
// atomics.
seen int32

// deregisterProbationExpiry is the time before which consul sync shouldn't deregister
// unknown services.
// Used to mitigate risk of deleting restored services upon client restart.
deregisterProbationExpiry time.Time

// checkWatcher restarts checks that are unhealthy.
checkWatcher *checkWatcher

Expand All @@ -260,24 +273,27 @@ type ServiceClient struct {
func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient {
logger = logger.ResetNamed("consul.sync")
return &ServiceClient{
client: consulClient,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
allocRegistrations: make(map[string]*AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
isClientAgent: isNomadClient,
client: consulClient,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
explicitlyDeregisteredServices: make(map[string]bool),
explicitlyDeregisteredChecks: make(map[string]bool),
allocRegistrations: make(map[string]*AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
isClientAgent: isNomadClient,
deregisterProbationExpiry: time.Now().Add(deregisterProbationPeriod),
}
}

Expand Down Expand Up @@ -372,6 +388,9 @@ INIT:
failures = 0
}

// on successful sync, clear deregistered consul entities
c.clearExplicitlyDeregistered()

// Reset timer to periodic interval to periodically
// reconile with Consul
if !retryTimer.Stop() {
Expand Down Expand Up @@ -407,6 +426,11 @@ func (c *ServiceClient) commit(ops *operations) {
}
}

func (c *ServiceClient) clearExplicitlyDeregistered() {
c.explicitlyDeregisteredServices = map[string]bool{}
c.explicitlyDeregisteredChecks = map[string]bool{}
}

// merge registrations into state map prior to sync'ing with Consul
func (c *ServiceClient) merge(ops *operations) {
for _, s := range ops.regServices {
Expand All @@ -420,6 +444,7 @@ func (c *ServiceClient) merge(ops *operations) {
}
for _, sid := range ops.deregServices {
delete(c.services, sid)
c.explicitlyDeregisteredServices[sid] = true
}
for _, cid := range ops.deregChecks {
if script, ok := c.runningScripts[cid]; ok {
Expand All @@ -428,6 +453,7 @@ func (c *ServiceClient) merge(ops *operations) {
delete(c.runningScripts, cid)
}
delete(c.checks, cid)
c.explicitlyDeregisteredChecks[cid] = true
}
metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services)))
metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks)))
Expand All @@ -450,6 +476,8 @@ func (c *ServiceClient) sync() error {
return fmt.Errorf("error querying Consul checks: %v", err)
}

inProbation := time.Now().Before(c.deregisterProbationExpiry)

// Remove Nomad services in Consul but unknown locally
for id := range consulServices {
if _, ok := c.services[id]; ok {
Expand All @@ -466,6 +494,11 @@ func (c *ServiceClient) sync() error {
continue
}

// Ignore unknown services during probation
if inProbation && !c.explicitlyDeregisteredServices[id] {
continue
}

// Unknown Nomad managed service; kill
if err := c.client.ServiceDeregister(id); err != nil {
if isOldNomadService(id) {
Expand Down Expand Up @@ -518,6 +551,11 @@ func (c *ServiceClient) sync() error {
continue
}

// Ignore unknown services during probation
if inProbation && !c.explicitlyDeregisteredChecks[id] {
continue
}

// Unknown Nomad managed check; remove
if err := c.client.CheckDeregister(id); err != nil {
if isOldNomadService(check.ServiceID) {
Expand Down
Loading

0 comments on commit 15caf5c

Please sign in to comment.