From 2f7b7e3bc43c78e792c21818c72a7f4dee1ba6d6 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 12 Aug 2017 14:23:36 -0700 Subject: [PATCH 1/3] Refactor health watcher and emit events --- client/alloc_runner_health_watcher.go | 560 +++++++++++++++++++------- 1 file changed, 413 insertions(+), 147 deletions(-) diff --git a/client/alloc_runner_health_watcher.go b/client/alloc_runner_health_watcher.go index b2bd75b563b..5b3844c7d9c 100644 --- a/client/alloc_runner_health_watcher.go +++ b/client/alloc_runner_health_watcher.go @@ -2,9 +2,14 @@ package client import ( "context" + "fmt" + "log" + "strings" + "sync" "time" "github.com/hashicorp/consul/api" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" @@ -14,19 +19,19 @@ const ( // consulCheckLookupInterval is the interval at which we check if the // Consul checks are healthy or unhealthy. consulCheckLookupInterval = 500 * time.Millisecond + + // allocHealthEventSource is the source used for emitting task events + allocHealthEventSource = "Alloc Unhealthy" ) // watchHealth is responsible for watching an allocation's task status and -// potentially consul health check status to determine if the allocation is +// potentially Consul health check status to determine if the allocation is // healthy or unhealthy. 
func (r *AllocRunner) watchHealth(ctx context.Context) { + // See if we should watch the allocs health alloc := r.Alloc() - if alloc.DeploymentID == "" { - r.logger.Printf("[TRACE] client.alloc_watcher: exiting because alloc isn't part of a deployment") - return - } else if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() { - r.logger.Printf("[TRACE] client.alloc_watcher: exiting because alloc deployment health already determined") + if alloc.DeploymentID == "" || alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() { return } @@ -34,200 +39,461 @@ func (r *AllocRunner) watchHealth(ctx context.Context) { if tg == nil { r.logger.Printf("[ERR] client.alloc_watcher: failed to lookup allocation's task group. Exiting watcher") return + } else if tg.Update == nil || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual { + return } - // Checks marks whether we should be watching for Consul health checks - desiredChecks := 0 - var checkTicker *time.Ticker - var checkCh <-chan time.Time + // Get an allocation listener to watch for alloc events + l := r.allocBroadcast.Listen() + defer l.Close() - u := tg.Update - switch { - case u == nil: - r.logger.Printf("[TRACE] client.alloc_watcher: no update block for alloc %q. exiting", alloc.ID) - return - case u.HealthCheck == structs.UpdateStrategyHealthCheck_Manual: - r.logger.Printf("[TRACE] client.alloc_watcher: update block has manual checks for alloc %q. 
exiting", alloc.ID) - return - case u.HealthCheck == structs.UpdateStrategyHealthCheck_Checks: - for _, task := range tg.Tasks { - for _, s := range task.Services { - desiredChecks += len(s.Checks) - } + // Create a new context with the health deadline + deadline := time.Now().Add(tg.Update.HealthyDeadline) + healthCtx, healthCtxCancel := context.WithDeadline(ctx, deadline) + defer healthCtxCancel() + r.logger.Printf("[DEBUG] client.alloc_watcher: deadline (%v) for alloc %q is at %v", tg.Update.HealthyDeadline, alloc.ID, deadline) + + // Create the health tracker object + tracker := newAllocHealthTracker(healthCtx, r.logger, alloc, l, r.consulClient) + tracker.Start() + + allocHealthy := false + select { + case <-healthCtx.Done(): + // We were cancelled which means we are no longer needed + if healthCtx.Err() == context.Canceled { + return } - checkTicker = time.NewTicker(consulCheckLookupInterval) - checkCh = checkTicker.C + break + case <-tracker.AllocStoppedCh(): + // The allocation was stopped so nothing to do + return + case healthy := <-tracker.HealthyCh(): + allocHealthy = healthy } - // Get a listener so we know when an allocation is updated. 
- l := r.allocBroadcast.Listen() - - // Create a deadline timer for the health - r.logger.Printf("[DEBUG] client.alloc_watcher: deadline (%v) for alloc %q is at %v", u.HealthyDeadline, alloc.ID, time.Now().Add(u.HealthyDeadline)) - deadline := time.NewTimer(u.HealthyDeadline) + r.allocLock.Lock() + r.allocHealth = helper.BoolToPtr(allocHealthy) + r.allocLock.Unlock() - // Create a healthy timer - latestTaskHealthy := time.Unix(0, 0) - latestChecksHealthy := time.Unix(0, 0) - healthyTimer := time.NewTimer(0) - healthyTime := time.Time{} - cancelHealthyTimer := func() { - if !healthyTimer.Stop() { - select { - case <-healthyTimer.C: - default: + // We are unhealthy so emit task events explaining why + if !allocHealthy { + r.taskLock.RLock() + for task, event := range tracker.TaskEvents() { + if tr, ok := r.tasks[task]; ok { + tr.EmitEvent(allocHealthEventSource, event) } } + r.taskLock.RUnlock() } - cancelHealthyTimer() - // Cleanup function - defer func() { - if !deadline.Stop() { - <-deadline.C - } - if !healthyTimer.Stop() { - <-healthyTimer.C + r.syncStatus() +} + +type allocHealthTracker struct { + // logger is used to log + logger *log.Logger + + // ctx and cancelFn is used to shutdown the tracker + ctx context.Context + cancelFn context.CancelFunc + + // alloc is the alloc we are tracking + alloc *structs.Allocation + + // tg is the task group we are tracking + tg *structs.TaskGroup + + // consulCheckCount is the number of checks the task group will attempt to + // register + consulCheckCount int + + // allocUpdates is a listener for retrieving new alloc updates + allocUpdates *cstructs.AllocListener + + // consulClient is used to look up the state of the task's checks + consulClient ConsulServiceAPI + + // healthy is used to signal whether we have determined the allocation to be + // healthy or unhealthy + healthy chan bool + + // allocStopped is triggered when the allocation is stopped and tracking is + // not needed + allocStopped chan struct{} + + // 
tasksHealthy marks whether all the tasks have met their health check + // (disregards Consul) + tasksHealthy bool + + // allocFailed marks whether the allocation failed + allocFailed bool + + // checksHealthy marks whether all the task's Consul checks are healthy + checksHealthy bool + + taskHealth map[string]*taskHealthState + + // l is used to lock shared fields + l sync.Mutex +} + +func newAllocHealthTracker(parentCtx context.Context, logger *log.Logger, alloc *structs.Allocation, + allocUpdates *cstructs.AllocListener, consulClient ConsulServiceAPI) *allocHealthTracker { + + a := &allocHealthTracker{ + logger: logger, + healthy: make(chan bool, 1), + allocStopped: make(chan struct{}), + alloc: alloc, + tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup), + allocUpdates: allocUpdates, + consulClient: consulClient, + } + + a.taskHealth = make(map[string]*taskHealthState, len(a.tg.Tasks)) + for _, task := range a.tg.Tasks { + a.taskHealth[task.Name] = &taskHealthState{task: task} + } + + for _, task := range a.tg.Tasks { + for _, s := range task.Services { + a.consulCheckCount += len(s.Checks) } - if checkTicker != nil { - checkTicker.Stop() + } + + a.ctx, a.cancelFn = context.WithCancel(parentCtx) + return a +} + +func (a *allocHealthTracker) Start() { + go a.watchTaskEvents() + if a.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks { + go a.watchConsulEvents() + } +} + +func (a *allocHealthTracker) HealthyCh() <-chan bool { + return a.healthy +} + +func (a *allocHealthTracker) AllocStoppedCh() <-chan struct{} { + return a.allocStopped +} + +func (a *allocHealthTracker) TaskEvents() map[string]string { + a.l.Lock() + defer a.l.Unlock() + + // Nothing to do since the failure wasn't task related + if a.allocFailed { + return nil + } + + deadline, _ := a.ctx.Deadline() + events := make(map[string]string, len(a.tg.Tasks)) + + // Go through our task information and build the event map + for task, state := range a.taskHealth { + if e, ok := 
state.event(deadline, a.tg.Update); ok { + events[task] = e } - l.Close() - }() + } - setHealth := func(h bool) { - r.allocLock.Lock() - r.allocHealth = helper.BoolToPtr(h) - r.allocLock.Unlock() - r.syncStatus() + return events +} + +func (a *allocHealthTracker) setTaskHealth(healthy, terminal bool) { + a.l.Lock() + defer a.l.Unlock() + a.tasksHealthy = healthy + + // If we are marked healthy but we also require Consul to be healthy and it + // isn't yet, return, unless the task is terminal + requireConsul := a.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks && a.consulCheckCount > 0 + if !terminal && healthy && requireConsul && !a.checksHealthy { + return } - // Store whether the last consul checks call was successful or not - consulChecksErr := false + select { + case a.healthy <- healthy: + default: + } - var allocReg *consul.AllocRegistration - first := true -OUTER: - for { - if !first { - select { - case <-ctx.Done(): - return - case newAlloc, ok := <-l.Ch: - if !ok { - return - } + // Shutdown the tracker + a.cancelFn() +} - alloc = newAlloc - r.logger.Printf("[TRACE] client.alloc_watcher: new alloc version for %q", alloc.ID) - case <-checkCh: - newAllocReg, err := r.consulClient.AllocRegistrations(alloc.ID) - if err != nil { - if !consulChecksErr { - consulChecksErr = true - r.logger.Printf("[WARN] client.alloc_watcher: failed to lookup consul registrations for allocation %q: %v", alloc.ID, err) - } - } else { - consulChecksErr = false - allocReg = newAllocReg - } - case <-deadline.C: - // We have exceeded our deadline without being healthy. 
- r.logger.Printf("[TRACE] client.alloc_watcher: alloc %q hit healthy deadline", alloc.ID) - setHealth(false) - return - case <-healthyTimer.C: - r.logger.Printf("[TRACE] client.alloc_watcher: alloc %q is healthy", alloc.ID) - setHealth(true) - return - } +func (a *allocHealthTracker) setCheckHealth(healthy bool) { + a.l.Lock() + defer a.l.Unlock() + a.checksHealthy = healthy + + // Only signal if we are healthy and so is the tasks + if !healthy || !a.tasksHealthy { + return + } + + select { + case a.healthy <- healthy: + default: + } + + // Shutdown the tracker + a.cancelFn() +} + +func (a *allocHealthTracker) markAllocStopped() { + close(a.allocStopped) + a.cancelFn() +} + +func (a *allocHealthTracker) watchTaskEvents() { + alloc := a.alloc + allStartedTime := time.Time{} + healthyTimer := time.NewTimer(0) + if !healthyTimer.Stop() { + select { + case <-healthyTimer.C: + default: } - first = false + } + for { // If the alloc is being stopped by the server just exit switch alloc.DesiredStatus { case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: - r.logger.Printf("[TRACE] client.alloc_watcher: desired status terminal for alloc %q", alloc.ID) + a.logger.Printf("[TRACE] client.alloc_watcher: desired status terminal for alloc %q", alloc.ID) + a.markAllocStopped() return } - // If the alloc is marked as failed by the client set the status to - // unhealthy + // Store the task states + a.l.Lock() + for task, state := range alloc.TaskStates { + a.taskHealth[task].state = state + } + a.l.Unlock() + + // Detect if the alloc is unhealthy or if all tasks have started yet + latestStartTime := time.Time{} + for _, state := range alloc.TaskStates { + // One of the tasks has failed so we can exit watching + if state.Failed || !state.FinishedAt.IsZero() { + a.setTaskHealth(false, true) + return + } + + if state.State != structs.TaskStateRunning { + latestStartTime = time.Time{} + break + } else if state.StartedAt.After(latestStartTime) { + latestStartTime = 
state.StartedAt + } + } + + // If the alloc is marked as failed by the client but none of the + // individual tasks failed, that means something failed at the alloc + // level. if alloc.ClientStatus == structs.AllocClientStatusFailed { - r.logger.Printf("[TRACE] client.alloc_watcher: client status failed for alloc %q", alloc.ID) - setHealth(false) + a.logger.Printf("[TRACE] client.alloc_watcher: client status failed for alloc %q", alloc.ID) + a.l.Lock() + a.allocFailed = true + a.l.Unlock() + a.setTaskHealth(false, true) return } - if len(alloc.TaskStates) != len(tg.Tasks) { - r.logger.Printf("[TRACE] client.alloc_watcher: all task runners haven't started") - continue OUTER + if !latestStartTime.Equal(allStartedTime) { + // Avoid the timer from firing at the old start time + if !healthyTimer.Stop() { + select { + case <-healthyTimer.C: + default: + } + } + + // Set the timer since all tasks are started + if !latestStartTime.IsZero() { + allStartedTime = latestStartTime + healthyTimer.Reset(a.tg.Update.MinHealthyTime) + } } - // If the task is dead or has restarted, fail - for _, tstate := range alloc.TaskStates { - if tstate.Failed || !tstate.FinishedAt.IsZero() || tstate.Restarts != 0 { - r.logger.Printf("[TRACE] client.alloc_watcher: setting health to false for alloc %q", alloc.ID) - setHealth(false) + select { + case <-a.ctx.Done(): + return + case newAlloc, ok := <-a.allocUpdates.Ch: + if !ok { return } + alloc = newAlloc + case <-healthyTimer.C: + a.setTaskHealth(true, false) + } + } +} + +func (a *allocHealthTracker) watchConsulEvents() { + // checkTicker is the ticker that triggers us to look at the checks in + // Consul + checkTicker := time.NewTicker(consulCheckLookupInterval) + defer checkTicker.Stop() + + // healthyTimer fires when the checks have been healthy for the + // MinHealthyTime + healthyTimer := time.NewTimer(0) + if !healthyTimer.Stop() { + select { + case <-healthyTimer.C: + default: } + } + + // primed marks whether the healthy timer has 
been set + primed := false + + // Store whether the last Consul checks call was successful or not + consulChecksErr := false + + // allocReg are the registered objects in Consul for the allocation + var allocReg *consul.AllocRegistration - // If we should have checks and they aren't all healthy continue - if desiredChecks > 0 { - if allocReg.NumChecks() != desiredChecks { - r.logger.Printf("[TRACE] client.alloc_watcher: continuing since all checks (want %d; got %d) haven't been registered for alloc %q", desiredChecks, allocReg.NumChecks(), alloc.ID) - cancelHealthyTimer() +OUTER: + for { + select { + case <-a.ctx.Done(): + return + case <-checkTicker.C: + newAllocReg, err := a.consulClient.AllocRegistrations(a.alloc.ID) + if err != nil { + if !consulChecksErr { + consulChecksErr = true + a.logger.Printf("[WARN] client.alloc_watcher: failed to lookup Consul registrations for allocation %q: %v", a.alloc.ID, err) + } continue OUTER + } else { + consulChecksErr = false + allocReg = newAllocReg } + case <-healthyTimer.C: + a.setCheckHealth(true) + } + + if allocReg == nil { + continue + } + + // Store the task registrations + a.l.Lock() + for task, reg := range allocReg.Tasks { + a.taskHealth[task].taskRegistrations = reg + } + a.l.Unlock() + + // Detect if all the checks are passing + passed := true - // Check if all the checks are passing - for _, treg := range allocReg.Tasks { - for _, sreg := range treg.Services { - for _, check := range sreg.Checks { - if check.Status != api.HealthPassing { - r.logger.Printf("[TRACE] client.alloc_watcher: continuing since check %q isn't passing for alloc %q", check.CheckID, alloc.ID) - latestChecksHealthy = time.Time{} - cancelHealthyTimer() - continue OUTER - } + CHECKS: + for _, treg := range allocReg.Tasks { + for _, sreg := range treg.Services { + for _, check := range sreg.Checks { + if check.Status == api.HealthPassing { + continue } + + passed = false + a.setCheckHealth(false) + break CHECKS } } - if 
latestChecksHealthy.IsZero() { - latestChecksHealthy = time.Now() - } } - // Determine if the allocation is healthy - for task, tstate := range alloc.TaskStates { - if tstate.State != structs.TaskStateRunning { - r.logger.Printf("[TRACE] client.alloc_watcher: continuing since task %q hasn't started for alloc %q", task, alloc.ID) - continue OUTER + if !passed { + // Reset the timer since we have transitioned back to unhealthy + if primed { + if !healthyTimer.Stop() { + select { + case <-healthyTimer.C: + default: + } + } + primed = false + } + } else if !primed { + // Reset the timer to fire after MinHealthyTime + if !healthyTimer.Stop() { + select { + case <-healthyTimer.C: + default: + } } - if tstate.StartedAt.After(latestTaskHealthy) { - latestTaskHealthy = tstate.StartedAt + primed = true + healthyTimer.Reset(a.tg.Update.MinHealthyTime) + } + } +} + +type taskHealthState struct { + task *structs.Task + state *structs.TaskState + taskRegistrations *consul.TaskRegistration +} + +func (t *taskHealthState) event(deadline time.Time, update *structs.UpdateStrategy) (string, bool) { + requireChecks := false + desiredChecks := 0 + for _, s := range t.task.Services { + if nc := len(s.Checks); nc > 0 { + requireChecks = true + desiredChecks += nc + } + } + requireChecks = requireChecks && update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks + + if t.state != nil { + if t.state.Failed { + return "Unhealthy because of failed task", true + } + if t.state.State != structs.TaskStateRunning { + return "Task not running by deadline", true + } + + // We are running so check if we have been running long enough + if t.state.StartedAt.Add(update.MinHealthyTime).After(deadline) { + return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", update.MinHealthyTime), true + } + } + + if t.taskRegistrations != nil { + var notPassing []string + passing := 0 + + OUTER: + for _, sreg := range t.taskRegistrations.Services { + for _, check := range sreg.Checks { 
+ if check.Status != api.HealthPassing { + notPassing = append(notPassing, sreg.Service.Service) + continue OUTER + } else { + passing++ + } } } - // Determine when we can mark ourselves as healthy. - totalHealthy := latestTaskHealthy - if totalHealthy.Before(latestChecksHealthy) { - totalHealthy = latestChecksHealthy + if len(notPassing) != 0 { + return fmt.Sprintf("Services not healthy by deadline: %s", strings.Join(notPassing, ", ")), true } - // Nothing to do since we are already waiting for the healthy timer to - // fire at the same time. - if totalHealthy.Equal(healthyTime) { - continue OUTER + if passing != desiredChecks { + return fmt.Sprintf("Only %d out of %d checks registered and passing", passing, desiredChecks), true } - healthyTime = totalHealthy - cancelHealthyTimer() - d := time.Until(totalHealthy.Add(u.MinHealthyTime)) - healthyTimer.Reset(d) - r.logger.Printf("[TRACE] client.alloc_watcher: setting healthy timer to %v for alloc %q", d, alloc.ID) + } else if requireChecks { + return "Service checks not registered", true } + + return "", false } From 0f2a05714b02fe9ed47a1d10f5844417c513d4f1 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 12 Aug 2017 14:42:53 -0700 Subject: [PATCH 2/3] test --- client/alloc_runner_test.go | 98 ++++++++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 2 deletions(-) diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 33cf07bad01..4558e833270 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" + "github.com/stretchr/testify/assert" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/vaultclient" @@ -97,6 +98,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) { // Test that the watcher will mark the allocation as unhealthy. 
func TestAllocRunner_DeploymentHealth_Unhealthy_BadStart(t *testing.T) { t.Parallel() + assert := assert.New(t) // Ensure the task fails and restarts upd, ar := testAllocRunner(false) @@ -129,12 +131,22 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_BadStart(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Assert that we have an event explaining why we are unhealthy. + assert.Len(ar.taskStates, 1) + state := ar.taskStates[task.Name] + assert.NotNil(state) + assert.NotEmpty(state.Events) + last := state.Events[len(state.Events)-1] + assert.Equal(allocHealthEventSource, last.GenericSource) + assert.Contains(last.Message, "failed task") } // Test that the watcher will mark the allocation as unhealthy if it hits its // deadline. func TestAllocRunner_DeploymentHealth_Unhealthy_Deadline(t *testing.T) { t.Parallel() + assert := assert.New(t) // Ensure the task fails and restarts upd, ar := testAllocRunner(false) @@ -169,6 +181,15 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Deadline(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Assert that we have an event explaining why we are unhealthy. + assert.Len(ar.taskStates, 1) + state := ar.taskStates[task.Name] + assert.NotNil(state) + assert.NotEmpty(state.Events) + last := state.Events[len(state.Events)-1] + assert.Equal(allocHealthEventSource, last.GenericSource) + assert.Contains(last.Message, "not running by deadline") } // Test that the watcher will mark the allocation as healthy. 
@@ -263,7 +284,8 @@ func TestAllocRunner_DeploymentHealth_Healthy_Checks(t *testing.T) { task.Name: { Services: map[string]*consul.ServiceRegistration{ "123": { - Checks: []*api.AgentCheck{checkHealthy}, + Service: &api.AgentService{Service: "foo"}, + Checks: []*api.AgentCheck{checkHealthy}, }, }, }, @@ -275,7 +297,8 @@ func TestAllocRunner_DeploymentHealth_Healthy_Checks(t *testing.T) { task.Name: { Services: map[string]*consul.ServiceRegistration{ "123": { - Checks: []*api.AgentCheck{checkUnhealthy}, + Service: &api.AgentService{Service: "foo"}, + Checks: []*api.AgentCheck{checkUnhealthy}, }, }, }, @@ -308,6 +331,77 @@ func TestAllocRunner_DeploymentHealth_Healthy_Checks(t *testing.T) { } } +// Test that the watcher will mark the allocation as unhealthy with failing +// checks +func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + // Ensure the task fails and restarts + upd, ar := testAllocRunner(false) + + // Make the task fail + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config["run_for"] = "10s" + + // Make the alloc be part of a deployment + ar.alloc.DeploymentID = structs.GenerateUUID() + ar.alloc.Job.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() + ar.alloc.Job.TaskGroups[0].Update.HealthCheck = structs.UpdateStrategyHealthCheck_Checks + ar.alloc.Job.TaskGroups[0].Update.MaxParallel = 1 + ar.alloc.Job.TaskGroups[0].Update.MinHealthyTime = 100 * time.Millisecond + ar.alloc.Job.TaskGroups[0].Update.HealthyDeadline = 1 * time.Second + + checkUnhealthy := &api.AgentCheck{ + CheckID: structs.GenerateUUID(), + Status: api.HealthWarning, + } + + // Always return the check as unhealthy + ar.consulClient.(*mockConsulServiceClient).allocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) { + return &consul.AllocRegistration{ + Tasks: map[string]*consul.TaskRegistration{ + task.Name: { + Services: 
map[string]*consul.ServiceRegistration{ + "123": { + Service: &api.AgentService{Service: "foo"}, + Checks: []*api.AgentCheck{checkUnhealthy}, + }, + }, + }, + }, + }, nil + } + + go ar.Run() + defer ar.Destroy() + + testutil.WaitForResult(func() (bool, error) { + _, last := upd.Last() + if last == nil { + return false, fmt.Errorf("No updates") + } + if last.DeploymentStatus == nil || last.DeploymentStatus.Healthy == nil { + return false, fmt.Errorf("want deployment status unhealthy; got unset") + } else if *last.DeploymentStatus.Healthy { + return false, fmt.Errorf("want deployment status unhealthy; got healthy") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Assert that we have an event explaining why we are unhealthy. + assert.Len(ar.taskStates, 1) + state := ar.taskStates[task.Name] + assert.NotNil(state) + assert.NotEmpty(state.Events) + last := state.Events[len(state.Events)-1] + assert.Equal(allocHealthEventSource, last.GenericSource) + assert.Contains(last.Message, "Services not healthy by deadline") +} + // Test that the watcher will mark the allocation as healthy. 
func TestAllocRunner_DeploymentHealth_Healthy_UpdatedDeployment(t *testing.T) { t.Parallel() From ac5984125c5558b84383ad8e585c714e7695f8a9 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 15 Aug 2017 12:23:29 -0700 Subject: [PATCH 3/3] comments --- client/alloc_runner_health_watcher.go | 36 ++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/client/alloc_runner_health_watcher.go b/client/alloc_runner_health_watcher.go index 5b3844c7d9c..7fbe89f5fc2 100644 --- a/client/alloc_runner_health_watcher.go +++ b/client/alloc_runner_health_watcher.go @@ -65,7 +65,7 @@ func (r *AllocRunner) watchHealth(ctx context.Context) { return } - break + // Since the deadline has been reached we are not healthy case <-tracker.AllocStoppedCh(): // The allocation was stopped so nothing to do return @@ -91,6 +91,8 @@ func (r *AllocRunner) watchHealth(ctx context.Context) { r.syncStatus() } +// allocHealthTracker tracks the health of an allocation and makes health events +// watchable via channels. type allocHealthTracker struct { // logger is used to log logger *log.Logger @@ -123,6 +125,9 @@ type allocHealthTracker struct { // not needed allocStopped chan struct{} + // l is used to lock shared fields listed below + l sync.Mutex + // tasksHealthy marks whether all the tasks have met their health check // (disregards Consul) tasksHealthy bool @@ -133,12 +138,13 @@ type allocHealthTracker struct { // checksHealthy marks whether all the task's Consul checks are healthy checksHealthy bool + // taskHealth contains the health state for each task taskHealth map[string]*taskHealthState - - // l is used to lock shared fields - l sync.Mutex } +// newAllocHealthTracker returns a health tracker for the given allocation. An +// alloc listener and consul API object are given so that the watcher can detect +// health changes. 
func newAllocHealthTracker(parentCtx context.Context, logger *log.Logger, alloc *structs.Allocation, allocUpdates *cstructs.AllocListener, consulClient ConsulServiceAPI) *allocHealthTracker { @@ -167,6 +173,7 @@ func newAllocHealthTracker(parentCtx context.Context, logger *log.Logger, alloc return a } +// Start starts the watcher. func (a *allocHealthTracker) Start() { go a.watchTaskEvents() if a.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks { @@ -174,14 +181,21 @@ func (a *allocHealthTracker) Start() { } } +// HealthyCh returns a channel that will emit a boolean indicating the health of +// the allocation. func (a *allocHealthTracker) HealthyCh() <-chan bool { return a.healthy } +// AllocStoppedCh returns a channel that will be fired if the allocation is +// stopped. This means that health will not be set. func (a *allocHealthTracker) AllocStoppedCh() <-chan struct{} { return a.allocStopped } +// TaskEvents returns a map of events by task. This should only be called after +// health has been determined. Only tasks that have contributed to the +// allocation being unhealthy will have an event. func (a *allocHealthTracker) TaskEvents() map[string]string { a.l.Lock() defer a.l.Unlock() @@ -204,6 +218,8 @@ func (a *allocHealthTracker) TaskEvents() map[string]string { return events } +// setTaskHealth is used to set the tasks health as healthy or unhealthy. If the +// allocation is terminal, health is immediately broadcasted. func (a *allocHealthTracker) setTaskHealth(healthy, terminal bool) { a.l.Lock() defer a.l.Unlock() @@ -225,6 +241,7 @@ func (a *allocHealthTracker) setTaskHealth(healthy, terminal bool) { a.cancelFn() } +// setCheckHealth is used to mark the checks as either healthy or unhealthy. 
func (a *allocHealthTracker) setCheckHealth(healthy bool) { a.l.Lock() defer a.l.Unlock() @@ -244,11 +261,14 @@ func (a *allocHealthTracker) setCheckHealth(healthy bool) { a.cancelFn() } +// markAllocStopped is used to mark the allocation as having stopped. func (a *allocHealthTracker) markAllocStopped() { close(a.allocStopped) a.cancelFn() } +// watchTaskEvents is a long lived watcher that watches for the health of the +// allocation's tasks. func (a *allocHealthTracker) watchTaskEvents() { alloc := a.alloc allStartedTime := time.Time{} @@ -335,6 +355,8 @@ func (a *allocHealthTracker) watchTaskEvents() { } } +// watchConsulEvents is a long lived watcher that watches for the health of the +// allocation's Consul checks. func (a *allocHealthTracker) watchConsulEvents() { // checkTicker is the ticker that triggers us to look at the checks in // Consul @@ -436,12 +458,18 @@ OUTER: } } +// taskHealthState captures all known health information about a task. It is +// largely used to determine if the task has contributed to the allocation being +// unhealthy. type taskHealthState struct { task *structs.Task state *structs.TaskState taskRegistrations *consul.TaskRegistration } +// event takes the deadline time for the allocation to be healthy and the update +// strategy of the group. It returns true if the task has contributed to the +// allocation being unhealthy and if so, an event description of why. func (t *taskHealthState) event(deadline time.Time, update *structs.UpdateStrategy) (string, bool) { requireChecks := false desiredChecks := 0