-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
health: detect failing tasks #7383
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,10 @@ type Tracker struct { | |
// considered healthy | ||
minHealthyTime time.Duration | ||
|
||
// checkLookupInterval is the interval at which we check if the | ||
// Consul checks are healthy or unhealthy. | ||
checkLookupInterval time.Duration | ||
|
||
// useChecks specifies whether to use Consul healh checks or not | ||
useChecks bool | ||
|
||
|
@@ -62,6 +66,10 @@ type Tracker struct { | |
// not needed | ||
allocStopped chan struct{} | ||
|
||
// lifecycleTasks is a set of tasks with lifecycle hook set and may | ||
// terminate without affecting alloc health | ||
lifecycleTasks map[string]bool | ||
|
||
// l is used to lock shared fields listed below | ||
l sync.Mutex | ||
|
||
|
@@ -92,28 +100,36 @@ func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.A | |
// this struct should pass in an appropriately named | ||
// sub-logger. | ||
t := &Tracker{ | ||
healthy: make(chan bool, 1), | ||
allocStopped: make(chan struct{}), | ||
alloc: alloc, | ||
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup), | ||
minHealthyTime: minHealthyTime, | ||
useChecks: useChecks, | ||
allocUpdates: allocUpdates, | ||
consulClient: consulClient, | ||
logger: logger, | ||
healthy: make(chan bool, 1), | ||
allocStopped: make(chan struct{}), | ||
alloc: alloc, | ||
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup), | ||
minHealthyTime: minHealthyTime, | ||
useChecks: useChecks, | ||
allocUpdates: allocUpdates, | ||
consulClient: consulClient, | ||
checkLookupInterval: consulCheckLookupInterval, | ||
logger: logger, | ||
lifecycleTasks: map[string]bool{}, | ||
} | ||
|
||
t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks)) | ||
for _, task := range t.tg.Tasks { | ||
t.taskHealth[task.Name] = &taskHealthState{task: task} | ||
} | ||
|
||
for _, task := range t.tg.Tasks { | ||
if task.Lifecycle != nil && !task.Lifecycle.Sidecar { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may want a helper like connect has for this as it's easy for people to forget the initial nil check and cause a panic: https://github.com/hashicorp/nomad/blob/v0.10.5/nomad/structs/services.go#L585 |
||
t.lifecycleTasks[task.Name] = true | ||
} | ||
|
||
for _, s := range task.Services { | ||
t.consulCheckCount += len(s.Checks) | ||
} | ||
} | ||
|
||
for _, s := range t.tg.Services { | ||
t.consulCheckCount += len(s.Checks) | ||
} | ||
|
||
t.ctx, t.cancelFn = context.WithCancel(parentCtx) | ||
return t | ||
} | ||
|
@@ -171,6 +187,12 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) { | |
defer t.l.Unlock() | ||
t.tasksHealthy = healthy | ||
|
||
// if unhealthy, force waiting for new checks health status | ||
if !terminal && !healthy { | ||
t.checksHealthy = false | ||
return | ||
} | ||
|
||
// If we are marked healthy but we also require Consul to be healthy and it | ||
// isn't yet, return, unless the task is terminal | ||
requireConsul := t.useChecks && t.consulCheckCount > 0 | ||
|
@@ -191,10 +213,13 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) { | |
func (t *Tracker) setCheckHealth(healthy bool) { | ||
t.l.Lock() | ||
defer t.l.Unlock() | ||
t.checksHealthy = healthy | ||
|
||
// check health should always be false if tasks are unhealthy | ||
// as checks might be missing from unhealthy tasks | ||
t.checksHealthy = healthy && t.tasksHealthy | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This calls out for having a state machine to represent these dependent states. Would appreciate suggestions, specially ones that don't require significant rewrite. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your logic seems sound even if the structure of this package is sub-optimal. 👍 to your approach. |
||
|
||
// Only signal if we are healthy and so is the tasks | ||
if !healthy || !t.tasksHealthy { | ||
if !t.checksHealthy { | ||
return | ||
} | ||
|
||
|
@@ -249,17 +274,18 @@ func (t *Tracker) watchTaskEvents() { | |
|
||
// Detect if the alloc is unhealthy or if all tasks have started yet | ||
latestStartTime := time.Time{} | ||
for _, state := range alloc.TaskStates { | ||
for taskName, state := range alloc.TaskStates { | ||
// One of the tasks has failed so we can exit watching | ||
if state.Failed || !state.FinishedAt.IsZero() { | ||
if state.Failed || (!state.FinishedAt.IsZero() && !t.lifecycleTasks[taskName]) { | ||
t.setTaskHealth(false, true) | ||
return | ||
} | ||
|
||
if state.State != structs.TaskStateRunning { | ||
if state.State == structs.TaskStatePending { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably missing context: do tasks controlled by a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. When a task fails, it moves to dead once it's beyond restart policy attempts, otherwise, it moves to pending until it's scheduled to run after restart policy delay. |
||
latestStartTime = time.Time{} | ||
break | ||
} else if state.StartedAt.After(latestStartTime) { | ||
// task is either running or exited successfully | ||
latestStartTime = state.StartedAt | ||
} | ||
} | ||
|
@@ -276,6 +302,9 @@ func (t *Tracker) watchTaskEvents() { | |
} | ||
|
||
if !latestStartTime.Equal(allStartedTime) { | ||
// reset task health | ||
t.setTaskHealth(false, false) | ||
|
||
// Avoid the timer from firing at the old start time | ||
if !healthyTimer.Stop() { | ||
select { | ||
|
@@ -310,7 +339,7 @@ func (t *Tracker) watchTaskEvents() { | |
func (t *Tracker) watchConsulEvents() { | ||
// checkTicker is the ticker that triggers us to look at the checks in | ||
// Consul | ||
checkTicker := time.NewTicker(consulCheckLookupInterval) | ||
checkTicker := time.NewTicker(t.checkLookupInterval) | ||
defer checkTicker.Stop() | ||
|
||
// healthyTimer fires when the checks have been healthy for the | ||
|
@@ -440,13 +469,20 @@ func (t *taskHealthState) event(deadline time.Time, minHealthyTime time.Duration | |
if t.state.Failed { | ||
return "Unhealthy because of failed task", true | ||
} | ||
if t.state.State != structs.TaskStateRunning { | ||
return "Task not running by deadline", true | ||
} | ||
|
||
// We are running so check if we have been running long enough | ||
if t.state.StartedAt.Add(minHealthyTime).After(deadline) { | ||
return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true | ||
switch t.state.State { | ||
case structs.TaskStatePending: | ||
return "Task not running by deadline", true | ||
case structs.TaskStateDead: | ||
// hook tasks are healthy when dead successfully | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could maybe get a bit more descriptive here? Just an idea: // non-sidecar lifecycle tasks are expected to terminate and therefore healthy when dead |
||
if t.task.Lifecycle == nil || t.task.Lifecycle.Sidecar { | ||
return "Unhealthy because of dead task", true | ||
} | ||
case structs.TaskStateRunning: | ||
// We are running so check if we have been running long enough | ||
if t.state.StartedAt.Add(minHealthyTime).After(deadline) { | ||
return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true | ||
} | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
package allochealth | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
|
||
consulapi "github.com/hashicorp/consul/api" | ||
"github.com/hashicorp/nomad/client/consul" | ||
cstructs "github.com/hashicorp/nomad/client/structs" | ||
agentconsul "github.com/hashicorp/nomad/command/agent/consul" | ||
"github.com/hashicorp/nomad/helper/testlog" | ||
"github.com/hashicorp/nomad/nomad/mock" | ||
"github.com/hashicorp/nomad/nomad/structs" | ||
"github.com/hashicorp/nomad/testutil" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestTracker_Checks_Healthy(t *testing.T) { | ||
t.Parallel() | ||
|
||
alloc := mock.Alloc() | ||
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up | ||
task := alloc.Job.TaskGroups[0].Tasks[0] | ||
|
||
// Synthesize running alloc and tasks | ||
alloc.ClientStatus = structs.AllocClientStatusRunning | ||
alloc.TaskStates = map[string]*structs.TaskState{ | ||
task.Name: { | ||
State: structs.TaskStateRunning, | ||
StartedAt: time.Now(), | ||
}, | ||
} | ||
|
||
// Make Consul response | ||
check := &consulapi.AgentCheck{ | ||
Name: task.Services[0].Checks[0].Name, | ||
Status: consulapi.HealthPassing, | ||
} | ||
taskRegs := map[string]*agentconsul.ServiceRegistrations{ | ||
task.Name: { | ||
Services: map[string]*agentconsul.ServiceRegistration{ | ||
task.Services[0].Name: { | ||
Service: &consulapi.AgentService{ | ||
ID: "foo", | ||
Service: task.Services[0].Name, | ||
}, | ||
Checks: []*consulapi.AgentCheck{check}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
logger := testlog.HCLogger(t) | ||
b := cstructs.NewAllocBroadcaster(logger) | ||
defer b.Close() | ||
|
||
// Don't reply on the first call | ||
var called uint64 | ||
consul := consul.NewMockConsulServiceClient(t, logger) | ||
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { | ||
if atomic.AddUint64(&called, 1) == 1 { | ||
return nil, nil | ||
} | ||
|
||
reg := &agentconsul.AllocRegistration{ | ||
Tasks: taskRegs, | ||
} | ||
|
||
return reg, nil | ||
} | ||
|
||
ctx, cancelFn := context.WithCancel(context.Background()) | ||
defer cancelFn() | ||
|
||
checkInterval := 10 * time.Millisecond | ||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, | ||
time.Millisecond, true) | ||
tracker.checkLookupInterval = checkInterval | ||
tracker.Start() | ||
|
||
select { | ||
case <-time.After(4 * checkInterval): | ||
require.Fail(t, "timed out while waiting for health") | ||
case h := <-tracker.HealthyCh(): | ||
require.True(t, h) | ||
} | ||
} | ||
|
||
func TestTracker_Checks_Unhealthy(t *testing.T) { | ||
t.Parallel() | ||
|
||
alloc := mock.Alloc() | ||
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up | ||
task := alloc.Job.TaskGroups[0].Tasks[0] | ||
|
||
newCheck := task.Services[0].Checks[0].Copy() | ||
newCheck.Name = "failing-check" | ||
task.Services[0].Checks = append(task.Services[0].Checks, newCheck) | ||
|
||
// Synthesize running alloc and tasks | ||
alloc.ClientStatus = structs.AllocClientStatusRunning | ||
alloc.TaskStates = map[string]*structs.TaskState{ | ||
task.Name: { | ||
State: structs.TaskStateRunning, | ||
StartedAt: time.Now(), | ||
}, | ||
} | ||
|
||
// Make Consul response | ||
checkHealthy := &consulapi.AgentCheck{ | ||
Name: task.Services[0].Checks[0].Name, | ||
Status: consulapi.HealthPassing, | ||
} | ||
checksUnhealthy := &consulapi.AgentCheck{ | ||
Name: task.Services[0].Checks[1].Name, | ||
Status: consulapi.HealthCritical, | ||
} | ||
taskRegs := map[string]*agentconsul.ServiceRegistrations{ | ||
task.Name: { | ||
Services: map[string]*agentconsul.ServiceRegistration{ | ||
task.Services[0].Name: { | ||
Service: &consulapi.AgentService{ | ||
ID: "foo", | ||
Service: task.Services[0].Name, | ||
}, | ||
Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
logger := testlog.HCLogger(t) | ||
b := cstructs.NewAllocBroadcaster(logger) | ||
defer b.Close() | ||
|
||
// Don't reply on the first call | ||
var called uint64 | ||
consul := consul.NewMockConsulServiceClient(t, logger) | ||
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { | ||
if atomic.AddUint64(&called, 1) == 1 { | ||
return nil, nil | ||
} | ||
|
||
reg := &agentconsul.AllocRegistration{ | ||
Tasks: taskRegs, | ||
} | ||
|
||
return reg, nil | ||
} | ||
|
||
ctx, cancelFn := context.WithCancel(context.Background()) | ||
defer cancelFn() | ||
|
||
checkInterval := 10 * time.Millisecond | ||
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, | ||
time.Millisecond, true) | ||
tracker.checkLookupInterval = checkInterval | ||
tracker.Start() | ||
|
||
testutil.WaitForResult(func() (bool, error) { | ||
lookup := atomic.LoadUint64(&called) | ||
return lookup < 4, fmt.Errorf("wait to get more task registration lookups: %v", lookup) | ||
}, func(err error) { | ||
require.NoError(t, err) | ||
}) | ||
|
||
tracker.l.Lock() | ||
require.False(t, tracker.checksHealthy) | ||
tracker.l.Unlock() | ||
|
||
select { | ||
case v := <-tracker.HealthyCh(): | ||
require.Failf(t, "expected no health value", " got %v", v) | ||
default: | ||
// good | ||
} | ||
} | ||
|
||
func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) { | ||
t.Parallel() | ||
|
||
alloc := mock.Alloc() | ||
logger := testlog.HCLogger(t) | ||
|
||
ctx, cancelFn := context.WithCancel(context.Background()) | ||
defer cancelFn() | ||
|
||
tracker := NewTracker(ctx, logger, alloc, nil, nil, | ||
time.Millisecond, true) | ||
|
||
assertNoHealth := func() { | ||
require.NoError(t, tracker.ctx.Err()) | ||
select { | ||
case v := <-tracker.HealthyCh(): | ||
require.Failf(t, "unexpected healthy event", "got %v", v) | ||
default: | ||
} | ||
} | ||
|
||
// first set task health without checks | ||
tracker.setTaskHealth(true, false) | ||
assertNoHealth() | ||
|
||
// now fail task health again before checks are successful | ||
tracker.setTaskHealth(false, false) | ||
assertNoHealth() | ||
|
||
// now pass health checks - do not propagate health yet | ||
tracker.setCheckHealth(true) | ||
assertNoHealth() | ||
|
||
// set tasks to healthy - don't propagate health yet, wait for the next check | ||
tracker.setTaskHealth(true, false) | ||
assertNoHealth() | ||
|
||
// set checks to true, now propagate health status | ||
tracker.setCheckHealth(true) | ||
|
||
require.Error(t, tracker.ctx.Err()) | ||
select { | ||
case v := <-tracker.HealthyCh(): | ||
require.True(t, v) | ||
default: | ||
require.Fail(t, "expected a health status") | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's document that sidecars are explicitly excluded from this (code suggestions are disabled, sorry)