Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix regression by restarting on artifact download errors #5322

Merged
merged 3 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ IMPROVEMENTS:
* client: Extend timeout to 60 seconds for Windows CPU fingerprinting [[GH-4441](https://github.com/hashicorp/nomad/pull/4441)]
* client: Refactor client to support plugins and improve state handling [[GH-4792](https://github.com/hashicorp/nomad/pull/4792)]
* client: Updated consul-template library to pick up recent fixes and improvements[[GH-4885](https://github.com/hashicorp/nomad/pull/4885)]
* client: When retrying a failed artifact, do not download any successfully downloaded artifacts again [[GH-5322](https://github.com/hashicorp/nomad/issues/5322)]
* client: Added service metadata tag that enables the Consul UI to show a Nomad icon for services registered by Nomad [[GH-4889](https://github.com/hashicorp/nomad/issues/4889)]
* driver/docker: Support logs when using Docker for Mac [[GH-4758](https://github.com/hashicorp/nomad/issues/4758)]
* driver/docker: Added support for specifying `storage_opt` in the Docker driver [[GH-4908](https://github.com/hashicorp/nomad/pull/4908)]
Expand Down
21 changes: 20 additions & 1 deletion client/allocrunner/taskrunner/artifact_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,35 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar
return nil
}

// Initialize HookData to store download progress
resp.HookData = make(map[string]string, len(req.Task.Artifacts))

h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskDownloadingArtifacts))

for _, artifact := range req.Task.Artifacts {
aid := artifact.Hash()
if req.HookData[aid] != "" {
h.logger.Trace("skipping already downloaded artifact", "artifact", artifact.GetterSource)
resp.HookData[aid] = req.HookData[aid]
continue
}

h.logger.Debug("downloading artifact", "artifact", artifact.GetterSource)
//XXX add ctx to GetArtifact to allow cancelling long downloads
if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir.Dir); err != nil {
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
wrapped := structs.NewRecoverableError(
fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err),
schmichael marked this conversation as resolved.
Show resolved Hide resolved
true,
)
herr := NewHookError(wrapped, structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped))

return herr
}

// Mark artifact as downloaded to avoid re-downloading due to
// retries caused by subsequent artifacts failing. Any
// non-empty value works.
resp.HookData[aid] = "1"
}

resp.Done = true
Expand Down
159 changes: 159 additions & 0 deletions client/allocrunner/taskrunner/artifact_hook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package taskrunner

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"sort"
"testing"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

// Statically assert the artifact hook implements the expected interface
var _ interfaces.TaskPrestartHook = (*artifactHook)(nil)

type mockEmitter struct {
events []*structs.TaskEvent
}

func (m *mockEmitter) EmitEvent(ev *structs.TaskEvent) {
m.events = append(m.events, ev)
}

// TestTaskRunner_ArtifactHook_Recoverable asserts that failures to download
// artifacts are a recoverable error.
func TestTaskRunner_ArtifactHook_Recoverable(t *testing.T) {
t.Parallel()

me := &mockEmitter{}
artifactHook := newArtifactHook(me, testlog.HCLogger(t))

req := &interfaces.TaskPrestartRequest{
TaskEnv: taskenv.NewEmptyTaskEnv(),
TaskDir: &allocdir.TaskDir{Dir: os.TempDir()},
Task: &structs.Task{
Artifacts: []*structs.TaskArtifact{
{
GetterSource: "http://127.0.0.1:0",
GetterMode: structs.GetterModeAny,
},
},
},
}

resp := interfaces.TaskPrestartResponse{}

err := artifactHook.Prestart(context.Background(), req, &resp)

require.False(t, resp.Done)
require.NotNil(t, err)
require.True(t, structs.IsRecoverable(err))
require.Len(t, me.events, 1)
require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type)
}

// TestTaskRunnerArtifactHook_PartialDone asserts that the artifact hook skips
// already downloaded artifacts when subsequent artifacts fail and cause a
// restart.
func TestTaskRunner_ArtifactHook_PartialDone(t *testing.T) {
t.Parallel()

me := &mockEmitter{}
artifactHook := newArtifactHook(me, testlog.HCLogger(t))

// Create a source directory with 1 of the 2 artifacts
srcdir, err := ioutil.TempDir("", "nomadtest-src")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(srcdir))
}()

// Only create one of the 2 artifacts to cause an error on first run.
file1 := filepath.Join(srcdir, "foo.txt")
require.NoError(t, ioutil.WriteFile(file1, []byte{'1'}, 0644))

// Test server to serve the artifacts
ts := httptest.NewServer(http.FileServer(http.Dir(srcdir)))
defer ts.Close()

// Create the target directory.
destdir, err := ioutil.TempDir("", "nomadtest-dest")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(destdir))
}()

req := &interfaces.TaskPrestartRequest{
TaskEnv: taskenv.NewEmptyTaskEnv(),
TaskDir: &allocdir.TaskDir{Dir: destdir},
Task: &structs.Task{
Artifacts: []*structs.TaskArtifact{
{
GetterSource: ts.URL + "/foo.txt",
GetterMode: structs.GetterModeAny,
},
{
GetterSource: ts.URL + "/bar.txt",
GetterMode: structs.GetterModeAny,
},
},
},
}

resp := interfaces.TaskPrestartResponse{}

// On first run file1 (foo) should download but file2 (bar) should
// fail.
err = artifactHook.Prestart(context.Background(), req, &resp)

require.NotNil(t, err)
require.True(t, structs.IsRecoverable(err))
require.Len(t, resp.HookData, 1)
require.False(t, resp.Done)
require.Len(t, me.events, 1)
require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type)

// Remove file1 from the server so it errors if its downloaded again.
require.NoError(t, os.Remove(file1))

// Write file2 so artifacts can download successfully
file2 := filepath.Join(srcdir, "bar.txt")
require.NoError(t, ioutil.WriteFile(file2, []byte{'1'}, 0644))

// Mock TaskRunner by copying HookData from resp to req and reset resp.
req.HookData = resp.HookData

resp = interfaces.TaskPrestartResponse{}

// Retry the download and assert it succeeds
err = artifactHook.Prestart(context.Background(), req, &resp)

require.NoError(t, err)
require.True(t, resp.Done)
require.Len(t, resp.HookData, 2)

// Assert both files downloaded properly
files, err := filepath.Glob(filepath.Join(destdir, "*.txt"))
require.NoError(t, err)
sort.Strings(files)
require.Contains(t, files[0], "bar.txt")
require.Contains(t, files[1], "foo.txt")

// Stop the test server entirely and assert that re-running works
ts.Close()
req.HookData = resp.HookData
resp = interfaces.TaskPrestartResponse{}
err = artifactHook.Prestart(context.Background(), req, &resp)
require.NoError(t, err)
require.True(t, resp.Done)
require.Len(t, resp.HookData, 2)
}
10 changes: 6 additions & 4 deletions client/allocrunner/taskrunner/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ var (
ErrTaskNotRunning = errors.New(errTaskNotRunning)
)

// NewHookError returns an implementation of a HookError with an underlying err
// and a pre-formatted task event.
// If the taskEvent is nil, then we won't attempt to generate one during error
// handling.
// NewHookError contains an underlying err and a pre-formatted task event.
func NewHookError(err error, taskEvent *structs.TaskEvent) error {
return &hookError{
err: err,
Expand All @@ -33,3 +30,8 @@ type hookError struct {
func (h *hookError) Error() string {
return h.err.Error()
}

// Recoverable is true if the underlying error is recoverable.
func (h *hookError) IsRecoverable() bool {
return structs.IsRecoverable(h.err)
}
52 changes: 52 additions & 0 deletions client/allocrunner/taskrunner/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package taskrunner

import (
"errors"
"testing"

"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

// Statically assert error implements the expected interfaces
var _ structs.Recoverable = (*hookError)(nil)

// TestHookError_Recoverable asserts that a NewHookError is recoverable if
// passed a recoverable error.
func TestHookError_Recoverable(t *testing.T) {
t.Parallel()

// Create root error
root := errors.New("test error")

// Make it recoverable
recov := structs.NewRecoverableError(root, true)

// Create a fake task event
ev := structs.NewTaskEvent("test event")

herr := NewHookError(recov, ev)

require.Equal(t, ev, herr.(*hookError).taskEvent)
require.True(t, structs.IsRecoverable(herr))
require.Equal(t, root.Error(), herr.Error())
require.Equal(t, recov.Error(), herr.Error())
}

// TestHookError_Unrecoverable asserts that a NewHookError is not recoverable
// unless it is passed a recoverable error.
func TestHookError_Unrecoverable(t *testing.T) {
t.Parallel()

// Create error
err := errors.New("test error")

// Create a fake task event
ev := structs.NewTaskEvent("test event")

herr := NewHookError(err, ev)

require.Equal(t, ev, herr.(*hookError).taskEvent)
require.False(t, structs.IsRecoverable(herr))
require.Equal(t, err.Error(), herr.Error())
}
6 changes: 1 addition & 5 deletions client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ func (tr *TaskRunner) emitHookError(err error, hookName string) {
taskEvent = structs.NewTaskEvent(structs.TaskHookFailed).SetMessage(message)
}

// The TaskEvent returned by a HookError may be nil if the hook chooses to opt
// out of sending a task event.
if taskEvent != nil {
tr.EmitEvent(taskEvent)
}
tr.EmitEvent(taskEvent)
}

// prestart is used to run the runners prestart hooks.
Expand Down
Loading