Skip to content

Commit

Permalink
backport of commit c2dc1c5 (#17120)
Browse files Browse the repository at this point in the history
plus MinJob() from nomad/mock/job.go

Co-authored-by: Daniel Bennett <[email protected]>
  • Loading branch information
hc-github-team-nomad-core and gulducat authored May 9, 2023
1 parent a961fdc commit f4180fd
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 34 deletions.
3 changes: 3 additions & 0 deletions .changelog/17104.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: clean up resources upon failure to restore task during client restart
```
7 changes: 3 additions & 4 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunner/state"
Expand Down Expand Up @@ -344,17 +345,15 @@ func (ar *allocRunner) Run() {
ar.logger.Error("prerun failed", "error", err)

for _, tr := range ar.tasks {
tr.MarkFailedDead(fmt.Sprintf("failed to setup alloc: %v", err))
// emit event and mark task to be cleaned up during runTasks()
tr.MarkFailedKill(fmt.Sprintf("failed to setup alloc: %v", err))
}

goto POST
}
}

// Run the runners (blocks until they exit)
ar.runTasks()

POST:
if ar.isShuttingDown() {
return
}
Expand Down
4 changes: 4 additions & 0 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

multierror "github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/taskenv"
Expand Down Expand Up @@ -135,6 +136,9 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
newChecksHook(hookLogger, alloc, ar.checkStore, ar),
}
if config.ExtraAllocHooks != nil {
ar.runnerHooks = append(ar.runnerHooks, config.ExtraAllocHooks...)
}

return nil
}
Expand Down
118 changes: 118 additions & 0 deletions client/allocrunner/fail_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

// FailHook is designed to fail for testing purposes,
// so should never be included in a release.
//go:build !release

package allocrunner

import (
"errors"
"fmt"
"os"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/hcl/v2/hclsimple"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
)

var ErrFailHookError = errors.New("failed successfully")

func NewFailHook(l hclog.Logger, name string) *FailHook {
return &FailHook{
name: name,
logger: l.Named(name),
}
}

type FailHook struct {
name string
logger hclog.Logger
Fail struct {
Prerun bool `hcl:"prerun,optional"`
PreKill bool `hcl:"prekill,optional"`
Postrun bool `hcl:"postrun,optional"`
Destroy bool `hcl:"destroy,optional"`
Update bool `hcl:"update,optional"`
PreTaskRestart bool `hcl:"pretaskrestart,optional"`
Shutdown bool `hcl:"shutdown,optional"`
}
}

func (h *FailHook) Name() string {
return h.name
}

func (h *FailHook) LoadConfig(path string) *FailHook {
if _, err := os.Stat(path); os.IsNotExist(err) {
h.logger.Error("couldn't load config", "error", err)
return h
}
if err := hclsimple.DecodeFile(path, nil, &h.Fail); err != nil {
h.logger.Error("error parsing config", "path", path, "error", err)
}
return h
}

var _ interfaces.RunnerPrerunHook = &FailHook{}

func (h *FailHook) Prerun() error {
if h.Fail.Prerun {
return fmt.Errorf("prerun %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerPreKillHook = &FailHook{}

func (h *FailHook) PreKill() {
if h.Fail.PreKill {
h.logger.Error("prekill", "error", ErrFailHookError)
}
}

var _ interfaces.RunnerPostrunHook = &FailHook{}

func (h *FailHook) Postrun() error {
if h.Fail.Postrun {
return fmt.Errorf("postrun %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerDestroyHook = &FailHook{}

func (h *FailHook) Destroy() error {
if h.Fail.Destroy {
return fmt.Errorf("destroy %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerUpdateHook = &FailHook{}

func (h *FailHook) Update(request *interfaces.RunnerUpdateRequest) error {
if h.Fail.Update {
return fmt.Errorf("update %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerTaskRestartHook = &FailHook{}

func (h *FailHook) PreTaskRestart() error {
if h.Fail.PreTaskRestart {
return fmt.Errorf("destroy %w", ErrFailHookError)
}
return nil
}

var _ interfaces.ShutdownHook = &FailHook{}

func (h *FailHook) Shutdown() {
if h.Fail.Shutdown {
h.logger.Error("shutdown", "error", ErrFailHookError)
}
}
33 changes: 12 additions & 21 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"sync"
"time"

"github.com/hashicorp/nomad/client/lib/cgutil"
"golang.org/x/exp/slices"

metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/hcl/v2/hcldec"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
Expand All @@ -24,6 +24,7 @@ import (
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/dynamicplugins"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration"
Expand Down Expand Up @@ -492,30 +493,20 @@ func (tr *TaskRunner) initLabels() {
}
}

// MarkFailedDead marks a task as failed and not to run. Aimed to be invoked
// when alloc runner prestart hooks failed. Should never be called with Run().
func (tr *TaskRunner) MarkFailedDead(reason string) {
defer close(tr.waitCh)

tr.stateLock.Lock()
if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil {
//TODO Nomad will be unable to restore this task; try to kill
// it now and fail? In general we prefer to leave running
// tasks running even if the agent encounters an error.
tr.logger.Warn("error persisting local failed task state; may be unable to restore after a Nomad restart",
"error", err)
}
tr.stateLock.Unlock()

// MarkFailedKill marks a task as failed and should be killed.
// It should be invoked when alloc runner prestart hooks fail.
// Afterwards, Run() will perform any necessary cleanup.
func (tr *TaskRunner) MarkFailedKill(reason string) {
// Emit an event that fails the task and gives reasons for humans.
event := structs.NewTaskEvent(structs.TaskSetupFailure).
SetKillReason(structs.TaskRestoreFailed).
SetDisplayMessage(reason).
SetFailsTask()
tr.UpdateState(structs.TaskStateDead, event)
tr.EmitEvent(event)

// Run the stop hooks in case task was a restored task that failed prestart
if err := tr.stop(); err != nil {
tr.logger.Error("stop failed while marking task dead", "error", err)
}
// Cancel kill context, so later when allocRunner runs tr.Run(),
// we'll follow the usual kill path and do all the appropriate cleanup steps.
tr.killCtxCancel()
}

// Run the TaskRunner. Starts the user's task or reattaches to a restored task.
Expand Down
64 changes: 61 additions & 3 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (
"time"

"github.com/golang/snappy"
"github.com/kr/pretty"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
Expand All @@ -38,9 +44,6 @@ import (
"github.com/hashicorp/nomad/plugins/device"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type MockTaskStateUpdater struct {
Expand Down Expand Up @@ -658,6 +661,61 @@ func TestTaskRunner_Restore_System(t *testing.T) {
})
}

// TestTaskRunner_MarkFailedKill asserts that MarkFailedKill marks the task as failed
// and cancels the killCtx so a subsequent Run() will do any necessary task cleanup.
func TestTaskRunner_MarkFailedKill(t *testing.T) {
ci.Parallel(t)

// set up some taskrunner
alloc := mock.MinAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
t.Cleanup(cleanup)
tr, err := NewTaskRunner(conf)
must.NoError(t, err)

// side quest: set this lifecycle coordination channel,
// so early in tr MAIN, it doesn't randomly follow that route.
// test config creates this already closed, but not so in real life.
startCh := make(chan struct{})
t.Cleanup(func() { close(startCh) })
tr.startConditionMetCh = startCh

// function under test: should mark the task as failed and cancel kill context
reason := "because i said so"
tr.MarkFailedKill(reason)

// explicitly check kill context.
select {
case <-tr.killCtx.Done():
default:
t.Fatal("kill context should be done")
}

// Run() should now follow the kill path.
go tr.Run()

select { // it should finish up very quickly
case <-tr.WaitCh():
case <-time.After(time.Second):
t.Error("task not killed (or not as fast as expected)")
}

// check state for expected values and events
state := tr.TaskState()

// this gets set directly by MarkFailedKill()
test.True(t, state.Failed, test.Sprint("task should have failed"))
// this is set in Run()
test.Eq(t, structs.TaskStateDead, state.State, test.Sprint("task should be dead"))
// reason "because i said so" should be a task event message
foundMessages := make(map[string]bool)
for _, event := range state.Events {
foundMessages[event.DisplayMessage] = true
}
test.True(t, foundMessages[reason], test.Sprintf("expected '%s' in events: %#v", reason, foundMessages))
}

// TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
// interpolated.
func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {
Expand Down
Loading

0 comments on commit f4180fd

Please sign in to comment.