Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consul: periodically reconcile services/checks #4170

Merged
merged 1 commit into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions command/agent/consul/catalog_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ type MockAgent struct {
// maps of what services and checks have been registered
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
mu sync.Mutex

// hits is the total number of times agent methods have been called
hits int

// mu guards above fields
mu sync.Mutex

// when UpdateTTL is called the check ID will have its counter inc'd
checkTTLs map[string]int
Expand All @@ -52,6 +57,13 @@ func NewMockAgent() *MockAgent {
}
}

// getHits reports the total number of Consul Agent API calls the mock has
// recorded so far. The counter is read while holding mu.
func (c *MockAgent) getHits() int {
	c.mu.Lock()
	n := c.hits
	c.mu.Unlock()
	return n
}

// SetStatus that Checks() should return. Returns old status value.
func (c *MockAgent) SetStatus(s string) string {
c.mu.Lock()
Expand All @@ -62,6 +74,10 @@ func (c *MockAgent) SetStatus(s string) string {
}

func (c *MockAgent) Self() (map[string]map[string]interface{}, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

s := map[string]map[string]interface{}{
"Member": {
"Addr": "127.0.0.1",
Expand All @@ -85,6 +101,7 @@ func (c *MockAgent) Self() (map[string]map[string]interface{}, error) {
func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

r := make(map[string]*api.AgentService, len(c.services))
for k, v := range c.services {
Expand All @@ -105,6 +122,7 @@ func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

r := make(map[string]*api.AgentCheck, len(c.checks))
for k, v := range c.checks {
Expand All @@ -125,7 +143,6 @@ func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
c.mu.Lock()
defer c.mu.Unlock()

regs := make([]*api.AgentCheckRegistration, 0, len(c.checks))
for _, check := range c.checks {
regs = append(regs, check)
Expand All @@ -136,6 +153,8 @@ func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

c.checks[check.ID] = check

// Be nice and make checks reachable-by-service
Expand All @@ -147,6 +166,7 @@ func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
func (c *MockAgent) CheckDeregister(checkID string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
delete(c.checks, checkID)
delete(c.checkTTLs, checkID)
return nil
Expand All @@ -155,20 +175,24 @@ func (c *MockAgent) CheckDeregister(checkID string) error {
// ServiceRegister stores the given registration in the mock's services map
// under its ID, replacing any earlier entry with the same ID, and counts the
// call as a hit. It always returns nil.
func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.hits++
	id := service.ID
	c.services[id] = service
	return nil
}

// ServiceDeregister drops the entry for serviceID from the mock's services
// map (a no-op when the ID is not present) and counts the call as a hit. It
// always returns nil.
func (c *MockAgent) ServiceDeregister(serviceID string) error {
	c.mu.Lock()
	c.hits++
	delete(c.services, serviceID)
	c.mu.Unlock()
	return nil
}

func (c *MockAgent) UpdateTTL(id string, output string, status string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

check, ok := c.checks[id]
if !ok {
return fmt.Errorf("unknown check id: %q", id)
Expand Down
17 changes: 9 additions & 8 deletions command/agent/consul/check_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -113,9 +114,9 @@ func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) {

// testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test
// logger and faster poll frequency.
func testWatcherSetup() (*fakeChecksAPI, *checkWatcher) {
func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) {
fakeAPI := newFakeChecksAPI()
cw := newCheckWatcher(testLogger(), fakeAPI)
cw := newCheckWatcher(testlog.Logger(t), fakeAPI)
cw.pollFreq = 10 * time.Millisecond
return fakeAPI, cw
}
Expand All @@ -141,7 +142,7 @@ func TestCheckWatcher_Skip(t *testing.T) {
check := testCheck()
check.CheckRestart = nil

cw := newCheckWatcher(testLogger(), newFakeChecksAPI())
cw := newCheckWatcher(testlog.Logger(t), newFakeChecksAPI())
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check)
cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1)

Expand All @@ -155,7 +156,7 @@ func TestCheckWatcher_Skip(t *testing.T) {
func TestCheckWatcher_Healthy(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
Expand Down Expand Up @@ -190,7 +191,7 @@ func TestCheckWatcher_Healthy(t *testing.T) {
func TestCheckWatcher_HealthyWarning(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
check1.CheckRestart.Limit = 1
Expand Down Expand Up @@ -218,7 +219,7 @@ func TestCheckWatcher_HealthyWarning(t *testing.T) {
func TestCheckWatcher_Flapping(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
check1.CheckRestart.Grace = 0
Expand Down Expand Up @@ -247,7 +248,7 @@ func TestCheckWatcher_Flapping(t *testing.T) {
func TestCheckWatcher_Unwatch(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

// Unwatch immediately
check1 := testCheck()
Expand Down Expand Up @@ -276,7 +277,7 @@ func TestCheckWatcher_Unwatch(t *testing.T) {
func TestCheckWatcher_MultipleChecks(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
check1.CheckRestart.Limit = 1
Expand Down
26 changes: 20 additions & 6 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ const (
// defaultMaxRetryInterval is the default max retry interval.
defaultMaxRetryInterval = 30 * time.Second

// defaultPeriodicInterval is the interval at which the service
// client reconciles state between the desired services and checks and
// what's actually registered in Consul. This is done at an interval,
// rather than being purely edge triggered, to handle the case that the
// Consul agent's state may change underneath us
defaultPeriodicInterval = 30 * time.Second

// ttlCheckBuffer is the time interval that Nomad can take to report Consul
// the check result
ttlCheckBuffer = 31 * time.Second
Expand Down Expand Up @@ -190,6 +197,7 @@ type ServiceClient struct {
logger *log.Logger
retryInterval time.Duration
maxRetryInterval time.Duration
periodicInterval time.Duration

// exitCh is closed when the main Run loop exits
exitCh chan struct{}
Expand Down Expand Up @@ -235,6 +243,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
Expand Down Expand Up @@ -279,7 +288,6 @@ func (c *ServiceClient) Run() {

// Process operations while waiting for initial contact with Consul but
// do not sync until contact has been made.
hasOps := false
INIT:
for {
select {
Expand All @@ -289,7 +297,6 @@ INIT:
case <-c.shutdownCh:
return
case ops := <-c.opCh:
hasOps = true
c.merge(ops)
}
}
Expand All @@ -299,11 +306,8 @@ INIT:
// Start checkWatcher
go c.checkWatcher.Run(ctx)

// Always immediately sync to reconcile Nomad and Consul's state
retryTimer := time.NewTimer(0)
if !hasOps {
// No pending operations so don't immediately sync
<-retryTimer.C
}

failures := 0
for {
Expand Down Expand Up @@ -345,6 +349,16 @@ INIT:
c.logger.Printf("[INFO] consul.sync: successfully updated services in Consul")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a DEBUG?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, github's default hiding makes it appear noisier than it really is. It only logs when failures > 0 which means it logs as an indication that Consul has recovered.

In a steady state it will not log. I kind of like it because otherwise you would only see Consul syncing failures in the logs and not a clear success message.

failures = 0
}

// Reset timer to periodic interval to periodically
// reconcile with Consul
if !retryTimer.Stop() {
select {
case <-retryTimer.C:
default:
}
}
retryTimer.Reset(c.periodicInterval)
}

select {
Expand Down
11 changes: 6 additions & 5 deletions command/agent/consul/script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestConsulScript_Exec_Cancel(t *testing.T) {
exec := newBlockingScriptExec()

// pass nil for heartbeater as it shouldn't be called
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.Logger(t), nil)
handle := check.run()

// wait until Exec is called
Expand Down Expand Up @@ -111,7 +112,7 @@ func TestConsulScript_Exec_Timeout(t *testing.T) {
exec := newBlockingScriptExec()

hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
<-exec.running
Expand Down Expand Up @@ -160,7 +161,7 @@ func TestConsulScript_Exec_TimeoutCritical(t *testing.T) {
Timeout: time.Nanosecond,
}
hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.Logger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup

Expand Down Expand Up @@ -205,7 +206,7 @@ func TestConsulScript_Exec_Shutdown(t *testing.T) {
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(0, nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown)
handle := check.run()
defer handle.cancel() // just-in-case cleanup

Expand Down Expand Up @@ -242,7 +243,7 @@ func TestConsulScript_Exec_Codes(t *testing.T) {
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(code, err)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown)
handle := check.run()
defer handle.cancel()

Expand Down
Loading