diff --git a/client/client.go b/client/client.go index e27d2d62caf..785936db98e 100644 --- a/client/client.go +++ b/client/client.go @@ -229,7 +229,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic } fingerprintManager := NewFingerprintManager(c.GetConfig, c.config.Node, - c.shutdownCh, c.updateNodeFromFingerprint, c.logger) + c.shutdownCh, c.updateNodeFromFingerprint, c.updateNodeFromHealthCheck, c.logger) // Fingerprint the node and scan for drivers if err := fingerprintManager.Run(); err != nil { @@ -856,6 +856,9 @@ func (c *Client) setupNode() error { if node.Links == nil { node.Links = make(map[string]string) } + if node.Drivers == nil { + node.Drivers = make(map[string]*structs.DriverInfo) + } if node.Meta == nil { node.Meta = make(map[string]string) } @@ -948,6 +951,19 @@ func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintRespons if response.Resources != nil { c.config.Node.Resources.Merge(response.Resources) } + + return c.config.Node +} + +func (c *Client) updateNodeFromHealthCheck(response *cstructs.HealthCheckResponse) *structs.Node { + c.configLock.Lock() + defer c.configLock.Unlock() + + // update the node with the latest driver health information + for name, val := range response.Drivers { + c.config.Node.Drivers[name] = val + } + return c.config.Node } diff --git a/client/driver/docker.go b/client/driver/docker.go index 9851fa40a2e..be7b5647cc4 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -492,7 +492,6 @@ func (d *DockerDriver) Fingerprint(req *cstructs.FingerprintRequest, resp *cstru d.logger.Printf("[INFO] driver.docker: failed to initialize client: %s", err) } d.fingerprintSuccess = helper.BoolToPtr(false) - resp.RemoveAttribute(dockerDriverAttr) return nil } @@ -552,6 +551,33 @@ func (d *DockerDriver) Fingerprint(req *cstructs.FingerprintRequest, resp *cstru return nil } +func (d *DockerDriver) Check(req *cstructs.HealthCheckRequest, resp *cstructs.HealthCheckResponse) error { + unhealthy := &structs.DriverInfo{ + HealthDescription: "Docker driver is available but unresponsive", + UpdateTime: time.Now(), + } + + _, err := client.ListContainers(docker.ListContainersOptions{}) + if err != nil { + d.logger.Printf("[WARN] driver.docker: docker driver is available but is unresponsive to `docker ps`") + resp.AddDriverInfo("driver.docker", unhealthy) + return err + } + + d.logger.Printf("[DEBUG] driver.docker: docker driver is available and is responsive to `docker ps`") + healthy := &structs.DriverInfo{ + Healthy: true, + HealthDescription: "Docker driver is available and responsive", + UpdateTime: time.Now(), + } + resp.AddDriverInfo("driver.docker", healthy) + return nil +} + +func (d *DockerDriver) CheckHealthPeriodic() (bool, time.Duration) { + return true, 1 * time.Minute +} + // Validate is used to validate the driver configuration func (d *DockerDriver) Validate(config map[string]interface{}) error { fd := &fields.FieldData{ diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index d8abea032fb..80be88e4a40 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -21,6 +21,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/env" + "github.com/hashicorp/nomad/client/fingerprint" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/helper/uuid" @@ -164,6 +165,7 @@ func TestDockerDriver_Fingerprint(t *testing.T) { if !tu.IsTravis() { t.Parallel() } + ctx := testDockerDriverContexts(t, &structs.Task{Name: "foo", Driver: "docker", Resources: basicResources}) //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} defer ctx.AllocDir.Destroy() @@ -227,6 +229,7 @@ func TestDockerDriver_Fingerprint_Bridge(t *testing.T) { request := &cstructs.FingerprintRequest{Config: conf, Node: conf.Node} var response cstructs.FingerprintResponse + err = dd.Fingerprint(request, &response) if err != nil { t.Fatalf("error fingerprinting docker: %v", err) @@ -251,6 +254,46 @@ func TestDockerDriver_Fingerprint_Bridge(t *testing.T) { t.Logf("docker bridge ip: %q", attributes["driver.docker.bridge_ip"]) } +func TestDockerDriver_Check_DockerHealthStatus(t *testing.T) { + if !tu.IsTravis() { + t.Parallel() + } + if !testutil.DockerIsConnected(t) { + t.Skip("requires Docker") + } + if runtime.GOOS != "linux" { + t.Skip("expect only on linux") + } + + require := require.New(t) + + // This seems fragile, so we might need to reconsider this test if it + // proves flaky + expectedAddr, err := sockaddr.GetInterfaceIP("docker0") + if err != nil { + t.Fatalf("unable to get ip for docker0: %v", err) + } + if expectedAddr == "" { + t.Fatalf("unable to get ip for docker bridge") + } + + conf := testConfig(t) + conf.Node = mock.Node() + dd := NewDockerDriver(NewDriverContext("", "", conf, conf.Node, testLogger(), nil)) + + request := &cstructs.HealthCheckRequest{} + var response cstructs.HealthCheckResponse + + dc, ok := dd.(fingerprint.HealthCheck) + require.True(ok) + err = dc.Check(request, &response) + require.Nil(err) + + driverInfo := response.Drivers["driver.docker"] + require.NotNil(driverInfo) + require.True(driverInfo.Healthy) +} + func TestDockerDriver_StartOpen_Wait(t *testing.T) { if !tu.IsTravis() { t.Parallel() diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index 15cc56b5b41..03811ef3dd4 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -30,6 +30,8 @@ const ( // to "stop" a previously functioning driver after the specified duration // (specified in seconds) for testing of periodic drivers and fingerprinters. ShutdownPeriodicDuration = "test.shutdown_periodic_duration" + + mockDriverName = "driver.mock_driver" ) // Add the mock driver to the list of builtin drivers @@ -225,14 +227,43 @@ func (m *MockDriver) Fingerprint(req *cstructs.FingerprintRequest, resp *cstruct // current time is after the time which the node should shut down, simulate // driver failure case !m.shutdownFingerprintTime.IsZero() && time.Now().After(m.shutdownFingerprintTime): - resp.RemoveAttribute("driver.mock_driver") + resp.RemoveAttribute(mockDriverName) default: - resp.AddAttribute("driver.mock_driver", "1") + resp.AddAttribute(mockDriverName, "1") resp.Detected = true } return nil } +// Check implements the interface for HealthCheck, and indicates the current +// health status of the mock driver. +func (m *MockDriver) Check(req *cstructs.HealthCheckRequest, resp *cstructs.HealthCheckResponse) error { + if !m.shutdownFingerprintTime.IsZero() && time.Now().After(m.shutdownFingerprintTime) { + notHealthy := &structs.DriverInfo{ + Healthy: false, + HealthDescription: "not running", + UpdateTime: time.Now(), + } + resp.AddDriverInfo(mockDriverName, notHealthy) + return nil + } + healthy := &structs.DriverInfo{ + Healthy: true, + HealthDescription: "running", + UpdateTime: time.Now(), + } + resp.AddDriverInfo(mockDriverName, healthy) + return nil +} + +// CheckHealthPeriodic implements the interface for HealthCheck and indicates +// that mock driver should be checked periodically. Returns a boolean +// indicating if ti should be checked, and the duration at which to do this +// check. +func (m *MockDriver) CheckHealthPeriodic() (bool, time.Duration) { + return true, 1 * time.Second +} + // MockDriverHandle is a driver handler which supervises a mock task type mockDriverHandle struct { taskName string diff --git a/client/fingerprint/fingerprint.go b/client/fingerprint/fingerprint.go index 8a3477f5174..322788462a6 100644 --- a/client/fingerprint/fingerprint.go +++ b/client/fingerprint/fingerprint.go @@ -85,6 +85,21 @@ func NewFingerprint(name string, logger *log.Logger) (Fingerprint, error) { // Factory is used to instantiate a new Fingerprint type Factory func(*log.Logger) Fingerprint +// HealthCheck is used for doing periodic health checks. On a given time +// interfal, a health check will be called by the fingerprint manager of the +// node. +type HealthCheck interface { + // Check is used to update properties of the node on the status of the health + // check + Check(*cstructs.HealthCheckRequest, *cstructs.HealthCheckResponse) error + + // CheckHealthPeriodic is a mechanism for the health checker to indicate that + // it should be run periodically. The return value is a boolean indicating + // whether it should be done periodically, and the time interval at which + // this check should happen. + CheckHealthPeriodic() (bool, time.Duration) +} + // Fingerprint is used for doing "fingerprinting" of the // host to automatically determine attributes, resources, // and metadata about it. Each of these is a heuristic, and diff --git a/client/fingerprint_manager.go b/client/fingerprint_manager.go index a191555aaad..fcec58050ae 100644 --- a/client/fingerprint_manager.go +++ b/client/fingerprint_manager.go @@ -22,8 +22,9 @@ type FingerprintManager struct { // updateNode is a callback to the client to update the state of its // associated node - updateNode func(*cstructs.FingerprintResponse) *structs.Node - logger *log.Logger + updateNode func(*cstructs.FingerprintResponse) *structs.Node + updateHealthCheck func(*cstructs.HealthCheckResponse) *structs.Node + logger *log.Logger } // NewFingerprintManager is a constructor that creates and returns an instance @@ -32,18 +33,20 @@ func NewFingerprintManager(getConfig func() *config.Config, node *structs.Node, shutdownCh chan struct{}, updateNode func(*cstructs.FingerprintResponse) *structs.Node, + updateHealthCheck func(*cstructs.HealthCheckResponse) *structs.Node, logger *log.Logger) *FingerprintManager { return &FingerprintManager{ - getConfig: getConfig, - updateNode: updateNode, - node: node, - shutdownCh: shutdownCh, - logger: logger, + getConfig: getConfig, + updateNode: updateNode, + updateHealthCheck: updateHealthCheck, + node: node, + shutdownCh: shutdownCh, + logger: logger, } } -// run runs each fingerprinter individually on an ongoing basis -func (fm *FingerprintManager) run(f fingerprint.Fingerprint, period time.Duration, name string) { +// runFingerprint runs each fingerprinter individually on an ongoing basis +func (fm *FingerprintManager) runFingerprint(f fingerprint.Fingerprint, period time.Duration, name string) { fm.logger.Printf("[DEBUG] client.fingerprint_manager: fingerprinting %s every %v", name, period) for { @@ -61,6 +64,25 @@ func (fm *FingerprintManager) run(f fingerprint.Fingerprint, period time.Duratio } } +// runHealthCheck runs each health check individually on an ongoing basis +func (fm *FingerprintManager) runHealthCheck(hc fingerprint.HealthCheck, period time.Duration, name string) { + fm.logger.Printf("[DEBUG] client.fingerprint_manager: healthchecking %s every %v", name, period) + + for { + select { + case <-time.After(period): + err := fm.healthCheck(name, hc) + if err != nil { + fm.logger.Printf("[DEBUG] client.fingerprint_manager: health checking for %v failed: %+v", name, err) + continue + } + + case <-fm.shutdownCh: + return + } + } +} + // setupDrivers is used to fingerprint the node to see if these drivers are // supported func (fm *FingerprintManager) setupDrivers(drivers []string) error { @@ -86,7 +108,13 @@ func (fm *FingerprintManager) setupDrivers(drivers []string) error { p, period := d.Periodic() if p { - go fm.run(d, period, name) + go fm.runFingerprint(d, period, name) + } + + if hc, ok := d.(fingerprint.HealthCheck); ok { + if checkPeriodic, interval := hc.CheckHealthPeriodic(); checkPeriodic { + go fm.runHealthCheck(hc, interval, name) + } } } @@ -113,6 +141,23 @@ func (fm *FingerprintManager) fingerprint(name string, f fingerprint.Fingerprint return response.Detected, nil } +// healthcheck checks the health of the specified resource. +func (fm *FingerprintManager) healthCheck(name string, hc fingerprint.HealthCheck) error { + request := &cstructs.HealthCheckRequest{} + var response cstructs.HealthCheckResponse + if err := hc.Check(request, &response); err != nil { + return err + } + + fm.nodeLock.Lock() + if node := fm.updateHealthCheck(&response); node != nil { + fm.node = node + } + fm.nodeLock.Unlock() + + return nil +} + // setupFingerprints is used to fingerprint the node to see if these attributes are // supported func (fm *FingerprintManager) setupFingerprinters(fingerprints []string) error { @@ -138,7 +183,13 @@ func (fm *FingerprintManager) setupFingerprinters(fingerprints []string) error { p, period := f.Periodic() if p { - go fm.run(f, period, name) + go fm.runFingerprint(f, period, name) + } + + if hc, ok := f.(fingerprint.HealthCheck); ok { + if checkPeriodic, interval := hc.CheckHealthPeriodic(); checkPeriodic { + go fm.runHealthCheck(hc, interval, name) + } } } diff --git a/client/fingerprint_manager_test.go b/client/fingerprint_manager_test.go index fe37966e3e3..268fb21099e 100644 --- a/client/fingerprint_manager_test.go +++ b/client/fingerprint_manager_test.go @@ -27,6 +27,10 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) { } return node } + + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + return node + } conf := config.DefaultConfig() getConfig := func() *config.Config { return conf @@ -37,6 +41,7 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) { node, make(chan struct{}), updateNode, + updateHealthCheck, testLogger(), ) @@ -65,12 +70,16 @@ func TestFingerprintManager_Fingerprint_Run(t *testing.T) { } return node } + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + return node + } fm := NewFingerprintManager( getConfig, node, make(chan struct{}), updateNode, + updateHealthCheck, testLogger(), ) @@ -93,6 +102,9 @@ func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) { } return node } + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + return node + } conf := config.DefaultConfig() conf.Options = map[string]string{ "test.shutdown_periodic_after": "true", @@ -112,13 +124,14 @@ func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) { node, shutdownCh, updateNode, + updateHealthCheck, testLogger(), ) err := fm.Run() require.Nil(err) - // Ensure the mock driver is registered on the client + // Ensure the mock driver is registered and healthy on the client testutil.WaitForResult(func() (bool, error) { mockDriverStatus := node.Attributes["driver.mock_driver"] if mockDriverStatus == "" { @@ -129,7 +142,8 @@ func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) { t.Fatalf("err: %v", err) }) - // Ensure that the client fingerprinter eventually removes this attribute + // Ensure that the client fingerprinter eventually removes this attribute and + // marks the driver as unhealthy testutil.WaitForResult(func() (bool, error) { mockDriverStatus := node.Attributes["driver.mock_driver"] if mockDriverStatus != "" { @@ -141,6 +155,78 @@ func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) { }) } +func TestFingerprintManager_HealthCheck_Periodic(t *testing.T) { + t.Parallel() + require := require.New(t) + + node := &structs.Node{ + Drivers: make(map[string]*structs.DriverInfo, 0), + } + updateNode := func(r *cstructs.FingerprintResponse) *structs.Node { + return node + } + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + for k, v := range resp.Drivers { + node.Drivers[k] = v + } + return node + } + conf := config.DefaultConfig() + conf.Options = map[string]string{ + "test.shutdown_periodic_after": "true", + "test.shutdown_periodic_duration": "2", + } + getConfig := func() *config.Config { + return conf + } + + shutdownCh := make(chan struct{}) + defer (func() { + close(shutdownCh) + })() + + fm := NewFingerprintManager( + getConfig, + node, + shutdownCh, + updateNode, + updateHealthCheck, + testLogger(), + ) + + err := fm.Run() + require.Nil(err) + + // Ensure the mock driver is registered and healthy on the client + testutil.WaitForResult(func() (bool, error) { + mockDriverInfo := node.Drivers["driver.mock_driver"] + if mockDriverInfo == nil { + return false, fmt.Errorf("mock driver info should be set on the client") + } + if !mockDriverInfo.Healthy { + return false, fmt.Errorf("mock driver info should be healthy") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Ensure that the client health check eventually removes this attribute and + // marks the driver as unhealthy + testutil.WaitForResult(func() (bool, error) { + mockDriverInfo := node.Drivers["driver.mock_driver"] + if mockDriverInfo == nil { + return false, fmt.Errorf("mock driver info should be set on the client") + } + if mockDriverInfo.Healthy { + return false, fmt.Errorf("mock driver info should not be healthy") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + func TestFimgerprintManager_Run_InWhitelist(t *testing.T) { t.Parallel() require := require.New(t) @@ -154,6 +240,9 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) { } return node } + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + return node + } conf := config.DefaultConfig() conf.Options = map[string]string{"fingerprint.whitelist": " arch,cpu,memory,network,storage,foo,bar "} getConfig := func() *config.Config { @@ -170,6 +259,7 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) { node, shutdownCh, updateNode, + updateHealthCheck, testLogger(), ) @@ -191,6 +281,9 @@ func TestFimgerprintManager_Run_InBlacklist(t *testing.T) { } return node } + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + return node + } conf := config.DefaultConfig() conf.Options = map[string]string{"fingerprint.whitelist": " arch,memory,foo,bar "} conf.Options = map[string]string{"fingerprint.blacklist": " cpu "} @@ -208,6 +301,7 @@ func TestFimgerprintManager_Run_InBlacklist(t *testing.T) { node, shutdownCh, updateNode, + updateHealthCheck, testLogger(), ) @@ -230,6 +324,9 @@ func TestFimgerprintManager_Run_Combination(t *testing.T) { } return node } + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + return node + } conf := config.DefaultConfig() conf.Options = map[string]string{"fingerprint.whitelist": " arch,cpu,memory,foo,bar "} conf.Options = map[string]string{"fingerprint.blacklist": " memory,nomad "} @@ -247,6 +344,7 @@ func TestFimgerprintManager_Run_Combination(t *testing.T) { node, shutdownCh, updateNode, + updateHealthCheck, testLogger(), ) @@ -271,6 +369,9 @@ func TestFimgerprintManager_Run_WhitelistDrivers(t *testing.T) { } return node } + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + return node + } conf := config.DefaultConfig() conf.Options = map[string]string{ "driver.raw_exec.enable": "1", @@ -290,6 +391,7 @@ func TestFimgerprintManager_Run_WhitelistDrivers(t *testing.T) { node, shutdownCh, updateNode, + updateHealthCheck, testLogger(), ) @@ -311,6 +413,9 @@ func TestFimgerprintManager_Run_AllDriversBlacklisted(t *testing.T) { } return node } + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + return node + } conf := config.DefaultConfig() conf.Options = map[string]string{ "driver.whitelist": " foo,bar,baz ", @@ -329,6 +434,7 @@ func TestFimgerprintManager_Run_AllDriversBlacklisted(t *testing.T) { node, shutdownCh, updateNode, + updateHealthCheck, testLogger(), ) @@ -352,6 +458,9 @@ func TestFimgerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing. } return node } + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + return node + } conf := config.DefaultConfig() conf.Options = map[string]string{ "driver.raw_exec.enable": "1", @@ -372,6 +481,7 @@ func TestFimgerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing. node, shutdownCh, updateNode, + updateHealthCheck, testLogger(), ) @@ -397,6 +507,9 @@ func TestFimgerprintManager_Run_DriversInBlacklist(t *testing.T) { } return node } + updateHealthCheck := func(resp *cstructs.HealthCheckResponse) *structs.Node { + return node + } conf := config.DefaultConfig() conf.Options = map[string]string{ "driver.raw_exec.enable": "1", @@ -417,6 +530,7 @@ func TestFimgerprintManager_Run_DriversInBlacklist(t *testing.T) { node, shutdownCh, updateNode, + updateHealthCheck, testLogger(), ) diff --git a/client/structs/structs.go b/client/structs/structs.go index 97887232de0..758074fd8d1 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -249,3 +249,22 @@ func (f *FingerprintResponse) RemoveLink(name string) { f.Links[name] = "" } + +type HealthCheckRequest struct{} + +type HealthCheckResponse struct { + + // Drivers is a map of driver names to current driver information + Drivers map[string]*structs.DriverInfo +} + +// AddDriverInfo adds information about a driver to the fingerprint response. +// If the Drivers field has not yet been initialized, it does so here. +func (h *HealthCheckResponse) AddDriverInfo(name string, driverInfo *structs.DriverInfo) { + // initialize Drivers if it has not been already + if h.Drivers == nil { + h.Drivers = make(map[string]*structs.DriverInfo, 0) + } + + h.Drivers[name] = driverInfo +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 72a46e0636b..abc8c47e7e1 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1062,6 +1062,15 @@ func ValidNodeStatus(status string) bool { } } +// DriverInfo is the current state of a single driver. This is updated +// regularly as driver health changes on the node. +type DriverInfo struct { + Enabled bool + Healthy bool + HealthDescription string + UpdateTime time.Time +} + // Node is a representation of a schedulable client node type Node struct { // ID is a unique identifier for the node. It can be constructed @@ -1139,6 +1148,9 @@ type Node struct { // Raft Indexes CreateIndex uint64 ModifyIndex uint64 + + // Drivers is a map of driver names to current driver information + Drivers map[string]*DriverInfo } // Ready returns if the node is ready for running allocations