Merge pull request #5975 from hashicorp/b-check-watcher-deadlock
consul: fix deadlock in check-based restarts
schmichael authored Jul 18, 2019
2 parents 5f44755 + 1763672 commit 858d18d
Showing 2 changed files with 122 additions and 6 deletions.
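Before this change, checkRestart.apply called the task's Restart method synchronously from inside checkWatcher.Run's update loop. If that Restart call blocks, Run is stuck inside apply and can no longer drain checkUpdateCh, so callers of cw.Watch, which send on the now-full channel, block as well; that is the deadlock tracked in issue #5395 and reproduced by TestCheckWatcher_Deadlock below. The fix hands the Restart call to a goroutine bounded by a 10-second timeout. The standalone sketch below is illustrative only, not Nomad code: the watcher type and its Watch and run methods are invented stand-ins, but they show the blocking cycle and why moving the call onto its own goroutine breaks it.

package main

import (
    "context"
    "fmt"
    "time"
)

type watcher struct {
    updateCh chan string // buffered, standing in for checkWatcher.checkUpdateCh
}

// Watch queues a check update; it blocks whenever updateCh is full.
func (w *watcher) Watch(id string) { w.updateCh <- id }

// run drains updates. With async=false the (possibly blocking) restart runs
// inline, which is the pre-fix behavior; with async=true it runs in its own
// goroutine bounded by a timeout, mirroring the new asyncRestart below.
func (w *watcher) run(ctx context.Context, restart func(context.Context), async bool) {
    for {
        select {
        case id := <-w.updateCh:
            fmt.Println("handling", id)
            if async {
                go func() {
                    rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
                    defer cancel()
                    restart(rctx)
                }()
            } else {
                restart(ctx) // run is stuck here while Watch callers pile up
            }
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    w := &watcher{updateCh: make(chan string, 1)}
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Stands in for a TaskRunner.Restart call that can block until the restart
    // finishes, or indefinitely if the task is wedged.
    blockingRestart := func(rctx context.Context) { <-rctx.Done() }

    go w.run(ctx, blockingRestart, true) // flip async to false to reproduce the hang

    // Queue more updates than the channel can buffer, as the regression test
    // below does with cap(cw.checkUpdateCh)+1 checks.
    for i := 0; i < cap(w.updateCh)+2; i++ {
        w.Watch(fmt.Sprintf("check%d", i))
        fmt.Println("queued check", i)
    }
    fmt.Println("all updates queued; no deadlock")
}

With async set to false the sketch stops making progress once the channel fills, and the Go runtime will typically abort with its all-goroutines-asleep deadlock error; that is essentially the scenario the new TestCheckWatcher_Deadlock regression test guards against.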
26 changes: 20 additions & 6 deletions command/agent/consul/check_watcher.go
@@ -103,20 +103,34 @@ func (c *checkRestart) apply(ctx context.Context, now time.Time, status string)
c.logger.Debug("restarting due to unhealthy check")

// Tell TaskRunner to restart due to failure
const failure = true
reason := fmt.Sprintf("healthcheck: check %q unhealthy", c.checkName)
event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason)
err := c.task.Restart(ctx, event, failure)
if err != nil {
// Error restarting
return false
}
go asyncRestart(ctx, c.logger, c.task, event)
return true
}

return false
}

// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended
// to be called in a goroutine.
func asyncRestart(ctx context.Context, logger log.Logger, task TaskRestarter, event *structs.TaskEvent) {
// Check watcher restarts are always failures
const failure = true

// Restarting is asynchronous so there's no reason to allow this
// goroutine to block indefinitely.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

if err := task.Restart(ctx, event, failure); err != nil {
// Restart errors are not actionable and only relevant when
// debugging allocation lifecycle management.
logger.Debug("failed to restart task", "error", err,
"event_time", event.Time, "event_type", event.Type)
}
}

// checkWatchUpdates add or remove checks from the watcher
type checkWatchUpdate struct {
checkID string
Expand Down
102 changes: 102 additions & 0 deletions command/agent/consul/check_watcher_test.go
@@ -3,12 +3,15 @@ package consul
import (
    "context"
    "fmt"
    "sync"
    "testing"
    "time"

    "github.com/hashicorp/consul/api"
    "github.com/hashicorp/nomad/helper/testlog"
    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/hashicorp/nomad/testutil"
    "github.com/stretchr/testify/require"
)

// checkRestartRecord is used by a testFakeCtx to record when restarts occur
@@ -33,6 +36,8 @@ type fakeCheckRestarter struct {
    allocID   string
    taskName  string
    checkName string

    mu sync.Mutex
}

// newFakeCheckRestart creates a new TaskRestarter. It needs all of the
@@ -53,6 +58,8 @@ func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string,
// Restarts are recorded in the []restarts field and re-Watch the check.
//func (c *fakeCheckRestarter) Restart(source, reason string, failure bool) {
func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    restart := checkRestartRecord{
        timestamp: time.Now(),
        source:    event.Type,
@@ -68,13 +75,26 @@ func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEve

// String for debugging
func (c *fakeCheckRestarter) String() string {
    c.mu.Lock()
    defer c.mu.Unlock()

    s := fmt.Sprintf("%s %s %s restarts:\n", c.allocID, c.taskName, c.checkName)
    for _, r := range c.restarts {
        s += fmt.Sprintf("%s - %s: %s (failure: %t)\n", r.timestamp, r.source, r.reason, r.failure)
    }
    return s
}

// GetRestarts for testing in a threadsafe way
func (c *fakeCheckRestarter) GetRestarts() []checkRestartRecord {
    c.mu.Lock()
    defer c.mu.Unlock()

    o := make([]checkRestartRecord, len(c.restarts))
    copy(o, c.restarts)
    return o
}

// checkResponse is a response returned by the fakeChecksAPI after the given
// time.
type checkResponse struct {
@@ -88,6 +108,8 @@ type fakeChecksAPI struct {
    // responses is a map of check ids to their status at a particular
    // time. checkResponses must be in chronological order.
    responses map[string][]checkResponse

    mu sync.Mutex
}

func newFakeChecksAPI() *fakeChecksAPI {
@@ -96,10 +118,14 @@ func newFakeChecksAPI() *fakeChecksAPI {

// add a new check status to Consul at the given time.
func (c *fakeChecksAPI) add(id, status string, at time.Time) {
    c.mu.Lock()
    c.responses[id] = append(c.responses[id], checkResponse{at, id, status})
    c.mu.Unlock()
}

func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    now := time.Now()
    result := make(map[string]*api.AgentCheck, len(c.responses))

@@ -194,6 +220,28 @@ func TestCheckWatcher_Healthy(t *testing.T) {
    }
}

// TestCheckWatcher_Unhealthy asserts unhealthy tasks are restarted exactly once.
func TestCheckWatcher_Unhealthy(t *testing.T) {
    t.Parallel()

    fakeAPI, cw := testWatcherSetup(t)

    check1 := testCheck()
    restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
    cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)

    // Check has always been failing
    fakeAPI.add("testcheck1", "critical", time.Time{})

    // Run
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    cw.Run(ctx)

    // Ensure restart was called exactly once
    require.Len(t, restarter1.restarts, 1)
}

// TestCheckWatcher_HealthyWarning asserts checks in warning with
// ignore_warnings=true do not restart tasks.
func TestCheckWatcher_HealthyWarning(t *testing.T) {
@@ -327,3 +375,57 @@ func TestCheckWatcher_MultipleChecks(t *testing.T) {
        t.Errorf("expected check 3 to not be restarted but found %d:\n%s", n, restarter3)
    }
}

// TestCheckWatcher_Deadlock asserts that check watcher will not deadlock when
// attempting to restart a task even if its update queue is full.
// https://github.com/hashicorp/nomad/issues/5395
func TestCheckWatcher_Deadlock(t *testing.T) {
    t.Parallel()

    fakeAPI, cw := testWatcherSetup(t)

    // If TR.Restart blocks, restarting len(checkUpdateCh)+1 checks causes
    // a deadlock due to checkWatcher.Run being blocked in
    // checkRestart.apply and unable to process updates from the chan!
    n := cap(cw.checkUpdateCh) + 1
    checks := make([]*structs.ServiceCheck, n)
    restarters := make([]*fakeCheckRestarter, n)
    for i := 0; i < n; i++ {
        c := testCheck()
        r := newFakeCheckRestarter(cw,
            fmt.Sprintf("alloc%d", i),
            fmt.Sprintf("task%d", i),
            fmt.Sprintf("check%d", i),
            c,
        )
        checks[i] = c
        restarters[i] = r
    }

    // Run
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go cw.Run(ctx)

    // Watch
    for _, r := range restarters {
        cw.Watch(r.allocID, r.taskName, r.checkName, r.check, r)
    }

    // Make them all fail
    for _, r := range restarters {
        fakeAPI.add(r.checkName, "critical", time.Time{})
    }

    // Ensure that restart was called exactly once on all checks
    testutil.WaitForResult(func() (bool, error) {
        for _, r := range restarters {
            if n := len(r.GetRestarts()); n != 1 {
                return false, fmt.Errorf("expected 1 restart but found %d", n)
            }
        }
        return true, nil
    }, func(err error) {
        require.NoError(t, err)
    })
}
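The sync.Mutex fields and the GetRestarts accessor added above guard the test fakes against concurrent access, which matters now that the watcher triggers restarts from a separate goroutine. To run just the new tests, something like go test -race -run 'TestCheckWatcher_(Unhealthy|Deadlock)' ./command/agent/consul from the repository root should work; keeping -race on is worthwhile given the new goroutine.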
