Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consul: fix deadlock in check-based restarts #5975

Merged
merged 3 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions command/agent/consul/check_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,34 @@ func (c *checkRestart) apply(ctx context.Context, now time.Time, status string)
c.logger.Debug("restarting due to unhealthy check")

// Tell TaskRunner to restart due to failure
const failure = true
reason := fmt.Sprintf("healthcheck: check %q unhealthy", c.checkName)
event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason)
err := c.task.Restart(ctx, event, failure)
if err != nil {
// Error restarting
return false
}
go asyncRestart(ctx, c.logger, c.task, event)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: does it make sense to use a semaphore or channel blocking technique used elsewhere, so we don't call task.Restart concurrently and if we get a spike of Restarts applies, we only restart once?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question!

The checkWatcher.Run loop removes a check after Restart is called, so the same task won't be restarted more than once (until it completes the restart and re-registers the check).

In 0.8 TR.Restart just ticked a chan and so was async without having to create a new goroutine. This seemed like the least risky way of replicating that behavior.

Tasks that fail in a tight loop (check_restart.grace=0 and restart.delay=0) could in theory spin up lots of goroutines, but the goroutines for a single task should rarely if ever overlap and restarting a task already involves creating a lot of resources more expensive than a goroutine.

That being said I hate this "fire and forget" pattern, so I'm open to ideas as long as they can't block checkWatcher.Run/checkRestart.apply. (checkWatcher should probably be refactored to separate Watch/Unwatch mutations from check watching, but that seemed way too big a risk for a point release)

return true
}

return false
}

// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended
// to be called in a goroutine.
func asyncRestart(ctx context.Context, logger log.Logger, task TaskRestarter, event *structs.TaskEvent) {
// Check watcher restarts are always failures
const failure = true

// Restarting is asynchronous so there's no reason to allow this
// goroutine to block indefinitely.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

if err := task.Restart(ctx, event, failure); err != nil {
// Restart errors are not actionable and only relevant when
// debugging allocation lifecycle management.
logger.Debug("failed to restart task", "error", err,
"event_time", event.Time, "event_type", event.Type)
}
}

// checkWatchUpdates add or remove checks from the watcher
type checkWatchUpdate struct {
checkID string
Expand Down
102 changes: 102 additions & 0 deletions command/agent/consul/check_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package consul
import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

// checkRestartRecord is used by a testFakeCtx to record when restarts occur
Expand All @@ -33,6 +36,8 @@ type fakeCheckRestarter struct {
allocID string
taskName string
checkName string

mu sync.Mutex
}

// newFakeCheckRestart creates a new TaskRestarter. It needs all of the
Expand All @@ -53,6 +58,8 @@ func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string,
// Restarts are recorded in the []restarts field and re-Watch the check.
//func (c *fakeCheckRestarter) Restart(source, reason string, failure bool) {
func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
c.mu.Lock()
defer c.mu.Unlock()
restart := checkRestartRecord{
timestamp: time.Now(),
source: event.Type,
Expand All @@ -68,13 +75,26 @@ func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEve

// String for debugging
func (c *fakeCheckRestarter) String() string {
c.mu.Lock()
defer c.mu.Unlock()

s := fmt.Sprintf("%s %s %s restarts:\n", c.allocID, c.taskName, c.checkName)
for _, r := range c.restarts {
s += fmt.Sprintf("%s - %s: %s (failure: %t)\n", r.timestamp, r.source, r.reason, r.failure)
}
return s
}

// GetRestarts for testing in a threadsafe way
func (c *fakeCheckRestarter) GetRestarts() []checkRestartRecord {
c.mu.Lock()
defer c.mu.Unlock()

o := make([]checkRestartRecord, len(c.restarts))
copy(o, c.restarts)
return o
}

// checkResponse is a response returned by the fakeChecksAPI after the given
// time.
type checkResponse struct {
Expand All @@ -88,6 +108,8 @@ type fakeChecksAPI struct {
// responses is a map of check ids to their status at a particular
// time. checkResponses must be in chronological order.
responses map[string][]checkResponse

mu sync.Mutex
}

func newFakeChecksAPI() *fakeChecksAPI {
Expand All @@ -96,10 +118,14 @@ func newFakeChecksAPI() *fakeChecksAPI {

// add a new check status to Consul at the given time.
func (c *fakeChecksAPI) add(id, status string, at time.Time) {
c.mu.Lock()
c.responses[id] = append(c.responses[id], checkResponse{at, id, status})
c.mu.Unlock()
}

func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) {
c.mu.Lock()
defer c.mu.Unlock()
now := time.Now()
result := make(map[string]*api.AgentCheck, len(c.responses))

Expand Down Expand Up @@ -194,6 +220,28 @@ func TestCheckWatcher_Healthy(t *testing.T) {
}
}

// TestCheckWatcher_Unhealthy asserts unhealthy tasks are restarted exactly once.
func TestCheckWatcher_Unhealthy(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm missing some context for this test - does it trigger the deadlock issue here? Is it a relatively easy thing to test for?

Copy link
Member Author

@schmichael schmichael Jul 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this test just asserts checks are only restarted once.

I added a new test for the deadlock in 1763672 and confirmed it does cause the deadlock before my changes.

t.Parallel()

fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)

// Check has always been failing
fakeAPI.add("testcheck1", "critical", time.Time{})

// Run
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
cw.Run(ctx)

// Ensure restart was called exactly once
require.Len(t, restarter1.restarts, 1)
}

// TestCheckWatcher_HealthyWarning asserts checks in warning with
// ignore_warnings=true do not restart tasks.
func TestCheckWatcher_HealthyWarning(t *testing.T) {
Expand Down Expand Up @@ -327,3 +375,57 @@ func TestCheckWatcher_MultipleChecks(t *testing.T) {
t.Errorf("expected check 3 to not be restarted but found %d:\n%s", n, restarter3)
}
}

// TestCheckWatcher_Deadlock asserts that check watcher will not deadlock when
// attempting to restart a task even if its update queue is full.
// https://github.com/hashicorp/nomad/issues/5395
func TestCheckWatcher_Deadlock(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup(t)

// If TR.Restart blocks, restarting len(checkUpdateCh)+1 checks causes
// a deadlock due to checkWatcher.Run being blocked in
// checkRestart.apply and unable to process updates from the chan!
n := cap(cw.checkUpdateCh) + 1
checks := make([]*structs.ServiceCheck, n)
restarters := make([]*fakeCheckRestarter, n)
for i := 0; i < n; i++ {
c := testCheck()
r := newFakeCheckRestarter(cw,
fmt.Sprintf("alloc%d", i),
fmt.Sprintf("task%d", i),
fmt.Sprintf("check%d", i),
c,
)
checks[i] = c
restarters[i] = r
}

// Run
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go cw.Run(ctx)

// Watch
for _, r := range restarters {
cw.Watch(r.allocID, r.taskName, r.checkName, r.check, r)
}

// Make them all fail
for _, r := range restarters {
fakeAPI.add(r.checkName, "critical", time.Time{})
}

// Ensure that restart was called exactly once on all checks
testutil.WaitForResult(func() (bool, error) {
for _, r := range restarters {
if n := len(r.GetRestarts()); n != 1 {
return false, fmt.Errorf("expected 1 restart but found %d", n)
}
}
return true, nil
}, func(err error) {
require.NoError(t, err)
})
}