diff --git a/CHANGELOG.md b/CHANGELOG.md index d9375a3da30..104dad6823e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ IMPROVEMENTS: * client: Extend timeout to 60 seconds for Windows CPU fingerprinting [[GH-4441](https://github.com/hashicorp/nomad/pull/4441)] * client: Refactor client to support plugins and improve state handling [[GH-4792](https://github.com/hashicorp/nomad/pull/4792)] * client: Updated consul-template library to pick up recent fixes and improvements[[GH-4885](https://github.com/hashicorp/nomad/pull/4885)] + * client: When retrying a failed artifact, do not download any successfully downloaded artifacts again [[GH-5322](https://github.com/hashicorp/nomad/issues/5322)] * client: Added service metadata tag that enables the Consul UI to show a Nomad icon for services registered by Nomad [[GH-4889](https://github.com/hashicorp/nomad/issues/4889)] * driver/docker: Support logs when using Docker for Mac [[GH-4758](https://github.com/hashicorp/nomad/issues/4758)] * driver/docker: Added support for specifying `storage_opt` in the Docker driver [[GH-4908](https://github.com/hashicorp/nomad/pull/4908)] diff --git a/client/allocrunner/taskrunner/artifact_hook.go b/client/allocrunner/taskrunner/artifact_hook.go index fd4efc71873..f223f3b5ac7 100644 --- a/client/allocrunner/taskrunner/artifact_hook.go +++ b/client/allocrunner/taskrunner/artifact_hook.go @@ -37,9 +37,20 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar return nil } + // Initialize HookData to store download progress + resp.HookData = make(map[string]string, len(req.Task.Artifacts)) + h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskDownloadingArtifacts)) for _, artifact := range req.Task.Artifacts { + aid := artifact.Hash() + if req.HookData[aid] != "" { + h.logger.Trace("skipping already downloaded artifact", "artifact", artifact.GetterSource) + resp.HookData[aid] = req.HookData[aid] + continue + } + + h.logger.Debug("downloading artifact", "artifact", artifact.GetterSource) //XXX add ctx to GetArtifact to allow cancelling long downloads if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir.Dir); err != nil { wrapped := structs.NewRecoverableError( @@ -50,6 +61,11 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar return herr } + + // Mark artifact as downloaded to avoid re-downloading due to + // retries caused by subsequent artifacts failing. Any + // non-empty value works. + resp.HookData[aid] = "1" } resp.Done = true diff --git a/client/allocrunner/taskrunner/artifact_hook_test.go b/client/allocrunner/taskrunner/artifact_hook_test.go index a9babf88d76..8e79684e95b 100644 --- a/client/allocrunner/taskrunner/artifact_hook_test.go +++ b/client/allocrunner/taskrunner/artifact_hook_test.go @@ -2,7 +2,12 @@ package taskrunner import ( "context" + "io/ioutil" + "net/http" + "net/http/httptest" "os" + "path/filepath" + "sort" "testing" "github.com/hashicorp/nomad/client/allocdir" @@ -55,3 +60,100 @@ func TestTaskRunner_ArtifactHook_Recoverable(t *testing.T) { require.Len(t, me.events, 1) require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type) } + +// TestTaskRunnerArtifactHook_PartialDone asserts that the artifact hook skips +// already downloaded artifacts when subsequent artifacts fail and cause a +// restart. +func TestTaskRunner_ArtifactHook_PartialDone(t *testing.T) { + t.Parallel() + + me := &mockEmitter{} + artifactHook := newArtifactHook(me, testlog.HCLogger(t)) + + // Create a source directory with 1 of the 2 artifacts + srcdir, err := ioutil.TempDir("", "nomadtest-src") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(srcdir)) + }() + + // Only create one of the 2 artifacts to cause an error on first run. + file1 := filepath.Join(srcdir, "foo.txt") + require.NoError(t, ioutil.WriteFile(file1, []byte{'1'}, 0644)) + + // Test server to serve the artifacts + ts := httptest.NewServer(http.FileServer(http.Dir(srcdir))) + defer ts.Close() + + // Create the target directory. + destdir, err := ioutil.TempDir("", "nomadtest-dest") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(destdir)) + }() + + req := &interfaces.TaskPrestartRequest{ + TaskEnv: taskenv.NewEmptyTaskEnv(), + TaskDir: &allocdir.TaskDir{Dir: destdir}, + Task: &structs.Task{ + Artifacts: []*structs.TaskArtifact{ + { + GetterSource: ts.URL + "/foo.txt", + GetterMode: structs.GetterModeAny, + }, + { + GetterSource: ts.URL + "/bar.txt", + GetterMode: structs.GetterModeAny, + }, + }, + }, + } + + resp := interfaces.TaskPrestartResponse{} + + // On first run file1 (foo) should download but file2 (bar) should + // fail. + err = artifactHook.Prestart(context.Background(), req, &resp) + + require.NotNil(t, err) + require.True(t, structs.IsRecoverable(err)) + require.Len(t, resp.HookData, 1) + require.False(t, resp.Done) + require.Len(t, me.events, 1) + require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type) + + // Remove file1 from the server so it errors if its downloaded again. + require.NoError(t, os.Remove(file1)) + + // Write file2 so artifacts can download successfully + file2 := filepath.Join(srcdir, "bar.txt") + require.NoError(t, ioutil.WriteFile(file2, []byte{'1'}, 0644)) + + // Mock TaskRunner by copying HookData from resp to req and reset resp. + req.HookData = resp.HookData + + resp = interfaces.TaskPrestartResponse{} + + // Retry the download and assert it succeeds + err = artifactHook.Prestart(context.Background(), req, &resp) + + require.NoError(t, err) + require.True(t, resp.Done) + require.Len(t, resp.HookData, 2) + + // Assert both files downloaded properly + files, err := filepath.Glob(filepath.Join(destdir, "*.txt")) + require.NoError(t, err) + sort.Strings(files) + require.Contains(t, files[0], "bar.txt") + require.Contains(t, files[1], "foo.txt") + + // Stop the test server entirely and assert that re-running works + ts.Close() + req.HookData = resp.HookData + resp = interfaces.TaskPrestartResponse{} + err = artifactHook.Prestart(context.Background(), req, &resp) + require.NoError(t, err) + require.True(t, resp.Done) + require.Len(t, resp.HookData, 2) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c7030f415a5..b698e68a048 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -8,6 +8,7 @@ import ( "crypto/sha256" "crypto/sha512" "encoding/base32" + "encoding/base64" "encoding/hex" "errors" "fmt" @@ -6248,6 +6249,32 @@ func (ta *TaskArtifact) GoString() string { return fmt.Sprintf("%+v", ta) } +// Hash creates a unique identifier for a TaskArtifact as the same GetterSource +// may be specified multiple times with different destinations. +func (ta *TaskArtifact) Hash() string { + hash, err := blake2b.New256(nil) + if err != nil { + panic(err) + } + + hash.Write([]byte(ta.GetterSource)) + + // Must iterate over keys in a consistent order + keys := make([]string, 0, len(ta.GetterOptions)) + for k := range ta.GetterOptions { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + hash.Write([]byte(k)) + hash.Write([]byte(ta.GetterOptions[k])) + } + + hash.Write([]byte(ta.GetterMode)) + hash.Write([]byte(ta.RelativeDest)) + return base64.RawStdEncoding.EncodeToString(hash.Sum(nil)) +} + // PathEscapesAllocDir returns if the given path escapes the allocation // directory. The prefix allows adding a prefix if the path will be joined, for // example a "task/local" prefix may be provided if the path will be joined diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 293f4f0cf77..90b33b19827 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/api" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper/uuid" + "github.com/kr/pretty" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -2607,6 +2608,93 @@ func TestTaskArtifact_Validate_Dest(t *testing.T) { } } +// TestTaskArtifact_Hash asserts an artifact's hash changes when any of the +// fields change. +func TestTaskArtifact_Hash(t *testing.T) { + t.Parallel() + + cases := []TaskArtifact{ + {}, + { + GetterSource: "a", + }, + { + GetterSource: "b", + }, + { + GetterSource: "b", + GetterOptions: map[string]string{"c": "c"}, + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "d", + }, + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "e", + }, + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "e", + }, + GetterMode: "f", + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "e", + }, + GetterMode: "g", + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "e", + }, + GetterMode: "g", + RelativeDest: "h", + }, + { + GetterSource: "b", + GetterOptions: map[string]string{ + "c": "c", + "d": "e", + }, + GetterMode: "g", + RelativeDest: "i", + }, + } + + // Map of hash to source + hashes := make(map[string]TaskArtifact, len(cases)) + for _, tc := range cases { + h := tc.Hash() + + // Hash should be deterministic + require.Equal(t, h, tc.Hash()) + + // Hash should be unique + if orig, ok := hashes[h]; ok { + require.Failf(t, "hashes match", "artifact 1: %s\n\n artifact 2: %s\n", + pretty.Sprint(tc), pretty.Sprint(orig), + ) + } + hashes[h] = tc + } + + require.Len(t, hashes, len(cases)) +} + func TestAllocation_ShouldMigrate(t *testing.T) { alloc := Allocation{ PreviousAllocation: "123",