Skip to content

Commit

Permalink
Merge pull request #5322 from hashicorp/b-artifact-retries
Browse files Browse the repository at this point in the history
Fix regression by restarting on artifact download errors
  • Loading branch information
schmichael authored Feb 21, 2019
2 parents cc328c7 + 1acd4ca commit 234f644
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ IMPROVEMENTS:
* client: Extend timeout to 60 seconds for Windows CPU fingerprinting [[GH-4441](https://github.com/hashicorp/nomad/pull/4441)]
* client: Refactor client to support plugins and improve state handling [[GH-4792](https://github.com/hashicorp/nomad/pull/4792)]
* client: Updated consul-template library to pick up recent fixes and improvements [[GH-4885](https://github.com/hashicorp/nomad/pull/4885)]
* client: When retrying a failed artifact download, do not re-download artifacts that were already downloaded successfully [[GH-5322](https://github.com/hashicorp/nomad/issues/5322)]
* client: Added service metadata tag that enables the Consul UI to show a Nomad icon for services registered by Nomad [[GH-4889](https://github.com/hashicorp/nomad/issues/4889)]
* driver/docker: Support logs when using Docker for Mac [[GH-4758](https://github.com/hashicorp/nomad/issues/4758)]
* driver/docker: Added support for specifying `storage_opt` in the Docker driver [[GH-4908](https://github.com/hashicorp/nomad/pull/4908)]
Expand Down
21 changes: 20 additions & 1 deletion client/allocrunner/taskrunner/artifact_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,35 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar
return nil
}

// Initialize HookData to store download progress
resp.HookData = make(map[string]string, len(req.Task.Artifacts))

h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskDownloadingArtifacts))

for _, artifact := range req.Task.Artifacts {
aid := artifact.Hash()
if req.HookData[aid] != "" {
h.logger.Trace("skipping already downloaded artifact", "artifact", artifact.GetterSource)
resp.HookData[aid] = req.HookData[aid]
continue
}

h.logger.Debug("downloading artifact", "artifact", artifact.GetterSource)
//XXX add ctx to GetArtifact to allow cancelling long downloads
if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir.Dir); err != nil {
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
wrapped := structs.NewRecoverableError(
fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err),
true,
)
herr := NewHookError(wrapped, structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped))

return herr
}

// Mark artifact as downloaded to avoid re-downloading due to
// retries caused by subsequent artifacts failing. Any
// non-empty value works.
resp.HookData[aid] = "1"
}

resp.Done = true
Expand Down
159 changes: 159 additions & 0 deletions client/allocrunner/taskrunner/artifact_hook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package taskrunner

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"sort"
"testing"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

// Statically assert that *artifactHook implements the
// interfaces.TaskPrestartHook interface.
var _ interfaces.TaskPrestartHook = (*artifactHook)(nil)

// mockEmitter is a test double that records every task event passed to
// EmitEvent so tests can assert on what was emitted.
type mockEmitter struct {
// events holds the emitted task events in the order they were received.
events []*structs.TaskEvent
}

// EmitEvent appends ev to the recorded events.
func (m *mockEmitter) EmitEvent(ev *structs.TaskEvent) {
m.events = append(m.events, ev)
}

// TestTaskRunner_ArtifactHook_Recoverable verifies that when an artifact
// download fails, the hook reports a recoverable error, leaves the response
// not done, and emits only the "downloading artifacts" event.
func TestTaskRunner_ArtifactHook_Recoverable(t *testing.T) {
	t.Parallel()

	emitter := &mockEmitter{}
	hook := newArtifactHook(emitter, testlog.HCLogger(t))

	// The download from this source is expected to fail; the assertions
	// below depend on that failure.
	badArtifact := &structs.TaskArtifact{
		GetterSource: "http://127.0.0.1:0",
		GetterMode:   structs.GetterModeAny,
	}

	request := &interfaces.TaskPrestartRequest{
		TaskEnv: taskenv.NewEmptyTaskEnv(),
		TaskDir: &allocdir.TaskDir{Dir: os.TempDir()},
		Task: &structs.Task{
			Artifacts: []*structs.TaskArtifact{badArtifact},
		},
	}

	var response interfaces.TaskPrestartResponse

	prestartErr := hook.Prestart(context.Background(), request, &response)

	// The hook must not report completion and must return a recoverable error.
	require.False(t, response.Done)
	require.NotNil(t, prestartErr)
	require.True(t, structs.IsRecoverable(prestartErr))

	// Only the initial "downloading" event goes through the emitter.
	require.Len(t, emitter.events, 1)
	require.Equal(t, structs.TaskDownloadingArtifacts, emitter.events[0].Type)
}

// TestTaskRunner_ArtifactHook_PartialDone asserts that the artifact hook skips
// already downloaded artifacts when subsequent artifacts fail and cause a
// restart.
//
// The test runs Prestart three times against the same request:
//  1. only foo.txt is served, so foo downloads and bar fails;
//  2. foo.txt is removed and bar.txt is added, so the run only succeeds if foo
//     is skipped based on the HookData carried over from run 1;
//  3. the server is stopped, so the run only succeeds if both artifacts are
//     skipped.
func TestTaskRunner_ArtifactHook_PartialDone(t *testing.T) {
	t.Parallel()

	me := &mockEmitter{}
	artifactHook := newArtifactHook(me, testlog.HCLogger(t))

	// Create a source directory with 1 of the 2 artifacts
	srcdir, err := ioutil.TempDir("", "nomadtest-src")
	require.NoError(t, err)
	defer func() {
		require.NoError(t, os.RemoveAll(srcdir))
	}()

	// Only create one of the 2 artifacts to cause an error on first run.
	file1 := filepath.Join(srcdir, "foo.txt")
	require.NoError(t, ioutil.WriteFile(file1, []byte{'1'}, 0644))

	// Test server to serve the artifacts
	ts := httptest.NewServer(http.FileServer(http.Dir(srcdir)))
	defer ts.Close()

	// Create the target directory.
	destdir, err := ioutil.TempDir("", "nomadtest-dest")
	require.NoError(t, err)
	defer func() {
		require.NoError(t, os.RemoveAll(destdir))
	}()

	req := &interfaces.TaskPrestartRequest{
		TaskEnv: taskenv.NewEmptyTaskEnv(),
		TaskDir: &allocdir.TaskDir{Dir: destdir},
		Task: &structs.Task{
			Artifacts: []*structs.TaskArtifact{
				{
					GetterSource: ts.URL + "/foo.txt",
					GetterMode:   structs.GetterModeAny,
				},
				{
					GetterSource: ts.URL + "/bar.txt",
					GetterMode:   structs.GetterModeAny,
				},
			},
		},
	}

	resp := interfaces.TaskPrestartResponse{}

	// On first run file1 (foo) should download but file2 (bar) should
	// fail.
	err = artifactHook.Prestart(context.Background(), req, &resp)

	require.NotNil(t, err)
	require.True(t, structs.IsRecoverable(err))
	require.Len(t, resp.HookData, 1)
	require.False(t, resp.Done)
	require.Len(t, me.events, 1)
	require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type)

	// Remove file1 from the server so it errors if it is downloaded again.
	require.NoError(t, os.Remove(file1))

	// Write file2 so artifacts can download successfully
	file2 := filepath.Join(srcdir, "bar.txt")
	require.NoError(t, ioutil.WriteFile(file2, []byte{'1'}, 0644))

	// Mock TaskRunner by copying HookData from resp to req and reset resp.
	req.HookData = resp.HookData

	resp = interfaces.TaskPrestartResponse{}

	// Retry the download and assert it succeeds
	err = artifactHook.Prestart(context.Background(), req, &resp)

	require.NoError(t, err)
	require.True(t, resp.Done)
	require.Len(t, resp.HookData, 2)

	// Assert both files downloaded properly
	files, err := filepath.Glob(filepath.Join(destdir, "*.txt"))
	require.NoError(t, err)
	// Check the count before indexing so a missing file produces a test
	// failure rather than an index-out-of-range panic.
	require.Len(t, files, 2)
	sort.Strings(files)
	require.Contains(t, files[0], "bar.txt")
	require.Contains(t, files[1], "foo.txt")

	// Stop the test server entirely and assert that re-running works
	ts.Close()
	req.HookData = resp.HookData
	resp = interfaces.TaskPrestartResponse{}
	err = artifactHook.Prestart(context.Background(), req, &resp)
	require.NoError(t, err)
	require.True(t, resp.Done)
	require.Len(t, resp.HookData, 2)
}
10 changes: 6 additions & 4 deletions client/allocrunner/taskrunner/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ var (
ErrTaskNotRunning = errors.New(errTaskNotRunning)
)

// NewHookError returns an implementation of a HookError with an underlying err
// and a pre-formatted task event.
// If the taskEvent is nil, then we won't attempt to generate one during error
// handling.
// NewHookError returns an error that wraps an underlying err and carries a
// pre-formatted task event.
func NewHookError(err error, taskEvent *structs.TaskEvent) error {
return &hookError{
err: err,
Expand All @@ -33,3 +30,8 @@ type hookError struct {
// Error implements the error interface by returning the message of the
// underlying error.
func (h *hookError) Error() string {
return h.err.Error()
}

// IsRecoverable reports whether the underlying error is recoverable, as
// determined by structs.IsRecoverable.
func (h *hookError) IsRecoverable() bool {
return structs.IsRecoverable(h.err)
}
52 changes: 52 additions & 0 deletions client/allocrunner/taskrunner/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package taskrunner

import (
"errors"
"testing"

"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

// Statically assert that *hookError implements the structs.Recoverable
// interface.
var _ structs.Recoverable = (*hookError)(nil)

// TestHookError_Recoverable checks that wrapping a recoverable error with
// NewHookError yields an error that is itself recoverable, keeps the supplied
// task event, and reports the same message as the wrapped error.
func TestHookError_Recoverable(t *testing.T) {
	t.Parallel()

	// Build the chain: plain error -> recoverable error.
	cause := errors.New("test error")
	recoverable := structs.NewRecoverableError(cause, true)

	// Placeholder task event to attach to the hook error.
	event := structs.NewTaskEvent("test event")

	hookErr := NewHookError(recoverable, event)
	concrete := hookErr.(*hookError)

	require.Equal(t, event, concrete.taskEvent)
	require.True(t, structs.IsRecoverable(hookErr))

	// The message must match both the root cause and the recoverable wrapper.
	got := hookErr.Error()
	require.Equal(t, cause.Error(), got)
	require.Equal(t, recoverable.Error(), got)
}

// TestHookError_Unrecoverable checks that NewHookError does not make a plain
// error recoverable: the result keeps the task event and message but
// structs.IsRecoverable reports false.
func TestHookError_Unrecoverable(t *testing.T) {
	t.Parallel()

	// A plain error with no recoverable wrapper.
	plain := errors.New("test error")

	// Placeholder task event to attach to the hook error.
	event := structs.NewTaskEvent("test event")

	hookErr := NewHookError(plain, event)
	concrete := hookErr.(*hookError)

	require.Equal(t, event, concrete.taskEvent)
	require.False(t, structs.IsRecoverable(hookErr))
	require.Equal(t, plain.Error(), hookErr.Error())
}
6 changes: 1 addition & 5 deletions client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ func (tr *TaskRunner) emitHookError(err error, hookName string) {
taskEvent = structs.NewTaskEvent(structs.TaskHookFailed).SetMessage(message)
}

// The TaskEvent returned by a HookError may be nil if the hook chooses to opt
// out of sending a task event.
if taskEvent != nil {
tr.EmitEvent(taskEvent)
}
tr.EmitEvent(taskEvent)
}

// prestart is used to run the runners prestart hooks.
Expand Down
Loading

0 comments on commit 234f644

Please sign in to comment.