From 83979252cdef94fa8bd19bf0a63dc4f9c8287483 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 13 Feb 2019 08:25:25 -0800 Subject: [PATCH 1/3] tests: add new task runner test helper Adds a new helper and removes a duplicated test. --- .../taskrunner/task_runner_test.go | 104 ++++-------------- 1 file changed, 23 insertions(+), 81 deletions(-) diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index e7ec3cb4de8..a9928296f2d 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -106,6 +106,22 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri return conf, trCleanup } +// runTestTaskRunner runs a TaskRunner and returns its configuration as well as +// a cleanup function that ensures the runner is stopped and cleaned up. Tests +// which need to change the Config *must* use testTaskRunnerConfig instead. +func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string) (*TaskRunner, *Config, func()) { + config, cleanup := testTaskRunnerConfig(t, alloc, taskName) + + tr, err := NewTaskRunner(config) + require.NoError(t, err) + go tr.Run() + + return tr, config, func() { + tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + cleanup() + } +} + // TestTaskRunner_Restore asserts restoring a running task does not rerun the // task. func TestTaskRunner_Restore_Running(t *testing.T) { @@ -170,7 +186,6 @@ func TestTaskRunner_TaskEnv(t *testing.T) { "common_user": "somebody", } task := alloc.Job.TaskGroups[0].Tasks[0] - task.Driver = "mock_driver" task.Meta = map[string]string{ "foo": "bar", } @@ -181,15 +196,9 @@ func TestTaskRunner_TaskEnv(t *testing.T) { "stdout_string": `${node.region} ${NOMAD_META_foo} ${NOMAD_META_common_user}`, } - conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - // Run the first TaskRunner - tr, err := NewTaskRunner(conf) - require.NoError(err) - go tr.Run() - defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - // Wait for task to complete select { case <-tr.WaitCh(): @@ -208,48 +217,6 @@ func TestTaskRunner_TaskEnv(t *testing.T) { assert.Equal(t, "global bar somebody", mockCfg.StdoutString) } -func TestTaskRunner_TaskConfig(t *testing.T) { - t.Parallel() - require := require.New(t) - - alloc := mock.BatchAlloc() - task := alloc.Job.TaskGroups[0].Tasks[0] - task.Driver = "mock_driver" - - //// Use interpolation from both node attributes and meta vars - //task.Config = map[string]interface{}{ - // "run_for": "1ms", - //} - - conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) - defer cleanup() - - // Run the first TaskRunner - tr, err := NewTaskRunner(conf) - require.NoError(err) - go tr.Run() - defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - - // Wait for task to complete - select { - case <-tr.WaitCh(): - case <-time.After(3 * time.Second): - } - - // Get the mock driver plugin - driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name) - require.NoError(err) - mockDriver := driverPlugin.(*mockdriver.Driver) - - // Assert its config has been properly interpolated - driverCfg, mockCfg := mockDriver.GetTaskConfig() - require.NotNil(driverCfg) - require.NotNil(mockCfg) - assert.Equal(t, alloc.Job.Name, driverCfg.JobName) - assert.Equal(t, alloc.TaskGroup, driverCfg.TaskGroupName) - assert.Equal(t, alloc.Job.TaskGroups[0].Tasks[0].Name, driverCfg.Name) -} - // Test that devices get sent to the driver func TestTaskRunner_DevicePropogation(t *testing.T) { t.Parallel() @@ -473,16 +440,11 @@ func TestTaskRunner_ShutdownDelay(t *testing.T) { // No shutdown escape hatch for this delay, so don't set it too high task.ShutdownDelay = 1000 * time.Duration(testutil.TestMultiplier()) * time.Millisecond - conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() mockConsul := conf.Consul.(*consul.MockConsulServiceClient) - tr, err := NewTaskRunner(conf) - require.NoError(t, err) - go tr.Run() - defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - // Wait for the task to start testWaitForTaskToStart(t, tr) @@ -568,14 +530,9 @@ func TestTaskRunner_Dispatch_Payload(t *testing.T) { compressed := snappy.Encode(nil, expected) alloc.Job.Payload = compressed - conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - tr, err := NewTaskRunner(conf) - require.NoError(t, err) - go tr.Run() - defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - // Wait for it to finish testutil.WaitForResult(func() (bool, error) { ts := tr.TaskState() @@ -610,14 +567,9 @@ func TestTaskRunner_SignalFailure(t *testing.T) { "signal_error": errMsg, } - conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - tr, err := NewTaskRunner(conf) - require.NoError(t, err) - go tr.Run() - defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - testWaitForTaskToStart(t, tr) require.EqualError(t, tr.Signal(&structs.TaskEvent{}, "SIGINT"), errMsg) @@ -635,14 +587,9 @@ func TestTaskRunner_RestartTask(t *testing.T) { "run_for": "10m", } - conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - tr, err := NewTaskRunner(conf) - require.NoError(t, err) - go tr.Run() - defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - testWaitForTaskToStart(t, tr) // Restart task. Send a RestartSignal event like check watcher. Restart @@ -977,14 +924,9 @@ func TestTaskRunner_Download_List(t *testing.T) { } task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2} - conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - tr, err := NewTaskRunner(conf) - require.NoError(t, err) - defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - go tr.Run() - // Wait for task to run and exit select { case <-tr.WaitCh(): @@ -1004,7 +946,7 @@ func TestTaskRunner_Download_List(t *testing.T) { assert.Equal(t, structs.TaskTerminated, state.Events[4].Type) // Check that both files exist. - _, err = os.Stat(filepath.Join(conf.TaskDir.Dir, f1)) + _, err := os.Stat(filepath.Join(conf.TaskDir.Dir, f1)) require.NoErrorf(t, err, "%v not downloaded", f1) _, err = os.Stat(filepath.Join(conf.TaskDir.Dir, f2)) From c51a54cfeeb059e9dca9fae7038cbe482ccc80b3 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 13 Feb 2019 08:26:23 -0800 Subject: [PATCH 2/3] client: artifact errors are retry-able 0.9.0beta2 contains a regression where artifact download errors would not cause a task restart and instead immediately fail the task. This restores the pre-0.9 behavior of retrying all artifact errors and adds missing tests. --- .../allocrunner/taskrunner/artifact_hook.go | 5 +- .../taskrunner/artifact_hook_test.go | 57 +++++++++++++++++++ client/allocrunner/taskrunner/errors.go | 10 ++-- client/allocrunner/taskrunner/errors_test.go | 52 +++++++++++++++++ .../taskrunner/task_runner_hooks.go | 6 +- .../taskrunner/task_runner_test.go | 44 ++++++++++++++ 6 files changed, 164 insertions(+), 10 deletions(-) create mode 100644 client/allocrunner/taskrunner/artifact_hook_test.go create mode 100644 client/allocrunner/taskrunner/errors_test.go diff --git a/client/allocrunner/taskrunner/artifact_hook.go b/client/allocrunner/taskrunner/artifact_hook.go index 7bed440abe1..fd4efc71873 100644 --- a/client/allocrunner/taskrunner/artifact_hook.go +++ b/client/allocrunner/taskrunner/artifact_hook.go @@ -42,7 +42,10 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar for _, artifact := range req.Task.Artifacts { //XXX add ctx to GetArtifact to allow cancelling long downloads if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir.Dir); err != nil { - wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err) + wrapped := structs.NewRecoverableError( + fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err), + true, + ) herr := NewHookError(wrapped, structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped)) return herr diff --git a/client/allocrunner/taskrunner/artifact_hook_test.go b/client/allocrunner/taskrunner/artifact_hook_test.go new file mode 100644 index 00000000000..a9babf88d76 --- /dev/null +++ b/client/allocrunner/taskrunner/artifact_hook_test.go @@ -0,0 +1,57 @@ +package taskrunner + +import ( + "context" + "os" + "testing" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/taskenv" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +// Statically assert the artifact hook implements the expected interface +var _ interfaces.TaskPrestartHook = (*artifactHook)(nil) + +type mockEmitter struct { + events []*structs.TaskEvent +} + +func (m *mockEmitter) EmitEvent(ev *structs.TaskEvent) { + m.events = append(m.events, ev) +} + +// TestTaskRunner_ArtifactHook_Recoverable asserts that failures to download +// artifacts are a recoverable error. +func TestTaskRunner_ArtifactHook_Recoverable(t *testing.T) { + t.Parallel() + + me := &mockEmitter{} + artifactHook := newArtifactHook(me, testlog.HCLogger(t)) + + req := &interfaces.TaskPrestartRequest{ + TaskEnv: taskenv.NewEmptyTaskEnv(), + TaskDir: &allocdir.TaskDir{Dir: os.TempDir()}, + Task: &structs.Task{ + Artifacts: []*structs.TaskArtifact{ + { + GetterSource: "http://127.0.0.1:0", + GetterMode: structs.GetterModeAny, + }, + }, + }, + } + + resp := interfaces.TaskPrestartResponse{} + + err := artifactHook.Prestart(context.Background(), req, &resp) + + require.False(t, resp.Done) + require.NotNil(t, err) + require.True(t, structs.IsRecoverable(err)) + require.Len(t, me.events, 1) + require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type) +} diff --git a/client/allocrunner/taskrunner/errors.go b/client/allocrunner/taskrunner/errors.go index 4a1f6af2a87..a3503757cd6 100644 --- a/client/allocrunner/taskrunner/errors.go +++ b/client/allocrunner/taskrunner/errors.go @@ -14,10 +14,7 @@ var ( ErrTaskNotRunning = errors.New(errTaskNotRunning) ) -// NewHookError returns an implementation of a HookError with an underlying err -// and a pre-formatted task event. -// If the taskEvent is nil, then we won't attempt to generate one during error -// handling. +// NewHookError contains an underlying err and a pre-formatted task event. func NewHookError(err error, taskEvent *structs.TaskEvent) error { return &hookError{ err: err, @@ -33,3 +30,8 @@ type hookError struct { func (h *hookError) Error() string { return h.err.Error() } + +// Recoverable is true if the underlying error is recoverable. +func (h *hookError) IsRecoverable() bool { + return structs.IsRecoverable(h.err) +} diff --git a/client/allocrunner/taskrunner/errors_test.go b/client/allocrunner/taskrunner/errors_test.go new file mode 100644 index 00000000000..9b32e9cdf73 --- /dev/null +++ b/client/allocrunner/taskrunner/errors_test.go @@ -0,0 +1,52 @@ +package taskrunner + +import ( + "errors" + "testing" + + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +// Statically assert error implements the expected interfaces +var _ structs.Recoverable = (*hookError)(nil) + +// TestHookError_Recoverable asserts that a NewHookError is recoverable if +// passed a recoverable error. +func TestHookError_Recoverable(t *testing.T) { + t.Parallel() + + // Create root error + root := errors.New("test error") + + // Make it recoverable + recov := structs.NewRecoverableError(root, true) + + // Create a fake task event + ev := structs.NewTaskEvent("test event") + + herr := NewHookError(recov, ev) + + require.Equal(t, ev, herr.(*hookError).taskEvent) + require.True(t, structs.IsRecoverable(herr)) + require.Equal(t, root.Error(), herr.Error()) + require.Equal(t, recov.Error(), herr.Error()) +} + +// TestHookError_Unrecoverable asserts that a NewHookError is not recoverable +// unless it is passed a recoverable error. +func TestHookError_Unrecoverable(t *testing.T) { + t.Parallel() + + // Create error + err := errors.New("test error") + + // Create a fake task event + ev := structs.NewTaskEvent("test event") + + herr := NewHookError(err, ev) + + require.Equal(t, ev, herr.(*hookError).taskEvent) + require.False(t, structs.IsRecoverable(herr)) + require.Equal(t, err.Error(), herr.Error()) +} diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index a41e76b7e13..c1e022f1b36 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -113,11 +113,7 @@ func (tr *TaskRunner) emitHookError(err error, hookName string) { taskEvent = structs.NewTaskEvent(structs.TaskHookFailed).SetMessage(message) } - // The TaskEvent returned by a HookError may be nil if the hook chooses to opt - // out of sending a task event. - if taskEvent != nil { - tr.EmitEvent(taskEvent) - } + tr.EmitEvent(taskEvent) } // prestart is used to run the runners prestart hooks. diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index a9928296f2d..19c9b264d7a 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -953,6 +953,50 @@ func TestTaskRunner_Download_List(t *testing.T) { require.NoErrorf(t, err, "%v not downloaded", f2) } +// TestTaskRunner_Download_Retries asserts that failed artifact downloads are +// retried according to the task's restart policy. +func TestTaskRunner_Download_Retries(t *testing.T) { + t.Parallel() + + // Create an allocation that has a task with bad artifacts. + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + artifact := structs.TaskArtifact{ + GetterSource: "http://127.0.0.1:0/foo/bar/baz", + } + task.Artifacts = []*structs.TaskArtifact{&artifact} + + // Make the restart policy retry once + alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{ + Attempts: 1, + Interval: 10 * time.Minute, + Delay: 1 * time.Second, + Mode: structs.RestartPolicyModeFail, + } + + tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) + defer cleanup() + + select { + case <-tr.WaitCh(): + case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): + require.Fail(t, "timed out waiting for task to exit") + } + + state := tr.TaskState() + require.Equal(t, structs.TaskStateDead, state.State) + require.True(t, state.Failed) + require.Len(t, state.Events, 8, pretty.Sprint(state.Events)) + require.Equal(t, structs.TaskReceived, state.Events[0].Type) + require.Equal(t, structs.TaskSetup, state.Events[1].Type) + require.Equal(t, structs.TaskDownloadingArtifacts, state.Events[2].Type) + require.Equal(t, structs.TaskArtifactDownloadFailed, state.Events[3].Type) + require.Equal(t, structs.TaskRestarting, state.Events[4].Type) + require.Equal(t, structs.TaskDownloadingArtifacts, state.Events[5].Type) + require.Equal(t, structs.TaskArtifactDownloadFailed, state.Events[6].Type) + require.Equal(t, structs.TaskNotRestarting, state.Events[7].Type) +} + // testWaitForTaskToStart waits for the task to be running or fails the test func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) { From 1acd4ca744895627f05511abe7edbdc4a8e2c03b Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 20 Feb 2019 08:41:51 -0800 Subject: [PATCH 3/3] client: don't redownload completed artifacts on retries Track the download status of each artifact independently so that if only one of many artifacts fails to download, completed artifacts aren't downloaded again. --- CHANGELOG.md | 1 + .../allocrunner/taskrunner/artifact_hook.go | 16 +++ .../taskrunner/artifact_hook_test.go | 102 ++++++++++++++++++ nomad/structs/structs.go | 27 +++++ nomad/structs/structs_test.go | 88 +++++++++++++++ 5 files changed, 234 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9375a3da30..104dad6823e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ IMPROVEMENTS: * client: Extend timeout to 60 seconds for Windows CPU fingerprinting [[GH-4441](https://github.com/hashicorp/nomad/pull/4441)] * client: Refactor client to support plugins and improve state handling [[GH-4792](https://github.com/hashicorp/nomad/pull/4792)] * client: Updated consul-template library to pick up recent fixes and improvements[[GH-4885](https://github.com/hashicorp/nomad/pull/4885)] + * client: When retrying a failed artifact, do not download any successfully downloaded artifacts again [[GH-5322](https://github.com/hashicorp/nomad/issues/5322)] * client: Added service metadata tag that enables the Consul UI to show a Nomad icon for services registered by Nomad [[GH-4889](https://github.com/hashicorp/nomad/issues/4889)] * driver/docker: Support logs when using Docker for Mac [[GH-4758](https://github.com/hashicorp/nomad/issues/4758)] * driver/docker: Added support for specifying `storage_opt` in the Docker driver [[GH-4908](https://github.com/hashicorp/nomad/pull/4908)] diff --git a/client/allocrunner/taskrunner/artifact_hook.go b/client/allocrunner/taskrunner/artifact_hook.go index fd4efc71873..f223f3b5ac7 100644 --- a/client/allocrunner/taskrunner/artifact_hook.go +++ b/client/allocrunner/taskrunner/artifact_hook.go @@ -37,9 +37,20 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar return nil } + // Initialize HookData to store download progress + resp.HookData = make(map[string]string, len(req.Task.Artifacts)) + h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskDownloadingArtifacts)) for _, artifact := range req.Task.Artifacts { + aid := artifact.Hash() + if req.HookData[aid] != "" { + h.logger.Trace("skipping already downloaded artifact", "artifact", artifact.GetterSource) + resp.HookData[aid] = req.HookData[aid] + continue + } + + h.logger.Debug("downloading artifact", "artifact", artifact.GetterSource) //XXX add ctx to GetArtifact to allow cancelling long downloads if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir.Dir); err != nil { wrapped := structs.NewRecoverableError( @@ -50,6 +61,11 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar return herr } + + // Mark artifact as downloaded to avoid re-downloading due to + // retries caused by subsequent artifacts failing. Any + // non-empty value works. + resp.HookData[aid] = "1" } resp.Done = true diff --git a/client/allocrunner/taskrunner/artifact_hook_test.go b/client/allocrunner/taskrunner/artifact_hook_test.go index a9babf88d76..8e79684e95b 100644 --- a/client/allocrunner/taskrunner/artifact_hook_test.go +++ b/client/allocrunner/taskrunner/artifact_hook_test.go @@ -2,7 +2,12 @@ package taskrunner import ( "context" + "io/ioutil" + "net/http" + "net/http/httptest" "os" + "path/filepath" + "sort" "testing" "github.com/hashicorp/nomad/client/allocdir" @@ -55,3 +60,100 @@ func TestTaskRunner_ArtifactHook_Recoverable(t *testing.T) { require.Len(t, me.events, 1) require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type) } + +// TestTaskRunnerArtifactHook_PartialDone asserts that the artifact hook skips +// already downloaded artifacts when subsequent artifacts fail and cause a +// restart. +func TestTaskRunner_ArtifactHook_PartialDone(t *testing.T) { + t.Parallel() + + me := &mockEmitter{} + artifactHook := newArtifactHook(me, testlog.HCLogger(t)) + + // Create a source directory with 1 of the 2 artifacts + srcdir, err := ioutil.TempDir("", "nomadtest-src") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(srcdir)) + }() + + // Only create one of the 2 artifacts to cause an error on first run. + file1 := filepath.Join(srcdir, "foo.txt") + require.NoError(t, ioutil.WriteFile(file1, []byte{'1'}, 0644)) + + // Test server to serve the artifacts + ts := httptest.NewServer(http.FileServer(http.Dir(srcdir))) + defer ts.Close() + + // Create the target directory. + destdir, err := ioutil.TempDir("", "nomadtest-dest") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(destdir)) + }() + + req := &interfaces.TaskPrestartRequest{ + TaskEnv: taskenv.NewEmptyTaskEnv(), + TaskDir: &allocdir.TaskDir{Dir: destdir}, + Task: &structs.Task{ + Artifacts: []*structs.TaskArtifact{ + { + GetterSource: ts.URL + "/foo.txt", + GetterMode: structs.GetterModeAny, + }, + { + GetterSource: ts.URL + "/bar.txt", + GetterMode: structs.GetterModeAny, + }, + }, + }, + } + + resp := interfaces.TaskPrestartResponse{} + + // On first run file1 (foo) should download but file2 (bar) should + // fail. + err = artifactHook.Prestart(context.Background(), req, &resp) + + require.NotNil(t, err) + require.True(t, structs.IsRecoverable(err)) + require.Len(t, resp.HookData, 1) + require.False(t, resp.Done) + require.Len(t, me.events, 1) + require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type) + + // Remove file1 from the server so it errors if its downloaded again. + require.NoError(t, os.Remove(file1)) + + // Write file2 so artifacts can download successfully + file2 := filepath.Join(srcdir, "bar.txt") + require.NoError(t, ioutil.WriteFile(file2, []byte{'1'}, 0644)) + + // Mock TaskRunner by copying HookData from resp to req and reset resp. + req.HookData = resp.HookData + + resp = interfaces.TaskPrestartResponse{} + + // Retry the download and assert it succeeds + err = artifactHook.Prestart(context.Background(), req, &resp) + + require.NoError(t, err) + require.True(t, resp.Done) + require.Len(t, resp.HookData, 2) + + // Assert both files downloaded properly + files, err := filepath.Glob(filepath.Join(destdir, "*.txt")) + require.NoError(t, err) + sort.Strings(files) + require.Contains(t, files[0], "bar.txt") + require.Contains(t, files[1], "foo.txt") + + // Stop the test server entirely and assert that re-running works + ts.Close() + req.HookData = resp.HookData + resp = interfaces.TaskPrestartResponse{} + err = artifactHook.Prestart(context.Background(), req, &resp) + require.NoError(t, err) + require.True(t, resp.Done) + require.Len(t, resp.HookData, 2) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c7030f415a5..b698e68a048 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -8,6 +8,7 @@ import ( "crypto/sha256" "crypto/sha512" "encoding/base32" + "encoding/base64" "encoding/hex" "errors" "fmt" @@ -6248,6 +6249,32 @@ func (ta *TaskArtifact) GoString() string { return fmt.Sprintf("%+v", ta) } +// Hash creates a unique identifier for a TaskArtifact as the same GetterSource +// may be specified multiple times with different destinations. +func (ta *TaskArtifact) Hash() string { + hash, err := blake2b.New256(nil) + if err != nil { + panic(err) + } + + hash.Write([]byte(ta.GetterSource)) + + // Must iterate over keys in a consistent order + keys := make([]string, 0, len(ta.GetterOptions)) + for k := range ta.GetterOptions { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + hash.Write([]byte(k)) + hash.Write([]byte(ta.GetterOptions[k])) + } + + hash.Write([]byte(ta.GetterMode)) + hash.Write([]byte(ta.RelativeDest)) + return base64.RawStdEncoding.EncodeToString(hash.Sum(nil)) +} + // PathEscapesAllocDir returns if the given path escapes the allocation // directory. The prefix allows adding a prefix if the path will be joined, for // example a "task/local" prefix may be provided if the path will be joined diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 293f4f0cf77..90b33b19827 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/api" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper/uuid" + "github.com/kr/pretty" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -2607,6 +2608,93 @@ func TestTaskArtifact_Validate_Dest(t *testing.T) { } } +// TestTaskArtifact_Hash asserts an artifact's hash changes when any of the +// fields change. +func TestTaskArtifact_Hash(t *testing.T) { + t.Parallel() + + cases := []TaskArtifact{ + {}, + { + GetterSource: "a", + }, + { + GetterSource: "b", + }, + { + GetterSource: "b", + GetterOptions: map[string]string{"c": "c"}, + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "d", + }, + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "e", + }, + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "e", + }, + GetterMode: "f", + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "e", + }, + GetterMode: "g", + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "e", + }, + GetterMode: "g", + RelativeDest: "h", + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "e", + }, + GetterMode: "g", + RelativeDest: "i", + }, + } + + // Map of hash to source + hashes := make(map[string]TaskArtifact, len(cases)) + for _, tc := range cases { + h := tc.Hash() + + // Hash should be deterministic + require.Equal(t, h, tc.Hash()) + + // Hash should be unique + if orig, ok := hashes[h]; ok { + require.Failf(t, "hashes match", "artifact 1: %s\n\n artifact 2: %s\n", + pretty.Sprint(tc), pretty.Sprint(orig), + ) + } + hashes[h] = tc + } + + require.Len(t, hashes, len(cases)) +} + func TestAllocation_ShouldMigrate(t *testing.T) { alloc := Allocation{ PreviousAllocation: "123",