Skip to content

Commit

Permalink
Merge pull request #7383 from hashicorp/b-health-detect-failing-tasks
Browse files Browse the repository at this point in the history
health: detect failing tasks
  • Loading branch information
Mahmood Ali authored Mar 25, 2020
2 parents e718b2f + 525623c commit 4a27cdd
Show file tree
Hide file tree
Showing 3 changed files with 374 additions and 25 deletions.
82 changes: 59 additions & 23 deletions client/allochealth/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type Tracker struct {
// considered healthy
minHealthyTime time.Duration

// checkLookupInterval is the interval at which we check if the
// Consul checks are healthy or unhealthy.
checkLookupInterval time.Duration

// useChecks specifies whether to use Consul healh checks or not
useChecks bool

Expand All @@ -62,6 +66,10 @@ type Tracker struct {
// not needed
allocStopped chan struct{}

// lifecycleTasks is a set of tasks with lifecycle hook set and may
// terminate without affecting alloc health
lifecycleTasks map[string]bool

// l is used to lock shared fields listed below
l sync.Mutex

Expand Down Expand Up @@ -92,28 +100,36 @@ func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.A
// this struct should pass in an appropriately named
// sub-logger.
t := &Tracker{
healthy: make(chan bool, 1),
allocStopped: make(chan struct{}),
alloc: alloc,
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup),
minHealthyTime: minHealthyTime,
useChecks: useChecks,
allocUpdates: allocUpdates,
consulClient: consulClient,
logger: logger,
healthy: make(chan bool, 1),
allocStopped: make(chan struct{}),
alloc: alloc,
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup),
minHealthyTime: minHealthyTime,
useChecks: useChecks,
allocUpdates: allocUpdates,
consulClient: consulClient,
checkLookupInterval: consulCheckLookupInterval,
logger: logger,
lifecycleTasks: map[string]bool{},
}

t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
for _, task := range t.tg.Tasks {
t.taskHealth[task.Name] = &taskHealthState{task: task}
}

for _, task := range t.tg.Tasks {
if task.Lifecycle != nil && !task.Lifecycle.Sidecar {
t.lifecycleTasks[task.Name] = true
}

for _, s := range task.Services {
t.consulCheckCount += len(s.Checks)
}
}

for _, s := range t.tg.Services {
t.consulCheckCount += len(s.Checks)
}

t.ctx, t.cancelFn = context.WithCancel(parentCtx)
return t
}
Expand Down Expand Up @@ -171,6 +187,12 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) {
defer t.l.Unlock()
t.tasksHealthy = healthy

// if unhealthy, force waiting for new checks health status
if !terminal && !healthy {
t.checksHealthy = false
return
}

// If we are marked healthy but we also require Consul to be healthy and it
// isn't yet, return, unless the task is terminal
requireConsul := t.useChecks && t.consulCheckCount > 0
Expand All @@ -191,10 +213,13 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) {
func (t *Tracker) setCheckHealth(healthy bool) {
t.l.Lock()
defer t.l.Unlock()
t.checksHealthy = healthy

// check health should always be false if tasks are unhealthy
// as checks might be missing from unhealthy tasks
t.checksHealthy = healthy && t.tasksHealthy

// Only signal if we are healthy and so is the tasks
if !healthy || !t.tasksHealthy {
if !t.checksHealthy {
return
}

Expand Down Expand Up @@ -249,17 +274,18 @@ func (t *Tracker) watchTaskEvents() {

// Detect if the alloc is unhealthy or if all tasks have started yet
latestStartTime := time.Time{}
for _, state := range alloc.TaskStates {
for taskName, state := range alloc.TaskStates {
// One of the tasks has failed so we can exit watching
if state.Failed || !state.FinishedAt.IsZero() {
if state.Failed || (!state.FinishedAt.IsZero() && !t.lifecycleTasks[taskName]) {
t.setTaskHealth(false, true)
return
}

if state.State != structs.TaskStateRunning {
if state.State == structs.TaskStatePending {
latestStartTime = time.Time{}
break
} else if state.StartedAt.After(latestStartTime) {
// task is either running or exited successfully
latestStartTime = state.StartedAt
}
}
Expand All @@ -276,6 +302,9 @@ func (t *Tracker) watchTaskEvents() {
}

if !latestStartTime.Equal(allStartedTime) {
// reset task health
t.setTaskHealth(false, false)

// Avoid the timer from firing at the old start time
if !healthyTimer.Stop() {
select {
Expand Down Expand Up @@ -310,7 +339,7 @@ func (t *Tracker) watchTaskEvents() {
func (t *Tracker) watchConsulEvents() {
// checkTicker is the ticker that triggers us to look at the checks in
// Consul
checkTicker := time.NewTicker(consulCheckLookupInterval)
checkTicker := time.NewTicker(t.checkLookupInterval)
defer checkTicker.Stop()

// healthyTimer fires when the checks have been healthy for the
Expand Down Expand Up @@ -440,13 +469,20 @@ func (t *taskHealthState) event(deadline time.Time, minHealthyTime time.Duration
if t.state.Failed {
return "Unhealthy because of failed task", true
}
if t.state.State != structs.TaskStateRunning {
return "Task not running by deadline", true
}

// We are running so check if we have been running long enough
if t.state.StartedAt.Add(minHealthyTime).After(deadline) {
return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true
switch t.state.State {
case structs.TaskStatePending:
return "Task not running by deadline", true
case structs.TaskStateDead:
// hook tasks are healthy when dead successfully
if t.task.Lifecycle == nil || t.task.Lifecycle.Sidecar {
return "Unhealthy because of dead task", true
}
case structs.TaskStateRunning:
// We are running so check if we have been running long enough
if t.state.StartedAt.Add(minHealthyTime).After(deadline) {
return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true
}
}
}

Expand Down
229 changes: 229 additions & 0 deletions client/allochealth/tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package allochealth

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/consul"
cstructs "github.com/hashicorp/nomad/client/structs"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

func TestTracker_Checks_Healthy(t *testing.T) {
t.Parallel()

alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]

// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}

// Make Consul response
check := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{check},
},
},
},
}

logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()

// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}

reg := &agentconsul.AllocRegistration{
Tasks: taskRegs,
}

return reg, nil
}

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

select {
case <-time.After(4 * checkInterval):
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
}

func TestTracker_Checks_Unhealthy(t *testing.T) {
t.Parallel()

alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]

newCheck := task.Services[0].Checks[0].Copy()
newCheck.Name = "failing-check"
task.Services[0].Checks = append(task.Services[0].Checks, newCheck)

// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}

// Make Consul response
checkHealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
checksUnhealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[1].Name,
Status: consulapi.HealthCritical,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy},
},
},
},
}

logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()

// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}

reg := &agentconsul.AllocRegistration{
Tasks: taskRegs,
}

return reg, nil
}

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

testutil.WaitForResult(func() (bool, error) {
lookup := atomic.LoadUint64(&called)
return lookup < 4, fmt.Errorf("wait to get more task registration lookups: %v", lookup)
}, func(err error) {
require.NoError(t, err)
})

tracker.l.Lock()
require.False(t, tracker.checksHealthy)
tracker.l.Unlock()

select {
case v := <-tracker.HealthyCh():
require.Failf(t, "expected no health value", " got %v", v)
default:
// good
}
}

func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
t.Parallel()

alloc := mock.Alloc()
logger := testlog.HCLogger(t)

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

tracker := NewTracker(ctx, logger, alloc, nil, nil,
time.Millisecond, true)

assertNoHealth := func() {
require.NoError(t, tracker.ctx.Err())
select {
case v := <-tracker.HealthyCh():
require.Failf(t, "unexpected healthy event", "got %v", v)
default:
}
}

// first set task health without checks
tracker.setTaskHealth(true, false)
assertNoHealth()

// now fail task health again before checks are successful
tracker.setTaskHealth(false, false)
assertNoHealth()

// now pass health checks - do not propagate health yet
tracker.setCheckHealth(true)
assertNoHealth()

// set tasks to healthy - don't propagate health yet, wait for the next check
tracker.setTaskHealth(true, false)
assertNoHealth()

// set checks to true, now propagate health status
tracker.setCheckHealth(true)

require.Error(t, tracker.ctx.Err())
select {
case v := <-tracker.HealthyCh():
require.True(t, v)
default:
require.Fail(t, "expected a health status")
}
}
Loading

0 comments on commit 4a27cdd

Please sign in to comment.