From 27c03c8d53afabc95e13854882ed7b28122dd711 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 13 Feb 2019 08:26:23 -0800 Subject: [PATCH] client: artifact errors are retry-able 0.9.0beta2 contains a regression where artifact download errors would not cause a task restart and instead immediately fail the task. This restores the pre-0.9 behavior of retrying all artifact errors and adds missing tests. --- .../allocrunner/taskrunner/artifact_hook.go | 5 +- .../taskrunner/artifact_hook_test.go | 57 +++++++++++++++++++ client/allocrunner/taskrunner/errors.go | 10 ++-- client/allocrunner/taskrunner/errors_test.go | 52 +++++++++++++++++ .../taskrunner/task_runner_hooks.go | 6 +- .../taskrunner/task_runner_test.go | 44 ++++++++++++++ 6 files changed, 164 insertions(+), 10 deletions(-) create mode 100644 client/allocrunner/taskrunner/artifact_hook_test.go create mode 100644 client/allocrunner/taskrunner/errors_test.go diff --git a/client/allocrunner/taskrunner/artifact_hook.go b/client/allocrunner/taskrunner/artifact_hook.go index 7bed440abe1..fd4efc71873 100644 --- a/client/allocrunner/taskrunner/artifact_hook.go +++ b/client/allocrunner/taskrunner/artifact_hook.go @@ -42,7 +42,10 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar for _, artifact := range req.Task.Artifacts { //XXX add ctx to GetArtifact to allow cancelling long downloads if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir.Dir); err != nil { - wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err) + wrapped := structs.NewRecoverableError( + fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err), + true, + ) herr := NewHookError(wrapped, structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped)) return herr diff --git a/client/allocrunner/taskrunner/artifact_hook_test.go b/client/allocrunner/taskrunner/artifact_hook_test.go new file mode 100644 index 00000000000..a9babf88d76 --- /dev/null +++ b/client/allocrunner/taskrunner/artifact_hook_test.go @@ -0,0 +1,57 @@ +package taskrunner + +import ( + "context" + "os" + "testing" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/taskenv" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +// Statically assert the artifact hook implements the expected interface +var _ interfaces.TaskPrestartHook = (*artifactHook)(nil) + +type mockEmitter struct { + events []*structs.TaskEvent +} + +func (m *mockEmitter) EmitEvent(ev *structs.TaskEvent) { + m.events = append(m.events, ev) +} + +// TestTaskRunner_ArtifactHook_Recoverable asserts that failures to download +// artifacts are a recoverable error. +func TestTaskRunner_ArtifactHook_Recoverable(t *testing.T) { + t.Parallel() + + me := &mockEmitter{} + artifactHook := newArtifactHook(me, testlog.HCLogger(t)) + + req := &interfaces.TaskPrestartRequest{ + TaskEnv: taskenv.NewEmptyTaskEnv(), + TaskDir: &allocdir.TaskDir{Dir: os.TempDir()}, + Task: &structs.Task{ + Artifacts: []*structs.TaskArtifact{ + { + GetterSource: "http://127.0.0.1:0", + GetterMode: structs.GetterModeAny, + }, + }, + }, + } + + resp := interfaces.TaskPrestartResponse{} + + err := artifactHook.Prestart(context.Background(), req, &resp) + + require.False(t, resp.Done) + require.NotNil(t, err) + require.True(t, structs.IsRecoverable(err)) + require.Len(t, me.events, 1) + require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type) +} diff --git a/client/allocrunner/taskrunner/errors.go b/client/allocrunner/taskrunner/errors.go index 4a1f6af2a87..a3503757cd6 100644 --- a/client/allocrunner/taskrunner/errors.go +++ b/client/allocrunner/taskrunner/errors.go @@ -14,10 +14,7 @@ var ( ErrTaskNotRunning = errors.New(errTaskNotRunning) ) -// NewHookError returns an implementation of a HookError with an underlying err -// and a pre-formatted task event. -// If the taskEvent is nil, then we won't attempt to generate one during error -// handling. +// NewHookError contains an underlying err and a pre-formatted task event. func NewHookError(err error, taskEvent *structs.TaskEvent) error { return &hookError{ err: err, @@ -33,3 +30,8 @@ type hookError struct { func (h *hookError) Error() string { return h.err.Error() } + +// Recoverable is true if the underlying error is recoverable. +func (h *hookError) IsRecoverable() bool { + return structs.IsRecoverable(h.err) +} diff --git a/client/allocrunner/taskrunner/errors_test.go b/client/allocrunner/taskrunner/errors_test.go new file mode 100644 index 00000000000..9b32e9cdf73 --- /dev/null +++ b/client/allocrunner/taskrunner/errors_test.go @@ -0,0 +1,52 @@ +package taskrunner + +import ( + "errors" + "testing" + + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +// Statically assert error implements the expected interfaces +var _ structs.Recoverable = (*hookError)(nil) + +// TestHookError_Recoverable asserts that a NewHookError is recoverable if +// passed a recoverable error. +func TestHookError_Recoverable(t *testing.T) { + t.Parallel() + + // Create root error + root := errors.New("test error") + + // Make it recoverable + recov := structs.NewRecoverableError(root, true) + + // Create a fake task event + ev := structs.NewTaskEvent("test event") + + herr := NewHookError(recov, ev) + + require.Equal(t, ev, herr.(*hookError).taskEvent) + require.True(t, structs.IsRecoverable(herr)) + require.Equal(t, root.Error(), herr.Error()) + require.Equal(t, recov.Error(), herr.Error()) +} + +// TestHookError_Unrecoverable asserts that a NewHookError is not recoverable +// unless it is passed a recoverable error. +func TestHookError_Unrecoverable(t *testing.T) { + t.Parallel() + + // Create error + err := errors.New("test error") + + // Create a fake task event + ev := structs.NewTaskEvent("test event") + + herr := NewHookError(err, ev) + + require.Equal(t, ev, herr.(*hookError).taskEvent) + require.False(t, structs.IsRecoverable(herr)) + require.Equal(t, err.Error(), herr.Error()) +} diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index a41e76b7e13..c1e022f1b36 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -113,11 +113,7 @@ func (tr *TaskRunner) emitHookError(err error, hookName string) { taskEvent = structs.NewTaskEvent(structs.TaskHookFailed).SetMessage(message) } - // The TaskEvent returned by a HookError may be nil if the hook chooses to opt - // out of sending a task event. - if taskEvent != nil { - tr.EmitEvent(taskEvent) - } + tr.EmitEvent(taskEvent) } // prestart is used to run the runners prestart hooks. diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index a67e22eb733..e2a18cc811d 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -953,6 +953,50 @@ func TestTaskRunner_Download_List(t *testing.T) { require.NoErrorf(t, err, "%v not downloaded", f2) } +// TestTaskRunner_Download_Retries asserts that failed artifact downloads are +// retried according to the task's restart policy. +func TestTaskRunner_Download_Retries(t *testing.T) { + t.Parallel() + + // Create an allocation that has a task with bad artifacts. + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + artifact := structs.TaskArtifact{ + GetterSource: "http://127.0.0.1:0/foo/bar/baz", + } + task.Artifacts = []*structs.TaskArtifact{&artifact} + + // Make the restart policy retry once + alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{ + Attempts: 1, + Interval: 10 * time.Minute, + Delay: 1 * time.Second, + Mode: structs.RestartPolicyModeFail, + } + + tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) + defer cleanup() + + select { + case <-tr.WaitCh(): + case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): + require.Fail(t, "timed out waiting for task to exit") + } + + state := tr.TaskState() + require.Equal(t, structs.TaskStateDead, state.State) + require.True(t, state.Failed) + require.Len(t, state.Events, 8, pretty.Sprint(state.Events)) + require.Equal(t, structs.TaskReceived, state.Events[0].Type) + require.Equal(t, structs.TaskSetup, state.Events[1].Type) + require.Equal(t, structs.TaskDownloadingArtifacts, state.Events[2].Type) + require.Equal(t, structs.TaskArtifactDownloadFailed, state.Events[3].Type) + require.Equal(t, structs.TaskRestarting, state.Events[4].Type) + require.Equal(t, structs.TaskDownloadingArtifacts, state.Events[5].Type) + require.Equal(t, structs.TaskArtifactDownloadFailed, state.Events[6].Type) + require.Equal(t, structs.TaskNotRestarting, state.Events[7].Type) +} + // testWaitForTaskToStart waits for the task to be running or fails the test func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) {