Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix task leak during client restore when allocrunner prerun hook fails #17104

Merged
merged 1 commit into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/17104.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: clean up resources upon failure to restore task during client restart
```
7 changes: 3 additions & 4 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunner/state"
Expand Down Expand Up @@ -347,17 +348,15 @@ func (ar *allocRunner) Run() {
ar.logger.Error("prerun failed", "error", err)

for _, tr := range ar.tasks {
tr.MarkFailedDead(fmt.Sprintf("failed to setup alloc: %v", err))
// emit event and mark task to be cleaned up during runTasks()
tr.MarkFailedKill(fmt.Sprintf("failed to setup alloc: %v", err))
}

goto POST
}
}

// Run the runners (blocks until they exit)
ar.runTasks()

POST:
if ar.isShuttingDown() {
return
}
Expand Down
4 changes: 4 additions & 0 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

multierror "github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/taskenv"
Expand Down Expand Up @@ -138,6 +139,9 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
newChecksHook(hookLogger, alloc, ar.checkStore, ar),
}
if config.ExtraAllocHooks != nil {
ar.runnerHooks = append(ar.runnerHooks, config.ExtraAllocHooks...)
}

return nil
}
Expand Down
118 changes: 118 additions & 0 deletions client/allocrunner/fail_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

// FailHook is designed to fail for testing purposes,
// so should never be included in a release.
//go:build !release

package allocrunner

import (
"errors"
"fmt"
"os"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/hcl/v2/hclsimple"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
)

var ErrFailHookError = errors.New("failed successfully")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😆


func NewFailHook(l hclog.Logger, name string) *FailHook {
return &FailHook{
name: name,
logger: l.Named(name),
}
}

type FailHook struct {
name string
logger hclog.Logger
Fail struct {
Prerun bool `hcl:"prerun,optional"`
PreKill bool `hcl:"prekill,optional"`
Postrun bool `hcl:"postrun,optional"`
Destroy bool `hcl:"destroy,optional"`
Update bool `hcl:"update,optional"`
PreTaskRestart bool `hcl:"pretaskrestart,optional"`
Shutdown bool `hcl:"shutdown,optional"`
}
}

func (h *FailHook) Name() string {
return h.name
}

func (h *FailHook) LoadConfig(path string) *FailHook {
if _, err := os.Stat(path); os.IsNotExist(err) {
h.logger.Error("couldn't load config", "error", err)
return h
}
if err := hclsimple.DecodeFile(path, nil, &h.Fail); err != nil {
h.logger.Error("error parsing config", "path", path, "error", err)
}
return h
}

var _ interfaces.RunnerPrerunHook = &FailHook{}

func (h *FailHook) Prerun() error {
if h.Fail.Prerun {
return fmt.Errorf("prerun %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerPreKillHook = &FailHook{}

func (h *FailHook) PreKill() {
if h.Fail.PreKill {
h.logger.Error("prekill", "error", ErrFailHookError)
}
}

var _ interfaces.RunnerPostrunHook = &FailHook{}

func (h *FailHook) Postrun() error {
if h.Fail.Postrun {
return fmt.Errorf("postrun %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerDestroyHook = &FailHook{}

func (h *FailHook) Destroy() error {
if h.Fail.Destroy {
return fmt.Errorf("destroy %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerUpdateHook = &FailHook{}

func (h *FailHook) Update(request *interfaces.RunnerUpdateRequest) error {
if h.Fail.Update {
return fmt.Errorf("update %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerTaskRestartHook = &FailHook{}

func (h *FailHook) PreTaskRestart() error {
if h.Fail.PreTaskRestart {
return fmt.Errorf("destroy %w", ErrFailHookError)
}
return nil
}

var _ interfaces.ShutdownHook = &FailHook{}

func (h *FailHook) Shutdown() {
if h.Fail.Shutdown {
h.logger.Error("shutdown", "error", ErrFailHookError)
}
}
33 changes: 12 additions & 21 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"sync"
"time"

"github.com/hashicorp/nomad/client/lib/cgutil"
"golang.org/x/exp/slices"

metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/hcl/v2/hcldec"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
Expand All @@ -27,6 +27,7 @@ import (
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/dynamicplugins"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration"
Expand Down Expand Up @@ -495,30 +496,20 @@ func (tr *TaskRunner) initLabels() {
}
}

// MarkFailedDead marks a task as failed and not to run. Aimed to be invoked
// when alloc runner prestart hooks failed. Should never be called with Run().
func (tr *TaskRunner) MarkFailedDead(reason string) {
defer close(tr.waitCh)

tr.stateLock.Lock()
if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil {
//TODO Nomad will be unable to restore this task; try to kill
// it now and fail? In general we prefer to leave running
// tasks running even if the agent encounters an error.
tr.logger.Warn("error persisting local failed task state; may be unable to restore after a Nomad restart",
"error", err)
}
tr.stateLock.Unlock()
tgross marked this conversation as resolved.
Show resolved Hide resolved

// MarkFailedKill marks a task as failed and should be killed.
// It should be invoked when alloc runner prestart hooks fail.
// Afterwards, Run() will perform any necessary cleanup.
func (tr *TaskRunner) MarkFailedKill(reason string) {
// Emit an event that fails the task and gives reasons for humans.
event := structs.NewTaskEvent(structs.TaskSetupFailure).
SetKillReason(structs.TaskRestoreFailed).
SetDisplayMessage(reason).
SetFailsTask()
tr.UpdateState(structs.TaskStateDead, event)
tr.EmitEvent(event)

// Run the stop hooks in case task was a restored task that failed prestart
if err := tr.stop(); err != nil {
tr.logger.Error("stop failed while marking task dead", "error", err)
}
// Cancel kill context, so later when allocRunner runs tr.Run(),
// we'll follow the usual kill path and do all the appropriate cleanup steps.
tr.killCtxCancel()
}

// Run the TaskRunner. Starts the user's task or reattaches to a restored task.
Expand Down
65 changes: 61 additions & 4 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ import (
"time"

"github.com/golang/snappy"
"github.com/kr/pretty"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
Expand All @@ -41,10 +47,6 @@ import (
"github.com/hashicorp/nomad/plugins/device"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type MockTaskStateUpdater struct {
Expand Down Expand Up @@ -662,6 +664,61 @@ func TestTaskRunner_Restore_System(t *testing.T) {
})
}

// TestTaskRunner_MarkFailedKill asserts that MarkFailedKill marks the task as failed
// and cancels the killCtx so a subsequent Run() will do any necessary task cleanup.
func TestTaskRunner_MarkFailedKill(t *testing.T) {
ci.Parallel(t)

// set up some taskrunner
alloc := mock.MinAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
t.Cleanup(cleanup)
tr, err := NewTaskRunner(conf)
must.NoError(t, err)

// side quest: set this lifecycle coordination channel,
// so early in tr MAIN, it doesn't randomly follow that route.
// test config creates this already closed, but not so in real life.
startCh := make(chan struct{})
t.Cleanup(func() { close(startCh) })
tr.startConditionMetCh = startCh

// function under test: should mark the task as failed and cancel kill context
reason := "because i said so"
tr.MarkFailedKill(reason)

// explicitly check kill context.
select {
case <-tr.killCtx.Done():
default:
t.Fatal("kill context should be done")
}

// Run() should now follow the kill path.
go tr.Run()

select { // it should finish up very quickly
case <-tr.WaitCh():
case <-time.After(time.Second):
t.Error("task not killed (or not as fast as expected)")
}

// check state for expected values and events
state := tr.TaskState()

// this gets set directly by MarkFailedKill()
test.True(t, state.Failed, test.Sprint("task should have failed"))
// this is set in Run()
test.Eq(t, structs.TaskStateDead, state.State, test.Sprint("task should be dead"))
// reason "because i said so" should be a task event message
foundMessages := make(map[string]bool)
for _, event := range state.Events {
foundMessages[event.DisplayMessage] = true
}
test.True(t, foundMessages[reason], test.Sprintf("expected '%s' in events: %#v", reason, foundMessages))
Comment on lines +715 to +719
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Map contains value that meets this condition" seems like it'd be nice new frequently-used assertion for @shoenig's test library. But out of scope for this PR.

}

// TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
// interpolated.
func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {
Expand Down
Loading