Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Blocked] Use driver health check status to determine scheduling #3905

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"sort"
"strconv"
"time"
)

// Nodes is used to query node-related API endpoints
Expand Down Expand Up @@ -94,6 +95,15 @@ func (n *Nodes) GC(nodeID string, q *QueryOptions) error {
return err
}

// DriverInfo is the API-side shape of one driver's entry in a node's Drivers
// map, used when decoding a node from the API.
type DriverInfo struct {
	// Attributes holds string key/value attributes associated with the driver.
	Attributes map[string]string

	// Detected and Healthy are the driver's detection and health flags.
	Detected bool
	Healthy  bool

	// HealthDescription is a human-readable description of the health state.
	HealthDescription string

	// UpdateTime is the timestamp attached to this driver information.
	UpdateTime time.Time
}

// Node is used to deserialize a node entry.
type Node struct {
ID string
Expand All @@ -111,6 +121,7 @@ type Node struct {
Status string
StatusDescription string
StatusUpdatedAt int64
Drivers map[string]*DriverInfo
CreateIndex uint64
ModifyIndex uint64
}
Expand Down
38 changes: 37 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
}

fingerprintManager := NewFingerprintManager(c.GetConfig, c.config.Node,
c.shutdownCh, c.updateNodeFromFingerprint, c.logger)
c.shutdownCh, c.updateNodeFromFingerprint, c.updateNodeFromHealthCheck, c.logger)

// Fingerprint the node and scan for drivers
if err := fingerprintManager.Run(); err != nil {
Expand Down Expand Up @@ -856,6 +856,9 @@ func (c *Client) setupNode() error {
if node.Links == nil {
node.Links = make(map[string]string)
}
if node.Drivers == nil {
node.Drivers = make(map[string]*structs.DriverInfo)
}
if node.Meta == nil {
node.Meta = make(map[string]string)
}
Expand Down Expand Up @@ -948,6 +951,39 @@ func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintRespons
if response.Resources != nil {
c.config.Node.Resources.Merge(response.Resources)
}

for name, new_val := range response.Drivers {
old_val := c.config.Node.Drivers[name]
if new_val.Equals(old_val) {
continue
}
if old_val == nil {
c.config.Node.Drivers[name] = new_val
} else {
c.config.Node.Drivers[name].MergeFingerprintInfo(new_val)
}
}

return c.config.Node
}

// updateNodeFromHealthCheck applies the per-driver information carried by a
// health check response to the client's node and returns that node. The
// config lock is held for the whole update.
func (c *Client) updateNodeFromHealthCheck(response *cstructs.HealthCheckResponse) *structs.Node {
	c.configLock.Lock()
	defer c.configLock.Unlock()

	node := c.config.Node
	for driver, reported := range response.Drivers {
		known := node.Drivers[driver]
		switch {
		case reported.Equals(known):
			// Nothing changed for this driver; leave the node untouched.
		case known == nil:
			// First report for this driver: store it as-is.
			node.Drivers[driver] = reported
		default:
			// Existing entry: let MergeHealthCheck fold in the new report.
			known.MergeHealthCheck(reported)
		}
	}

	return node
}

Expand Down
15 changes: 15 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,21 @@ func TestClient_Fingerprint_Periodic(t *testing.T) {
if mockDriverStatus == "" {
return false, fmt.Errorf("mock driver attribute should be set on the client")
}

// assert that the Driver information for the node is also set correctly
mockDriverInfo := node.Drivers["mock_driver"]
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if !mockDriverInfo.Detected {
return false, fmt.Errorf("mock driver should be set as healthy")
}
if !mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should be set as healthy")
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
Expand Down
30 changes: 29 additions & 1 deletion client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,6 @@ func (d *DockerDriver) Fingerprint(req *cstructs.FingerprintRequest, resp *cstru
d.logger.Printf("[INFO] driver.docker: failed to initialize client: %s", err)
}
d.fingerprintSuccess = helper.BoolToPtr(false)
resp.RemoveAttribute(dockerDriverAttr)
return nil
}

Expand Down Expand Up @@ -552,6 +551,35 @@ func (d *DockerDriver) Fingerprint(req *cstructs.FingerprintRequest, resp *cstru
return nil
}

// HealthCheck records the Docker driver's health in resp under the "docker"
// key. The driver is reported healthy when dockerClients succeeds; otherwise
// an unhealthy entry is recorded and the error is returned.
func (d *DockerDriver) HealthCheck(req *cstructs.HealthCheckRequest, resp *cstructs.HealthCheckResponse) error {
	// Prepared up front so its timestamp is taken before the clients are
	// probed.
	failed := &structs.DriverInfo{
		HealthDescription: "Docker driver is available but unresponsive",
		UpdateTime:        time.Now(),
	}

	if _, _, err := d.dockerClients(); err != nil {
		d.logger.Printf("[WARN] driver.docker: docker driver is available but is unresponsive to `docker ps`")
		resp.AddDriverInfo("docker", failed)
		return err
	}

	d.logger.Printf("[TRACE] driver.docker: docker driver is available and is responsive to `docker ps`")
	resp.AddDriverInfo("docker", &structs.DriverInfo{
		Healthy:           true,
		HealthDescription: "Docker driver is available and responsive",
		UpdateTime:        time.Now(),
	})
	return nil
}

// GetHealthCheckInterval marks the Docker driver as eligible for periodic
// health checks and sets the period between checks to one minute. It always
// returns nil.
func (d *DockerDriver) GetHealthCheckInterval(req *cstructs.HealthCheckIntervalRequest, resp *cstructs.HealthCheckIntervalResponse) error {
	resp.Period = time.Minute
	resp.Eligible = true
	return nil
}

// Validate is used to validate the driver configuration
func (d *DockerDriver) Validate(config map[string]interface{}) error {
fd := &fields.FieldData{
Expand Down
43 changes: 43 additions & 0 deletions client/driver/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/client/fingerprint"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/uuid"
Expand Down Expand Up @@ -164,6 +165,7 @@ func TestDockerDriver_Fingerprint(t *testing.T) {
if !tu.IsTravis() {
t.Parallel()
}

ctx := testDockerDriverContexts(t, &structs.Task{Name: "foo", Driver: "docker", Resources: basicResources})
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
defer ctx.AllocDir.Destroy()
Expand Down Expand Up @@ -227,6 +229,7 @@ func TestDockerDriver_Fingerprint_Bridge(t *testing.T) {

request := &cstructs.FingerprintRequest{Config: conf, Node: conf.Node}
var response cstructs.FingerprintResponse

err = dd.Fingerprint(request, &response)
if err != nil {
t.Fatalf("error fingerprinting docker: %v", err)
Expand All @@ -251,6 +254,46 @@ func TestDockerDriver_Fingerprint_Bridge(t *testing.T) {
t.Logf("docker bridge ip: %q", attributes["driver.docker.bridge_ip"])
}

// TestDockerDriver_Check_DockerHealthStatus verifies that the Docker driver
// implements fingerprint.HealthCheck and reports itself as healthy when Docker
// is reachable.
func TestDockerDriver_Check_DockerHealthStatus(t *testing.T) {
	if !tu.IsTravis() {
		t.Parallel()
	}
	if !testutil.DockerIsConnected(t) {
		t.Skip("requires Docker")
	}
	if runtime.GOOS != "linux" {
		t.Skip("expect only on linux")
	}

	require := require.New(t)

	// NOTE(review): the docker0 lookup looks carried over from the bridge
	// fingerprint test and is not used by the health check itself; it is kept
	// as a precondition. This seems fragile, so we might need to reconsider it
	// if the test proves flaky.
	expectedAddr, err := sockaddr.GetInterfaceIP("docker0")
	if err != nil {
		t.Fatalf("unable to get ip for docker0: %v", err)
	}
	if expectedAddr == "" {
		t.Fatalf("unable to get ip for docker bridge")
	}

	conf := testConfig(t)
	conf.Node = mock.Node()
	dd := NewDockerDriver(NewDriverContext("", "", conf, conf.Node, testLogger(), nil))

	request := &cstructs.HealthCheckRequest{}
	var response cstructs.HealthCheckResponse

	dc, ok := dd.(fingerprint.HealthCheck)
	require.True(ok)
	err = dc.HealthCheck(request, &response)
	require.Nil(err)

	// DockerDriver.HealthCheck registers its info under "docker" (not the
	// "driver.docker" attribute name), so that is the key to look up.
	driverInfo := response.Drivers["docker"]
	require.NotNil(driverInfo)
	require.True(driverInfo.Healthy)
}

func TestDockerDriver_StartOpen_Wait(t *testing.T) {
if !tu.IsTravis() {
t.Parallel()
Expand Down
39 changes: 37 additions & 2 deletions client/driver/mock_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
// to "stop" a previously functioning driver after the specified duration
// (specified in seconds) for testing of periodic drivers and fingerprinters.
ShutdownPeriodicDuration = "test.shutdown_periodic_duration"

mockDriverName = "driver.mock_driver"
)

// Add the mock driver to the list of builtin drivers
Expand Down Expand Up @@ -225,14 +227,47 @@ func (m *MockDriver) Fingerprint(req *cstructs.FingerprintRequest, resp *cstruct
// current time is after the time which the node should shut down, simulate
// driver failure
case !m.shutdownFingerprintTime.IsZero() && time.Now().After(m.shutdownFingerprintTime):
resp.RemoveAttribute("driver.mock_driver")
resp.RemoveAttribute(mockDriverName)
default:
resp.AddAttribute("driver.mock_driver", "1")
resp.AddAttribute(mockDriverName, "1")
resp.Detected = true
}
return nil
}

// HealthCheck implements the fingerprint HealthCheck interface and reports the
// current health status of the mock driver in resp. The driver is reported as
// not running once shutdownFingerprintTime is set and has passed, and as
// running otherwise. It always returns nil.
func (m *MockDriver) HealthCheck(req *cstructs.HealthCheckRequest, resp *cstructs.HealthCheckResponse) error {
	// Both outcomes must be reported under the same key; otherwise an
	// unhealthy report would create a separate entry instead of replacing the
	// healthy one. This is the driver name, not the "driver.mock_driver"
	// attribute name used by Fingerprint.
	const driverKey = "mock_driver"

	now := time.Now()
	info := &structs.DriverInfo{
		Healthy:           true,
		HealthDescription: "running",
		UpdateTime:        now,
	}

	// Simulate driver failure after the configured shutdown time.
	if !m.shutdownFingerprintTime.IsZero() && now.After(m.shutdownFingerprintTime) {
		info.Healthy = false
		info.HealthDescription = "not running"
	}

	resp.AddDriverInfo(driverKey, info)
	return nil
}

// GetHealthCheckInterval implements the fingerprint HealthCheck interface for
// the mock driver. It marks the driver as eligible for periodic health checks
// and sets the period between checks to one second, both on resp. It always
// returns nil.
func (m *MockDriver) GetHealthCheckInterval(req *cstructs.HealthCheckIntervalRequest, resp *cstructs.HealthCheckIntervalResponse) error {
	resp.Period = time.Second
	resp.Eligible = true
	return nil
}

// MockDriverHandle is a driver handler which supervises a mock task
type mockDriverHandle struct {
taskName string
Expand Down
15 changes: 15 additions & 0 deletions client/fingerprint/fingerprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ func NewFingerprint(name string, logger *log.Logger) (Fingerprint, error) {
// Factory is used to instantiate a new Fingerprint
type Factory func(*log.Logger) Fingerprint

// HealthCheck is implemented by components that support periodic health
// checks. On a given time interval, the node's fingerprint manager calls the
// health check.
type HealthCheck interface {
	// HealthCheck runs the health check and records its outcome in the
	// response, which is used to update the node.
	HealthCheck(*cstructs.HealthCheckRequest, *cstructs.HealthCheckResponse) error

	// GetHealthCheckInterval lets the health checker state, through the
	// response, whether it should be run periodically and the time interval
	// at which the check should happen.
	GetHealthCheckInterval(*cstructs.HealthCheckIntervalRequest, *cstructs.HealthCheckIntervalResponse) error
}

// Fingerprint is used for doing "fingerprinting" of the
// host to automatically determine attributes, resources,
// and metadata about it. Each of these is a heuristic, and
Expand Down
Loading