From 324702a287e03771d944522ee3a55411785a61ee Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Mon, 8 May 2023 15:24:48 -0500 Subject: [PATCH] backport of commit c2dc1c58dda2c926bf1b8acb35944202bcee8c8c plus MinJob() from nomad/mock/job.go --- .changelog/17104.txt | 3 + client/allocrunner/alloc_runner.go | 7 +- client/allocrunner/alloc_runner_hooks.go | 4 + client/allocrunner/fail_hook.go | 118 ++++++++++++++++++ client/allocrunner/taskrunner/task_runner.go | 33 ++--- .../taskrunner/task_runner_test.go | 64 +++++++++- client/client_test.go | 95 +++++++++++++- client/config/config.go | 10 +- nomad/mock/alloc.go | 30 +++++ nomad/mock/job.go | 32 +++++ testutil/wait.go | 4 +- 11 files changed, 366 insertions(+), 34 deletions(-) create mode 100644 .changelog/17104.txt create mode 100644 client/allocrunner/fail_hook.go diff --git a/.changelog/17104.txt b/.changelog/17104.txt new file mode 100644 index 00000000000..8df9033cbd0 --- /dev/null +++ b/.changelog/17104.txt @@ -0,0 +1,3 @@ +```release-note:bug +client: clean up resources upon failure to restore task during client restart +``` diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 80224b0ecc0..9de657b788e 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -8,6 +8,7 @@ import ( log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/allocrunner/state" @@ -344,17 +345,15 @@ func (ar *allocRunner) Run() { ar.logger.Error("prerun failed", "error", err) for _, tr := range ar.tasks { - tr.MarkFailedDead(fmt.Sprintf("failed to setup alloc: %v", err)) + // emit event and mark task to be cleaned up during runTasks() + tr.MarkFailedKill(fmt.Sprintf("failed to setup alloc: %v", err)) } - - goto POST } } // Run the runners (blocks until they exit) ar.runTasks() -POST: if ar.isShuttingDown() { return } diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 5e59c659e40..2288f697fce 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -5,6 +5,7 @@ import ( "time" multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/taskenv" @@ -135,6 +136,9 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID), newChecksHook(hookLogger, alloc, ar.checkStore, ar), } + if config.ExtraAllocHooks != nil { + ar.runnerHooks = append(ar.runnerHooks, config.ExtraAllocHooks...) + } return nil } diff --git a/client/allocrunner/fail_hook.go b/client/allocrunner/fail_hook.go new file mode 100644 index 00000000000..abe5db3f85d --- /dev/null +++ b/client/allocrunner/fail_hook.go @@ -0,0 +1,118 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +// FailHook is designed to fail for testing purposes, +// so should never be included in a release. +//go:build !release + +package allocrunner + +import ( + "errors" + "fmt" + "os" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/hcl/v2/hclsimple" + + "github.com/hashicorp/nomad/client/allocrunner/interfaces" +) + +var ErrFailHookError = errors.New("failed successfully") + +func NewFailHook(l hclog.Logger, name string) *FailHook { + return &FailHook{ + name: name, + logger: l.Named(name), + } +} + +type FailHook struct { + name string + logger hclog.Logger + Fail struct { + Prerun bool `hcl:"prerun,optional"` + PreKill bool `hcl:"prekill,optional"` + Postrun bool `hcl:"postrun,optional"` + Destroy bool `hcl:"destroy,optional"` + Update bool `hcl:"update,optional"` + PreTaskRestart bool `hcl:"pretaskrestart,optional"` + Shutdown bool `hcl:"shutdown,optional"` + } +} + +func (h *FailHook) Name() string { + return h.name +} + +func (h *FailHook) LoadConfig(path string) *FailHook { + if _, err := os.Stat(path); os.IsNotExist(err) { + h.logger.Error("couldn't load config", "error", err) + return h + } + if err := hclsimple.DecodeFile(path, nil, &h.Fail); err != nil { + h.logger.Error("error parsing config", "path", path, "error", err) + } + return h +} + +var _ interfaces.RunnerPrerunHook = &FailHook{} + +func (h *FailHook) Prerun() error { + if h.Fail.Prerun { + return fmt.Errorf("prerun %w", ErrFailHookError) + } + return nil +} + +var _ interfaces.RunnerPreKillHook = &FailHook{} + +func (h *FailHook) PreKill() { + if h.Fail.PreKill { + h.logger.Error("prekill", "error", ErrFailHookError) + } +} + +var _ interfaces.RunnerPostrunHook = &FailHook{} + +func (h *FailHook) Postrun() error { + if h.Fail.Postrun { + return fmt.Errorf("postrun %w", ErrFailHookError) + } + return nil +} + +var _ interfaces.RunnerDestroyHook = &FailHook{} + +func (h *FailHook) Destroy() error { + if h.Fail.Destroy { + return fmt.Errorf("destroy %w", ErrFailHookError) + } + return nil +} + +var _ interfaces.RunnerUpdateHook = &FailHook{} + +func (h *FailHook) Update(request *interfaces.RunnerUpdateRequest) error { + if h.Fail.Update { + return fmt.Errorf("update %w", ErrFailHookError) + } + return nil +} + +var _ interfaces.RunnerTaskRestartHook = &FailHook{} + +func (h *FailHook) PreTaskRestart() error { + if h.Fail.PreTaskRestart { + return fmt.Errorf("destroy %w", ErrFailHookError) + } + return nil +} + +var _ interfaces.ShutdownHook = &FailHook{} + +func (h *FailHook) Shutdown() { + if h.Fail.Shutdown { + h.logger.Error("shutdown", "error", ErrFailHookError) + } +} diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 52402af0ca9..e3c506d70ad 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -8,13 +8,13 @@ import ( "sync" "time" - "github.com/hashicorp/nomad/client/lib/cgutil" "golang.org/x/exp/slices" metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/hcl/v2/hcldec" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts" @@ -24,6 +24,7 @@ import ( "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/dynamicplugins" cinterfaces "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/serviceregistration" @@ -492,30 +493,20 @@ func (tr *TaskRunner) initLabels() { } } -// MarkFailedDead marks a task as failed and not to run. Aimed to be invoked -// when alloc runner prestart hooks failed. Should never be called with Run(). -func (tr *TaskRunner) MarkFailedDead(reason string) { - defer close(tr.waitCh) - - tr.stateLock.Lock() - if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil { - //TODO Nomad will be unable to restore this task; try to kill - // it now and fail? In general we prefer to leave running - // tasks running even if the agent encounters an error. - tr.logger.Warn("error persisting local failed task state; may be unable to restore after a Nomad restart", - "error", err) - } - tr.stateLock.Unlock() - +// MarkFailedKill marks a task as failed and should be killed. +// It should be invoked when alloc runner prestart hooks fail. +// Afterwards, Run() will perform any necessary cleanup. +func (tr *TaskRunner) MarkFailedKill(reason string) { + // Emit an event that fails the task and gives reasons for humans. event := structs.NewTaskEvent(structs.TaskSetupFailure). + SetKillReason(structs.TaskRestoreFailed). SetDisplayMessage(reason). SetFailsTask() - tr.UpdateState(structs.TaskStateDead, event) + tr.EmitEvent(event) - // Run the stop hooks in case task was a restored task that failed prestart - if err := tr.stop(); err != nil { - tr.logger.Error("stop failed while marking task dead", "error", err) - } + // Cancel kill context, so later when allocRunner runs tr.Run(), + // we'll follow the usual kill path and do all the appropriate cleanup steps. + tr.killCtxCancel() } // Run the TaskRunner. Starts the user's task or reattaches to a restored task. diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index fc67f727305..a6e2745b679 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -13,6 +13,12 @@ import ( "time" "github.com/golang/snappy" + "github.com/kr/pretty" + "github.com/shoenig/test" + "github.com/shoenig/test/must" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" @@ -38,9 +44,6 @@ import ( "github.com/hashicorp/nomad/plugins/device" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/testutil" - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type MockTaskStateUpdater struct { @@ -658,6 +661,61 @@ func TestTaskRunner_Restore_System(t *testing.T) { }) } +// TestTaskRunner_MarkFailedKill asserts that MarkFailedKill marks the task as failed +// and cancels the killCtx so a subsequent Run() will do any necessary task cleanup. +func TestTaskRunner_MarkFailedKill(t *testing.T) { + ci.Parallel(t) + + // set up some taskrunner + alloc := mock.MinAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + t.Cleanup(cleanup) + tr, err := NewTaskRunner(conf) + must.NoError(t, err) + + // side quest: set this lifecycle coordination channel, + // so early in tr MAIN, it doesn't randomly follow that route. + // test config creates this already closed, but not so in real life. + startCh := make(chan struct{}) + t.Cleanup(func() { close(startCh) }) + tr.startConditionMetCh = startCh + + // function under test: should mark the task as failed and cancel kill context + reason := "because i said so" + tr.MarkFailedKill(reason) + + // explicitly check kill context. + select { + case <-tr.killCtx.Done(): + default: + t.Fatal("kill context should be done") + } + + // Run() should now follow the kill path. + go tr.Run() + + select { // it should finish up very quickly + case <-tr.WaitCh(): + case <-time.After(time.Second): + t.Error("task not killed (or not as fast as expected)") + } + + // check state for expected values and events + state := tr.TaskState() + + // this gets set directly by MarkFailedKill() + test.True(t, state.Failed, test.Sprint("task should have failed")) + // this is set in Run() + test.Eq(t, structs.TaskStateDead, state.State, test.Sprint("task should be dead")) + // reason "because i said so" should be a task event message + foundMessages := make(map[string]bool) + for _, event := range state.Events { + foundMessages[event.DisplayMessage] = true + } + test.True(t, foundMessages[reason], test.Sprintf("expected '%s' in events: %#v", reason, foundMessages)) +} + // TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are // interpolated. func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) { diff --git a/client/client_test.go b/client/client_test.go index c9b4b6fb98b..f6df9e75b01 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -11,7 +11,14 @@ import ( "time" memdb "github.com/hashicorp/go-memdb" + "github.com/shoenig/test" + "github.com/shoenig/test/must" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/client/allocrunner" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/fingerprint" @@ -30,8 +37,6 @@ import ( "github.com/hashicorp/nomad/plugins/device" psstructs "github.com/hashicorp/nomad/plugins/shared/structs" "github.com/hashicorp/nomad/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func testACLServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string, *structs.ACLToken, func()) { @@ -1793,3 +1798,89 @@ func TestClient_ReconnectAllocs(t *testing.T) { require.False(t, invalid, "expected alloc to not be marked invalid") require.Equal(t, unknownAlloc.AllocModifyIndex, finalAlloc.AllocModifyIndex) } + +// TestClient_AllocPrerunErrorDuringRestore ensures that a running allocation, +// which fails Prerun during Restore on client restart, should be killed. +func TestClient_AllocPrerunErrorDuringRestore(t *testing.T) { + ci.Parallel(t) + + logger := testlog.HCLogger(t) + + // set up server + server, _, cleanS1 := testServer(t, nil) + t.Cleanup(cleanS1) + testutil.WaitForLeader(t, server.RPC) + + // set up first client, which will initially start the job cleanly + c1, cleanC1 := TestClient(t, func(c *config.Config) { + c.DevMode = false // so state persists to client 2 + c.RPCHandler = server + }) + t.Cleanup(func() { + test.NoError(t, cleanC1()) + }) + waitTilNodeReady(c1, t) + + // register a happy job to run until we cause it to fail + job := mock.MinJob() + testutil.RegisterJob(t, server.RPC, job) + + // wait for our alloc to be running + testutil.WaitForJobAllocStatus(t, server.RPC, job, map[string]int{ + structs.AllocClientStatusRunning: 1, + }) + t.Logf("job %s allocs running 👍", job.ID) + + // stop client 1, shutdown will dump state to disk but leave allocs running + must.NoError(t, c1.Shutdown()) + + // make a new client, using parts from the old one to be able to restore state + restoreClient := func() { + conf := c1.config.Copy() + // we want the prerun hook to fail + hook := allocrunner.NewFailHook(logger, t.Name()) + hook.Fail.Prerun = true + conf.ExtraAllocHooks = []interfaces.RunnerHook{hook} + + // this is so in-memory driver handles from client 1 can be restored by client 2 + conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, c1.config.PluginLoader) + + // actually make and start the client + c2, err := NewClient(conf, c1.consulCatalog, nil, c1.consulService, nil) + must.NoError(t, err) + t.Cleanup(func() { + test.NoError(t, c2.Shutdown()) + }) + } + restoreClient() + + // wait for the client to pick up the alloc and fail prerun hook + testutil.WaitForJobAllocStatus(t, server.RPC, job, map[string]int{ + structs.AllocClientStatusFailed: 1, + }) + t.Logf("job %s allocs failed 👍", job.ID) + + // ok, final assertions + allocs, err := server.State().AllocsByJob(nil, job.Namespace, job.ID, true) + must.NoError(t, err) + + ts := allocs[0].TaskStates["t"] + test.True(t, ts.Failed) + test.Eq(t, structs.TaskStateDead, ts.State) + + expectEvents := []string{ + // initial successful setup + structs.TaskReceived, + structs.TaskSetup, + structs.TaskStarted, + // after prerun error during restore + structs.TaskSetupFailure, + structs.TaskTerminated, // this whole test is to ensure this happens. + } + var actual []string + for _, event := range ts.Events { + actual = append(actual, event.Type) + } + must.Eq(t, expectEvents, actual) + test.StrContains(t, ts.Events[3].DisplayMessage, allocrunner.ErrFailHookError.Error()) +} diff --git a/client/config/config.go b/client/config/config.go index 8ff6e986104..b5c38df0b6e 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -11,13 +11,14 @@ import ( "time" "github.com/hashicorp/consul-template/config" - "github.com/hashicorp/nomad/client/lib/cgutil" - "github.com/hashicorp/nomad/command/agent/host" + log "github.com/hashicorp/go-hclog" "golang.org/x/exp/maps" "golang.org/x/exp/slices" - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/state" + "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/helper/bufconndialer" "github.com/hashicorp/nomad/helper/pluginutils/loader" "github.com/hashicorp/nomad/helper/pointer" @@ -299,6 +300,9 @@ type Config struct { // Artifact configuration from the agent's config file. Artifact *ArtifactConfig + + // ExtraAllocHooks are run with other allocation hooks, mainly for testing. + ExtraAllocHooks []interfaces.RunnerHook } // ClientTemplateConfig is configuration on the client specific to template diff --git a/nomad/mock/alloc.go b/nomad/mock/alloc.go index fe99ee73bb3..8e66a26bd48 100644 --- a/nomad/mock/alloc.go +++ b/nomad/mock/alloc.go @@ -83,6 +83,36 @@ func Alloc() *structs.Allocation { return alloc } +func MinAlloc() *structs.Allocation { + job := MinJob() + group := job.TaskGroups[0] + task := group.Tasks[0] + return &structs.Allocation{ + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: uuid.Generate(), + Job: job, + TaskGroup: group.Name, + ClientStatus: structs.AllocClientStatusPending, + DesiredStatus: structs.AllocDesiredStatusRun, + AllocatedResources: &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + task.Name: { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 100, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 256, + }, + }, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 150, + }, + }, + } +} + func AllocWithoutReservedPort() *structs.Allocation { alloc := Alloc() alloc.Resources.Networks[0].ReservedPorts = nil diff --git a/nomad/mock/job.go b/nomad/mock/job.go index 3422c7e56f0..ddd296a8da0 100644 --- a/nomad/mock/job.go +++ b/nomad/mock/job.go @@ -122,6 +122,38 @@ func Job() *structs.Job { return job } +// MinJob returns a minimal service job with a mock driver task. +func MinJob() *structs.Job { + job := &structs.Job{ + ID: "j" + uuid.Short(), + Name: "j", + Region: "global", + Type: "service", + TaskGroups: []*structs.TaskGroup{ + { + Name: "g", + Count: 1, + Tasks: []*structs.Task{ + { + Name: "t", + Driver: "mock_driver", + Config: map[string]any{ + // An empty config actually causes an error, so set a reasonably + // long run_for duration. + "run_for": "10m", + }, + LogConfig: structs.DefaultLogConfig(), + }, + }, + }, + }, + Datacenters: []string{"dc1"}, + Priority: 50, + } + job.Canonicalize() + return job +} + func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) { job := Job() policy := &structs.ScalingPolicy{ diff --git a/testutil/wait.go b/testutil/wait.go index 85446e7e2a4..4c7b2eae716 100644 --- a/testutil/wait.go +++ b/testutil/wait.go @@ -7,11 +7,12 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/hashicorp/nomad/nomad/structs" "github.com/kr/pretty" "github.com/shoenig/test/must" "github.com/shoenig/test/wait" "github.com/stretchr/testify/require" + + "github.com/hashicorp/nomad/nomad/structs" ) type testFn func() (bool, error) @@ -228,6 +229,7 @@ func WaitForVotingMembers(t testing.TB, rpc rpcFn, nPeers int) { // RegisterJobWithToken registers a job and uses the job's Region and Namespace. func RegisterJobWithToken(t testing.TB, rpc rpcFn, job *structs.Job, token string) { + t.Helper() WaitForResult(func() (bool, error) { args := &structs.JobRegisterRequest{} args.Job = job