From 972e861410831aa1b9a7eab124b584d2d6c7fddd Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 17 Apr 2018 12:36:50 -0700 Subject: [PATCH] consul: periodically reconcile services/checks Periodically sync services and checks from Nomad to Consul. This is mostly useful when testing with the Consul dev agent which does not persist state across restarts. However, this is a reasonable safety measure to prevent skew between Consul's state and Nomad's services+checks. Also modernized the test suite a bit. --- command/agent/consul/catalog_testing.go | 28 ++++++++- command/agent/consul/check_watcher_test.go | 17 ++--- command/agent/consul/client.go | 26 ++++++-- command/agent/consul/script_test.go | 11 ++-- command/agent/consul/unit_test.go | 73 ++++++++++++++-------- 5 files changed, 109 insertions(+), 46 deletions(-) diff --git a/command/agent/consul/catalog_testing.go b/command/agent/consul/catalog_testing.go index 29abb5d90a2..7cd2b833f81 100644 --- a/command/agent/consul/catalog_testing.go +++ b/command/agent/consul/catalog_testing.go @@ -33,7 +33,12 @@ type MockAgent struct { // maps of what services and checks have been registered services map[string]*api.AgentServiceRegistration checks map[string]*api.AgentCheckRegistration - mu sync.Mutex + + // hits is the total number of times agent methods have been called + hits int + + // mu guards above fields + mu sync.Mutex // when UpdateTTL is called the check ID will have its counter inc'd checkTTLs map[string]int @@ -52,6 +57,13 @@ func NewMockAgent() *MockAgent { } } +// getHits returns how many Consul Agent API calls have been made. +func (c *MockAgent) getHits() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.hits +} + // SetStatus that Checks() should return. Returns old status value. func (c *MockAgent) SetStatus(s string) string { c.mu.Lock() @@ -62,6 +74,10 @@ func (c *MockAgent) SetStatus(s string) string { } func (c *MockAgent) Self() (map[string]map[string]interface{}, error) { + c.mu.Lock() + defer c.mu.Unlock() + c.hits++ + s := map[string]map[string]interface{}{ "Member": { "Addr": "127.0.0.1", @@ -85,6 +101,7 @@ func (c *MockAgent) Self() (map[string]map[string]interface{}, error) { func (c *MockAgent) Services() (map[string]*api.AgentService, error) { c.mu.Lock() defer c.mu.Unlock() + c.hits++ r := make(map[string]*api.AgentService, len(c.services)) for k, v := range c.services { @@ -105,6 +122,7 @@ func (c *MockAgent) Services() (map[string]*api.AgentService, error) { func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) { c.mu.Lock() defer c.mu.Unlock() + c.hits++ r := make(map[string]*api.AgentCheck, len(c.checks)) for k, v := range c.checks { @@ -125,7 +143,6 @@ func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) { func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration { c.mu.Lock() defer c.mu.Unlock() - regs := make([]*api.AgentCheckRegistration, 0, len(c.checks)) for _, check := range c.checks { regs = append(regs, check) @@ -136,6 +153,8 @@ func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration { func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error { c.mu.Lock() defer c.mu.Unlock() + c.hits++ + c.checks[check.ID] = check // Be nice and make checks reachable-by-service @@ -147,6 +166,7 @@ func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error { func (c *MockAgent) CheckDeregister(checkID string) error { c.mu.Lock() defer c.mu.Unlock() + c.hits++ delete(c.checks, checkID) delete(c.checkTTLs, checkID) return nil @@ -155,6 +175,7 @@ func (c *MockAgent) CheckDeregister(checkID string) error { func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error { c.mu.Lock() defer c.mu.Unlock() + c.hits++ c.services[service.ID] = service return nil } @@ -162,6 +183,7 @@ func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error func (c *MockAgent) ServiceDeregister(serviceID string) error { c.mu.Lock() defer c.mu.Unlock() + c.hits++ delete(c.services, serviceID) return nil } @@ -169,6 +191,8 @@ func (c *MockAgent) ServiceDeregister(serviceID string) error { func (c *MockAgent) UpdateTTL(id string, output string, status string) error { c.mu.Lock() defer c.mu.Unlock() + c.hits++ + check, ok := c.checks[id] if !ok { return fmt.Errorf("unknown check id: %q", id) diff --git a/command/agent/consul/check_watcher_test.go b/command/agent/consul/check_watcher_test.go index ccc06b5e6fa..7e0baf9e75c 100644 --- a/command/agent/consul/check_watcher_test.go +++ b/command/agent/consul/check_watcher_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -113,9 +114,9 @@ func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) { // testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test // logger and faster poll frequency. -func testWatcherSetup() (*fakeChecksAPI, *checkWatcher) { +func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) { fakeAPI := newFakeChecksAPI() - cw := newCheckWatcher(testLogger(), fakeAPI) + cw := newCheckWatcher(testlog.Logger(t), fakeAPI) cw.pollFreq = 10 * time.Millisecond return fakeAPI, cw } @@ -141,7 +142,7 @@ func TestCheckWatcher_Skip(t *testing.T) { check := testCheck() check.CheckRestart = nil - cw := newCheckWatcher(testLogger(), newFakeChecksAPI()) + cw := newCheckWatcher(testlog.Logger(t), newFakeChecksAPI()) restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check) cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1) @@ -155,7 +156,7 @@ func TestCheckWatcher_Skip(t *testing.T) { func TestCheckWatcher_Healthy(t *testing.T) { t.Parallel() - fakeAPI, cw := testWatcherSetup() + fakeAPI, cw := testWatcherSetup(t) check1 := testCheck() restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) @@ -190,7 +191,7 @@ func TestCheckWatcher_Healthy(t *testing.T) { func TestCheckWatcher_HealthyWarning(t *testing.T) { t.Parallel() - fakeAPI, cw := testWatcherSetup() + fakeAPI, cw := testWatcherSetup(t) check1 := testCheck() check1.CheckRestart.Limit = 1 @@ -218,7 +219,7 @@ func TestCheckWatcher_HealthyWarning(t *testing.T) { func TestCheckWatcher_Flapping(t *testing.T) { t.Parallel() - fakeAPI, cw := testWatcherSetup() + fakeAPI, cw := testWatcherSetup(t) check1 := testCheck() check1.CheckRestart.Grace = 0 @@ -247,7 +248,7 @@ func TestCheckWatcher_Flapping(t *testing.T) { func TestCheckWatcher_Unwatch(t *testing.T) { t.Parallel() - fakeAPI, cw := testWatcherSetup() + fakeAPI, cw := testWatcherSetup(t) // Unwatch immediately check1 := testCheck() @@ -276,7 +277,7 @@ func TestCheckWatcher_Unwatch(t *testing.T) { func TestCheckWatcher_MultipleChecks(t *testing.T) { t.Parallel() - fakeAPI, cw := testWatcherSetup() + fakeAPI, cw := testWatcherSetup(t) check1 := testCheck() check1.CheckRestart.Limit = 1 diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 13f44c5a5ed..cc0a26d411e 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -36,6 +36,13 @@ const ( // defaultMaxRetryInterval is the default max retry interval. defaultMaxRetryInterval = 30 * time.Second + // defaultPeriodicalInterval is the interval at which the service + // client reconciles state between the desired services and checks and + // what's actually registered in Consul. This is done at an interval, + // rather than being purely edge triggered, to handle the case that the + // Consul agent's state may change underneath us + defaultPeriodicInterval = 30 * time.Second + // ttlCheckBuffer is the time interval that Nomad can take to report Consul // the check result ttlCheckBuffer = 31 * time.Second @@ -190,6 +197,7 @@ type ServiceClient struct { logger *log.Logger retryInterval time.Duration maxRetryInterval time.Duration + periodicInterval time.Duration // exitCh is closed when the main Run loop exits exitCh chan struct{} @@ -235,6 +243,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient logger: logger, retryInterval: defaultRetryInterval, maxRetryInterval: defaultMaxRetryInterval, + periodicInterval: defaultPeriodicInterval, exitCh: make(chan struct{}), shutdownCh: make(chan struct{}), shutdownWait: defaultShutdownWait, @@ -279,7 +288,6 @@ func (c *ServiceClient) Run() { // Process operations while waiting for initial contact with Consul but // do not sync until contact has been made. - hasOps := false INIT: for { select { @@ -289,7 +297,6 @@ INIT: case <-c.shutdownCh: return case ops := <-c.opCh: - hasOps = true c.merge(ops) } } @@ -299,11 +306,8 @@ INIT: // Start checkWatcher go c.checkWatcher.Run(ctx) + // Always immediately sync to reconcile Nomad and Consul's state retryTimer := time.NewTimer(0) - if !hasOps { - // No pending operations so don't immediately sync - <-retryTimer.C - } failures := 0 for { @@ -345,6 +349,16 @@ INIT: c.logger.Printf("[INFO] consul.sync: successfully updated services in Consul") failures = 0 } + + // Reset timer to periodic interval to periodically + // reconile with Consul + if !retryTimer.Stop() { + select { + case <-retryTimer.C: + default: + } + } + retryTimer.Reset(c.periodicInterval) } select { diff --git a/command/agent/consul/script_test.go b/command/agent/consul/script_test.go index 46f90a8ca4a..3d4331ef09b 100644 --- a/command/agent/consul/script_test.go +++ b/command/agent/consul/script_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/testtask" "github.com/hashicorp/nomad/nomad/structs" ) @@ -59,7 +60,7 @@ func TestConsulScript_Exec_Cancel(t *testing.T) { exec := newBlockingScriptExec() // pass nil for heartbeater as it shouldn't be called - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testLogger(), nil) + check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.Logger(t), nil) handle := check.run() // wait until Exec is called @@ -111,7 +112,7 @@ func TestConsulScript_Exec_Timeout(t *testing.T) { exec := newBlockingScriptExec() hb := newFakeHeartbeater() - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), nil) + check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), nil) handle := check.run() defer handle.cancel() // just-in-case cleanup <-exec.running @@ -160,7 +161,7 @@ func TestConsulScript_Exec_TimeoutCritical(t *testing.T) { Timeout: time.Nanosecond, } hb := newFakeHeartbeater() - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testLogger(), nil) + check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.Logger(t), nil) handle := check.run() defer handle.cancel() // just-in-case cleanup @@ -205,7 +206,7 @@ func TestConsulScript_Exec_Shutdown(t *testing.T) { hb := newFakeHeartbeater() shutdown := make(chan struct{}) exec := newSimpleExec(0, nil) - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown) + check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown) handle := check.run() defer handle.cancel() // just-in-case cleanup @@ -242,7 +243,7 @@ func TestConsulScript_Exec_Codes(t *testing.T) { hb := newFakeHeartbeater() shutdown := make(chan struct{}) exec := newSimpleExec(code, err) - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown) + check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown) handle := check.run() defer handle.cancel() diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 91ec3a2b3d0..9f72ee2ea21 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -3,9 +3,6 @@ package consul import ( "context" "fmt" - "io/ioutil" - "log" - "os" "reflect" "strings" "sync/atomic" @@ -14,6 +11,7 @@ import ( "github.com/hashicorp/consul/api" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" @@ -27,13 +25,6 @@ const ( yPort = 1235 ) -func testLogger() *log.Logger { - if testing.Verbose() { - return log.New(os.Stderr, "", log.LstdFlags) - } - return log.New(ioutil.Discard, "", 0) -} - func testTask() *structs.Task { return &structs.Task{ Name: "taskname", @@ -112,10 +103,10 @@ func (t *testFakeCtx) syncOnce() error { // setupFake creates a testFakeCtx with a ServiceClient backed by a fakeConsul. // A test Task is also provided. -func setupFake() *testFakeCtx { +func setupFake(t *testing.T) *testFakeCtx { fc := NewMockAgent() return &testFakeCtx{ - ServiceClient: NewServiceClient(fc, testLogger()), + ServiceClient: NewServiceClient(fc, testlog.Logger(t)), FakeConsul: fc, Task: testTask(), Restarter: &restartRecorder{}, @@ -124,7 +115,7 @@ func setupFake() *testFakeCtx { } func TestConsul_ChangeTags(t *testing.T) { - ctx := setupFake() + ctx := setupFake(t) allocID := "allocid" if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, ctx.Restarter, nil, nil); err != nil { @@ -225,7 +216,7 @@ func TestConsul_ChangeTags(t *testing.T) { // it in Consul. Pre-0.7.1 ports were not part of the service ID and this was a // slightly different code path than changing tags. func TestConsul_ChangePorts(t *testing.T) { - ctx := setupFake() + ctx := setupFake(t) ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "c1", @@ -406,7 +397,7 @@ func TestConsul_ChangePorts(t *testing.T) { // TestConsul_ChangeChecks asserts that updating only the checks on a service // properly syncs with Consul. func TestConsul_ChangeChecks(t *testing.T) { - ctx := setupFake() + ctx := setupFake(t) ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "c1", @@ -641,7 +632,7 @@ func TestConsul_ChangeChecks(t *testing.T) { // TestConsul_RegServices tests basic service registration. func TestConsul_RegServices(t *testing.T) { - ctx := setupFake() + ctx := setupFake(t) // Add a check w/restarting ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ @@ -778,7 +769,7 @@ func TestConsul_RegServices(t *testing.T) { // ServiceClient. func TestConsul_ShutdownOK(t *testing.T) { require := require.New(t) - ctx := setupFake() + ctx := setupFake(t) // Add a script check to make sure its TTL gets updated ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ @@ -840,8 +831,8 @@ func TestConsul_ShutdownOK(t *testing.T) { // TestConsul_ShutdownSlow tests the slow but ok path for the shutdown logic in // ServiceClient. func TestConsul_ShutdownSlow(t *testing.T) { - t.Parallel() // run the slow tests in parallel - ctx := setupFake() + t.Parallel() + ctx := setupFake(t) // Add a script check to make sure its TTL gets updated ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ @@ -912,8 +903,8 @@ func TestConsul_ShutdownSlow(t *testing.T) { // TestConsul_ShutdownBlocked tests the blocked past deadline path for the // shutdown logic in ServiceClient. func TestConsul_ShutdownBlocked(t *testing.T) { - t.Parallel() // run the slow tests in parallel - ctx := setupFake() + t.Parallel() + ctx := setupFake(t) // Add a script check to make sure its TTL gets updated ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ @@ -981,7 +972,7 @@ func TestConsul_ShutdownBlocked(t *testing.T) { // TestConsul_RemoveScript assert removing a script check removes all objects // related to that check. func TestConsul_CancelScript(t *testing.T) { - ctx := setupFake() + ctx := setupFake(t) ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "scriptcheckDel", @@ -1069,7 +1060,8 @@ func TestConsul_CancelScript(t *testing.T) { // auto-use set then services should advertise it unless explicitly set to // host. Checks should always use host. func TestConsul_DriverNetwork_AutoUse(t *testing.T) { - ctx := setupFake() + t.Parallel() + ctx := setupFake(t) ctx.Task.Services = []*structs.Service{ { @@ -1195,7 +1187,8 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) { // set auto-use only services which request the driver's network should // advertise it. func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { - ctx := setupFake() + t.Parallel() + ctx := setupFake(t) ctx.Task.Services = []*structs.Service{ { @@ -1268,7 +1261,8 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { // TestConsul_DriverNetwork_Change asserts that if a driver network is // specified and a service updates its use its properly updated in Consul. func TestConsul_DriverNetwork_Change(t *testing.T) { - ctx := setupFake() + t.Parallel() + ctx := setupFake(t) ctx.Task.Services = []*structs.Service{ { @@ -1337,9 +1331,38 @@ func TestConsul_DriverNetwork_Change(t *testing.T) { syncAndAssertPort(net.PortMap["x"]) } +// TestConsul_PeriodicSync asserts that Nomad periodically reconciles with +// Consul. +func TestConsul_PeriodicSync(t *testing.T) { + t.Parallel() + + ctx := setupFake(t) + defer ctx.ServiceClient.Shutdown() + + // Lower periodic sync interval to speed up test + ctx.ServiceClient.periodicInterval = 2 * time.Millisecond + + // Run for 10ms and assert hits >= 5 because each sync() calls multiple + // Consul APIs + go ctx.ServiceClient.Run() + + select { + case <-ctx.ServiceClient.exitCh: + t.Fatalf("exited unexpectedly") + case <-time.After(10 * time.Millisecond): + } + + minHits := 5 + if hits := ctx.FakeConsul.getHits(); hits < minHits { + t.Fatalf("expected at least %d hits but found %d", minHits, hits) + } +} + // TestIsNomadService asserts the isNomadService helper returns true for Nomad // task IDs and false for unknown IDs and Nomad agent IDs (see #2827). func TestIsNomadService(t *testing.T) { + t.Parallel() + tests := []struct { id string result bool