Skip to content

Commit

Permalink
Merge pull request #4170 from hashicorp/f-consul-periodic-sync
Browse files Browse the repository at this point in the history
consul: periodically reconcile services/checks
  • Loading branch information
schmichael authored Apr 19, 2018
2 parents 45d0c88 + 972e861 commit 4317ea7
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 46 deletions.
28 changes: 26 additions & 2 deletions command/agent/consul/catalog_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ type MockAgent struct {
// maps of what services and checks have been registered
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
mu sync.Mutex

// hits is the total number of times agent methods have been called
hits int

// mu guards above fields
mu sync.Mutex

// when UpdateTTL is called the check ID will have its counter inc'd
checkTTLs map[string]int
Expand All @@ -52,6 +57,13 @@ func NewMockAgent() *MockAgent {
}
}

// getHits returns how many Consul Agent API calls have been made.
func (c *MockAgent) getHits() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.hits
}

// SetStatus that Checks() should return. Returns old status value.
func (c *MockAgent) SetStatus(s string) string {
c.mu.Lock()
Expand All @@ -62,6 +74,10 @@ func (c *MockAgent) SetStatus(s string) string {
}

func (c *MockAgent) Self() (map[string]map[string]interface{}, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

s := map[string]map[string]interface{}{
"Member": {
"Addr": "127.0.0.1",
Expand All @@ -85,6 +101,7 @@ func (c *MockAgent) Self() (map[string]map[string]interface{}, error) {
func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

r := make(map[string]*api.AgentService, len(c.services))
for k, v := range c.services {
Expand All @@ -105,6 +122,7 @@ func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

r := make(map[string]*api.AgentCheck, len(c.checks))
for k, v := range c.checks {
Expand All @@ -125,7 +143,6 @@ func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
c.mu.Lock()
defer c.mu.Unlock()

regs := make([]*api.AgentCheckRegistration, 0, len(c.checks))
for _, check := range c.checks {
regs = append(regs, check)
Expand All @@ -136,6 +153,8 @@ func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

c.checks[check.ID] = check

// Be nice and make checks reachable-by-service
Expand All @@ -147,6 +166,7 @@ func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
func (c *MockAgent) CheckDeregister(checkID string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
delete(c.checks, checkID)
delete(c.checkTTLs, checkID)
return nil
Expand All @@ -155,20 +175,24 @@ func (c *MockAgent) CheckDeregister(checkID string) error {
func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
c.services[service.ID] = service
return nil
}

func (c *MockAgent) ServiceDeregister(serviceID string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
delete(c.services, serviceID)
return nil
}

func (c *MockAgent) UpdateTTL(id string, output string, status string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

check, ok := c.checks[id]
if !ok {
return fmt.Errorf("unknown check id: %q", id)
Expand Down
17 changes: 9 additions & 8 deletions command/agent/consul/check_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -113,9 +114,9 @@ func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) {

// testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test
// logger and faster poll frequency.
func testWatcherSetup() (*fakeChecksAPI, *checkWatcher) {
func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) {
fakeAPI := newFakeChecksAPI()
cw := newCheckWatcher(testLogger(), fakeAPI)
cw := newCheckWatcher(testlog.Logger(t), fakeAPI)
cw.pollFreq = 10 * time.Millisecond
return fakeAPI, cw
}
Expand All @@ -141,7 +142,7 @@ func TestCheckWatcher_Skip(t *testing.T) {
check := testCheck()
check.CheckRestart = nil

cw := newCheckWatcher(testLogger(), newFakeChecksAPI())
cw := newCheckWatcher(testlog.Logger(t), newFakeChecksAPI())
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check)
cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1)

Expand All @@ -155,7 +156,7 @@ func TestCheckWatcher_Skip(t *testing.T) {
func TestCheckWatcher_Healthy(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
Expand Down Expand Up @@ -190,7 +191,7 @@ func TestCheckWatcher_Healthy(t *testing.T) {
func TestCheckWatcher_HealthyWarning(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
check1.CheckRestart.Limit = 1
Expand Down Expand Up @@ -218,7 +219,7 @@ func TestCheckWatcher_HealthyWarning(t *testing.T) {
func TestCheckWatcher_Flapping(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
check1.CheckRestart.Grace = 0
Expand Down Expand Up @@ -247,7 +248,7 @@ func TestCheckWatcher_Flapping(t *testing.T) {
func TestCheckWatcher_Unwatch(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

// Unwatch immediately
check1 := testCheck()
Expand Down Expand Up @@ -276,7 +277,7 @@ func TestCheckWatcher_Unwatch(t *testing.T) {
func TestCheckWatcher_MultipleChecks(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
check1.CheckRestart.Limit = 1
Expand Down
26 changes: 20 additions & 6 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ const (
// defaultMaxRetryInterval is the default max retry interval.
defaultMaxRetryInterval = 30 * time.Second

// defaultPeriodicalInterval is the interval at which the service
// client reconciles state between the desired services and checks and
// what's actually registered in Consul. This is done at an interval,
// rather than being purely edge triggered, to handle the case that the
// Consul agent's state may change underneath us
defaultPeriodicInterval = 30 * time.Second

// ttlCheckBuffer is the time interval that Nomad can take to report Consul
// the check result
ttlCheckBuffer = 31 * time.Second
Expand Down Expand Up @@ -190,6 +197,7 @@ type ServiceClient struct {
logger *log.Logger
retryInterval time.Duration
maxRetryInterval time.Duration
periodicInterval time.Duration

// exitCh is closed when the main Run loop exits
exitCh chan struct{}
Expand Down Expand Up @@ -235,6 +243,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
Expand Down Expand Up @@ -279,7 +288,6 @@ func (c *ServiceClient) Run() {

// Process operations while waiting for initial contact with Consul but
// do not sync until contact has been made.
hasOps := false
INIT:
for {
select {
Expand All @@ -289,7 +297,6 @@ INIT:
case <-c.shutdownCh:
return
case ops := <-c.opCh:
hasOps = true
c.merge(ops)
}
}
Expand All @@ -299,11 +306,8 @@ INIT:
// Start checkWatcher
go c.checkWatcher.Run(ctx)

// Always immediately sync to reconcile Nomad and Consul's state
retryTimer := time.NewTimer(0)
if !hasOps {
// No pending operations so don't immediately sync
<-retryTimer.C
}

failures := 0
for {
Expand Down Expand Up @@ -345,6 +349,16 @@ INIT:
c.logger.Printf("[INFO] consul.sync: successfully updated services in Consul")
failures = 0
}

// Reset timer to periodic interval to periodically
// reconile with Consul
if !retryTimer.Stop() {
select {
case <-retryTimer.C:
default:
}
}
retryTimer.Reset(c.periodicInterval)
}

select {
Expand Down
11 changes: 6 additions & 5 deletions command/agent/consul/script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestConsulScript_Exec_Cancel(t *testing.T) {
exec := newBlockingScriptExec()

// pass nil for heartbeater as it shouldn't be called
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.Logger(t), nil)
handle := check.run()

// wait until Exec is called
Expand Down Expand Up @@ -111,7 +112,7 @@ func TestConsulScript_Exec_Timeout(t *testing.T) {
exec := newBlockingScriptExec()

hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
<-exec.running
Expand Down Expand Up @@ -160,7 +161,7 @@ func TestConsulScript_Exec_TimeoutCritical(t *testing.T) {
Timeout: time.Nanosecond,
}
hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.Logger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup

Expand Down Expand Up @@ -205,7 +206,7 @@ func TestConsulScript_Exec_Shutdown(t *testing.T) {
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(0, nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown)
handle := check.run()
defer handle.cancel() // just-in-case cleanup

Expand Down Expand Up @@ -242,7 +243,7 @@ func TestConsulScript_Exec_Codes(t *testing.T) {
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(code, err)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown)
handle := check.run()
defer handle.cancel()

Expand Down
Loading

0 comments on commit 4317ea7

Please sign in to comment.