From 9c418c224b5749126423a4a82b05bbe2b3dcb199 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 17 Jul 2019 15:22:21 -0700 Subject: [PATCH 1/3] consul: fix deadlock in check-based restarts Fixes #5395 Alternative to #5957 Make task restarting asynchronous when handling check-based restarts. This matches the pre-0.9 behavior where TaskRunner.Restart was an asynchronous signal. The check-based restarting code was not designed to handle blocking in TaskRunner.Restart. 0.9 made it reentrant and could easily overwhelm the buffered update chan and deadlock. Many thanks to @byronwolfman for his excellent debugging, PR, and reproducer! I created this alternative as changing the functionality of TaskRunner.Restart has a much larger impact. This approach reverts to old known-good behavior and minimizes the number of places changes are made. --- command/agent/consul/check_watcher.go | 26 +++++++++++++++++----- command/agent/consul/check_watcher_test.go | 23 +++++++++++++++++++ 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/command/agent/consul/check_watcher.go b/command/agent/consul/check_watcher.go index b6e8610fc86..eab7ceb114e 100644 --- a/command/agent/consul/check_watcher.go +++ b/command/agent/consul/check_watcher.go @@ -103,20 +103,34 @@ func (c *checkRestart) apply(ctx context.Context, now time.Time, status string) c.logger.Debug("restarting due to unhealthy check") // Tell TaskRunner to restart due to failure - const failure = true reason := fmt.Sprintf("healthcheck: check %q unhealthy", c.checkName) event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason) - err := c.task.Restart(ctx, event, failure) - if err != nil { - // Error restarting - return false - } + go asyncRestart(ctx, c.logger, c.task, event) return true } return false } +// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended +// to be called in a goroutine. +func asyncRestart(ctx context.Context, logger log.Logger, task TaskRestarter, event *structs.TaskEvent) { + // Check watcher restarts are always failures + const failure = true + + // Restarting is asynchronous so there's no reason to allow this + // goroutine to block indefinitely. + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + if err := task.Restart(ctx, event, failure); err != nil { + // Restart errors are not actionable and only relevant when + // debugging allocation lifecycle management. + logger.Debug("error restarting task", "error", err, + "event_time", event.Time, "event_type", event.Type) + } +} + // checkWatchUpdates add or remove checks from the watcher type checkWatchUpdate struct { checkID string diff --git a/command/agent/consul/check_watcher_test.go b/command/agent/consul/check_watcher_test.go index fd8d578ed4a..5e303516cc6 100644 --- a/command/agent/consul/check_watcher_test.go +++ b/command/agent/consul/check_watcher_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" ) // checkRestartRecord is used by a testFakeCtx to record when restarts occur @@ -194,6 +195,28 @@ func TestCheckWatcher_Healthy(t *testing.T) { } } +// TestCheckWatcher_Unhealthy asserts unhealthy tasks are restarted exactly once. +func TestCheckWatcher_Unhealthy(t *testing.T) { + t.Parallel() + + fakeAPI, cw := testWatcherSetup(t) + + check1 := testCheck() + restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) + cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1) + + // Check has always been failing + fakeAPI.add("testcheck1", "critical", time.Time{}) + + // Run + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + cw.Run(ctx) + + // Ensure restart was called exactly once + require.Len(t, restarter1.restarts, 1) +} + // TestCheckWatcher_HealthyWarning asserts checks in warning with // ignore_warnings=true do not restart tasks. func TestCheckWatcher_HealthyWarning(t *testing.T) { From b4b2b42e2834dd399813b428668bb46204153927 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 18 Jul 2019 07:08:27 -0700 Subject: [PATCH 2/3] Update command/agent/consul/check_watcher.go Co-Authored-By: Mahmood Ali --- command/agent/consul/check_watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/agent/consul/check_watcher.go b/command/agent/consul/check_watcher.go index eab7ceb114e..99c4a5be4c3 100644 --- a/command/agent/consul/check_watcher.go +++ b/command/agent/consul/check_watcher.go @@ -126,7 +126,7 @@ func asyncRestart(ctx context.Context, logger log.Logger, task TaskRestarter, ev if err := task.Restart(ctx, event, failure); err != nil { // Restart errors are not actionable and only relevant when // debugging allocation lifecycle management. - logger.Debug("error restarting task", "error", err, + logger.Debug("failed to restart task", "error", err, "event_time", event.Time, "event_type", event.Type) } } From 17636727e171b36dc8dcb268892389ed2cae898d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 18 Jul 2019 08:22:14 -0700 Subject: [PATCH 3/3] consul: add test for check watcher deadlock --- command/agent/consul/check_watcher_test.go | 79 ++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/command/agent/consul/check_watcher_test.go b/command/agent/consul/check_watcher_test.go index 5e303516cc6..e62c19f7bb0 100644 --- a/command/agent/consul/check_watcher_test.go +++ b/command/agent/consul/check_watcher_test.go @@ -3,12 +3,14 @@ package consul import ( "context" "fmt" + "sync" "testing" "time" "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" ) @@ -34,6 +36,8 @@ type fakeCheckRestarter struct { allocID string taskName string checkName string + + mu sync.Mutex } // newFakeCheckRestart creates a new TaskRestarter. It needs all of the @@ -54,6 +58,8 @@ func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string, // Restarts are recorded in the []restarts field and re-Watch the check. //func (c *fakeCheckRestarter) Restart(source, reason string, failure bool) { func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + c.mu.Lock() + defer c.mu.Unlock() restart := checkRestartRecord{ timestamp: time.Now(), source: event.Type, @@ -69,6 +75,9 @@ func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEve // String for debugging func (c *fakeCheckRestarter) String() string { + c.mu.Lock() + defer c.mu.Unlock() + s := fmt.Sprintf("%s %s %s restarts:\n", c.allocID, c.taskName, c.checkName) for _, r := range c.restarts { s += fmt.Sprintf("%s - %s: %s (failure: %t)\n", r.timestamp, r.source, r.reason, r.failure) @@ -76,6 +85,16 @@ func (c *fakeCheckRestarter) String() string { return s } +// GetRestarts for testing in a threadsafe way +func (c *fakeCheckRestarter) GetRestarts() []checkRestartRecord { + c.mu.Lock() + defer c.mu.Unlock() + + o := make([]checkRestartRecord, len(c.restarts)) + copy(o, c.restarts) + return o +} + // checkResponse is a response returned by the fakeChecksAPI after the given // time. type checkResponse struct { @@ -89,6 +108,8 @@ type fakeChecksAPI struct { // responses is a map of check ids to their status at a particular // time. checkResponses must be in chronological order. responses map[string][]checkResponse + + mu sync.Mutex } func newFakeChecksAPI() *fakeChecksAPI { @@ -97,10 +118,14 @@ func newFakeChecksAPI() *fakeChecksAPI { // add a new check status to Consul at the given time. func (c *fakeChecksAPI) add(id, status string, at time.Time) { + c.mu.Lock() c.responses[id] = append(c.responses[id], checkResponse{at, id, status}) + c.mu.Unlock() } func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) { + c.mu.Lock() + defer c.mu.Unlock() now := time.Now() result := make(map[string]*api.AgentCheck, len(c.responses)) @@ -350,3 +375,57 @@ func TestCheckWatcher_MultipleChecks(t *testing.T) { t.Errorf("expected check 3 to not be restarted but found %d:\n%s", n, restarter3) } } + +// TestCheckWatcher_Deadlock asserts that check watcher will not deadlock when +// attempting to restart a task even if its update queue is full. +// https://github.com/hashicorp/nomad/issues/5395 +func TestCheckWatcher_Deadlock(t *testing.T) { + t.Parallel() + + fakeAPI, cw := testWatcherSetup(t) + + // If TR.Restart blocks, restarting len(checkUpdateCh)+1 checks causes + // a deadlock due to checkWatcher.Run being blocked in + // checkRestart.apply and unable to process updates from the chan! + n := cap(cw.checkUpdateCh) + 1 + checks := make([]*structs.ServiceCheck, n) + restarters := make([]*fakeCheckRestarter, n) + for i := 0; i < n; i++ { + c := testCheck() + r := newFakeCheckRestarter(cw, + fmt.Sprintf("alloc%d", i), + fmt.Sprintf("task%d", i), + fmt.Sprintf("check%d", i), + c, + ) + checks[i] = c + restarters[i] = r + } + + // Run + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go cw.Run(ctx) + + // Watch + for _, r := range restarters { + cw.Watch(r.allocID, r.taskName, r.checkName, r.check, r) + } + + // Make them all fail + for _, r := range restarters { + fakeAPI.add(r.checkName, "critical", time.Time{}) + } + + // Ensure that restart was called exactly once on all checks + testutil.WaitForResult(func() (bool, error) { + for _, r := range restarters { + if n := len(r.GetRestarts()); n != 1 { + return false, fmt.Errorf("expected 1 restart but found %d", n) + } + } + return true, nil + }, func(err error) { + require.NoError(t, err) + }) +}