Skip to content

Commit

Permalink
client: don't redownload completed artifacts on retries
Browse files Browse the repository at this point in the history
Track the download status of each artifact independently so that if only
one of many artifacts fails to download, completed artifacts aren't
downloaded again.
  • Loading branch information
schmichael committed Feb 20, 2019
1 parent c51a54c commit 1acd4ca
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ IMPROVEMENTS:
* client: Extend timeout to 60 seconds for Windows CPU fingerprinting [[GH-4441](https://github.com/hashicorp/nomad/pull/4441)]
* client: Refactor client to support plugins and improve state handling [[GH-4792](https://github.com/hashicorp/nomad/pull/4792)]
* client: Updated consul-template library to pick up recent fixes and improvements[[GH-4885](https://github.com/hashicorp/nomad/pull/4885)]
* client: When retrying a failed artifact, do not download any successfully downloaded artifacts again [[GH-5322](https://github.com/hashicorp/nomad/issues/5322)]
* client: Added service metadata tag that enables the Consul UI to show a Nomad icon for services registered by Nomad [[GH-4889](https://github.com/hashicorp/nomad/issues/4889)]
* driver/docker: Support logs when using Docker for Mac [[GH-4758](https://github.com/hashicorp/nomad/issues/4758)]
* driver/docker: Added support for specifying `storage_opt` in the Docker driver [[GH-4908](https://github.com/hashicorp/nomad/pull/4908)]
Expand Down
16 changes: 16 additions & 0 deletions client/allocrunner/taskrunner/artifact_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,20 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar
return nil
}

// Initialize HookData to store download progress
resp.HookData = make(map[string]string, len(req.Task.Artifacts))

h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskDownloadingArtifacts))

for _, artifact := range req.Task.Artifacts {
aid := artifact.Hash()
if req.HookData[aid] != "" {
h.logger.Trace("skipping already downloaded artifact", "artifact", artifact.GetterSource)
resp.HookData[aid] = req.HookData[aid]
continue
}

h.logger.Debug("downloading artifact", "artifact", artifact.GetterSource)
//XXX add ctx to GetArtifact to allow cancelling long downloads
if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir.Dir); err != nil {
wrapped := structs.NewRecoverableError(
Expand All @@ -50,6 +61,11 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar

return herr
}

// Mark artifact as downloaded to avoid re-downloading due to
// retries caused by subsequent artifacts failing. Any
// non-empty value works.
resp.HookData[aid] = "1"
}

resp.Done = true
Expand Down
102 changes: 102 additions & 0 deletions client/allocrunner/taskrunner/artifact_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ package taskrunner

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"sort"
"testing"

"github.com/hashicorp/nomad/client/allocdir"
Expand Down Expand Up @@ -55,3 +60,100 @@ func TestTaskRunner_ArtifactHook_Recoverable(t *testing.T) {
require.Len(t, me.events, 1)
require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type)
}

// TestTaskRunnerArtifactHook_PartialDone asserts that the artifact hook skips
// already downloaded artifacts when subsequent artifacts fail and cause a
// restart.
func TestTaskRunner_ArtifactHook_PartialDone(t *testing.T) {
t.Parallel()

me := &mockEmitter{}
artifactHook := newArtifactHook(me, testlog.HCLogger(t))

// Create a source directory with 1 of the 2 artifacts
srcdir, err := ioutil.TempDir("", "nomadtest-src")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(srcdir))
}()

// Only create one of the 2 artifacts to cause an error on first run.
file1 := filepath.Join(srcdir, "foo.txt")
require.NoError(t, ioutil.WriteFile(file1, []byte{'1'}, 0644))

// Test server to serve the artifacts
ts := httptest.NewServer(http.FileServer(http.Dir(srcdir)))
defer ts.Close()

// Create the target directory.
destdir, err := ioutil.TempDir("", "nomadtest-dest")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(destdir))
}()

req := &interfaces.TaskPrestartRequest{
TaskEnv: taskenv.NewEmptyTaskEnv(),
TaskDir: &allocdir.TaskDir{Dir: destdir},
Task: &structs.Task{
Artifacts: []*structs.TaskArtifact{
{
GetterSource: ts.URL + "/foo.txt",
GetterMode: structs.GetterModeAny,
},
{
GetterSource: ts.URL + "/bar.txt",
GetterMode: structs.GetterModeAny,
},
},
},
}

resp := interfaces.TaskPrestartResponse{}

// On first run file1 (foo) should download but file2 (bar) should
// fail.
err = artifactHook.Prestart(context.Background(), req, &resp)

require.NotNil(t, err)
require.True(t, structs.IsRecoverable(err))
require.Len(t, resp.HookData, 1)
require.False(t, resp.Done)
require.Len(t, me.events, 1)
require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type)

// Remove file1 from the server so it errors if its downloaded again.
require.NoError(t, os.Remove(file1))

// Write file2 so artifacts can download successfully
file2 := filepath.Join(srcdir, "bar.txt")
require.NoError(t, ioutil.WriteFile(file2, []byte{'1'}, 0644))

// Mock TaskRunner by copying HookData from resp to req and reset resp.
req.HookData = resp.HookData

resp = interfaces.TaskPrestartResponse{}

// Retry the download and assert it succeeds
err = artifactHook.Prestart(context.Background(), req, &resp)

require.NoError(t, err)
require.True(t, resp.Done)
require.Len(t, resp.HookData, 2)

// Assert both files downloaded properly
files, err := filepath.Glob(filepath.Join(destdir, "*.txt"))
require.NoError(t, err)
sort.Strings(files)
require.Contains(t, files[0], "bar.txt")
require.Contains(t, files[1], "foo.txt")

// Stop the test server entirely and assert that re-running works
ts.Close()
req.HookData = resp.HookData
resp = interfaces.TaskPrestartResponse{}
err = artifactHook.Prestart(context.Background(), req, &resp)
require.NoError(t, err)
require.True(t, resp.Done)
require.Len(t, resp.HookData, 2)
}
27 changes: 27 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/sha256"
"crypto/sha512"
"encoding/base32"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -6248,6 +6249,32 @@ func (ta *TaskArtifact) GoString() string {
return fmt.Sprintf("%+v", ta)
}

// Hash creates a unique identifier for a TaskArtifact as the same GetterSource
// may be specified multiple times with different destinations.
func (ta *TaskArtifact) Hash() string {
hash, err := blake2b.New256(nil)
if err != nil {
panic(err)
}

hash.Write([]byte(ta.GetterSource))

// Must iterate over keys in a consistent order
keys := make([]string, 0, len(ta.GetterOptions))
for k := range ta.GetterOptions {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
hash.Write([]byte(k))
hash.Write([]byte(ta.GetterOptions[k]))
}

hash.Write([]byte(ta.GetterMode))
hash.Write([]byte(ta.RelativeDest))
return base64.RawStdEncoding.EncodeToString(hash.Sum(nil))
}

// PathEscapesAllocDir returns if the given path escapes the allocation
// directory. The prefix allows adding a prefix if the path will be joined, for
// example a "task/local" prefix may be provided if the path will be joined
Expand Down
88 changes: 88 additions & 0 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -2607,6 +2608,93 @@ func TestTaskArtifact_Validate_Dest(t *testing.T) {
}
}

// TestTaskArtifact_Hash asserts an artifact's hash changes when any of the
// fields change.
func TestTaskArtifact_Hash(t *testing.T) {
t.Parallel()

cases := []TaskArtifact{
{},
{
GetterSource: "a",
},
{
GetterSource: "b",
},
{
GetterSource: "b",
GetterOptions: map[string]string{"c": "c"},
},
{
GetterSource: "b",
GetterOptions: map[string]string{
"c": "c",
"d": "d",
},
},
{
GetterSource: "b",
GetterOptions: map[string]string{
"c": "c",
"d": "e",
},
},
{
GetterSource: "b",
GetterOptions: map[string]string{
"c": "c",
"d": "e",
},
GetterMode: "f",
},
{
GetterSource: "b",
GetterOptions: map[string]string{
"c": "c",
"d": "e",
},
GetterMode: "g",
},
{
GetterSource: "b",
GetterOptions: map[string]string{
"c": "c",
"d": "e",
},
GetterMode: "g",
RelativeDest: "h",
},
{
GetterSource: "b",
GetterOptions: map[string]string{
"c": "c",
"d": "e",
},
GetterMode: "g",
RelativeDest: "i",
},
}

// Map of hash to source
hashes := make(map[string]TaskArtifact, len(cases))
for _, tc := range cases {
h := tc.Hash()

// Hash should be deterministic
require.Equal(t, h, tc.Hash())

// Hash should be unique
if orig, ok := hashes[h]; ok {
require.Failf(t, "hashes match", "artifact 1: %s\n\n artifact 2: %s\n",
pretty.Sprint(tc), pretty.Sprint(orig),
)
}
hashes[h] = tc
}

require.Len(t, hashes, len(cases))
}

func TestAllocation_ShouldMigrate(t *testing.T) {
alloc := Allocation{
PreviousAllocation: "123",
Expand Down

0 comments on commit 1acd4ca

Please sign in to comment.