Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomic Snapshotting / Sticky Volume Migration #3563

Merged
merged 5 commits into from
Dec 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ IMPROVEMENTS:
* api: Environment variables are ignored during service name validation [GH-3532]
* cli: Allocation create and modify times are displayed in a human readable
relative format like `6 h ago` [GH-3449]
* client: Sticky volume migrations are now atomic. [GH-3563]
* client: Added metrics to track state transitions of allocations [GH-3061]
* client: When `network_interface` is unspecified use interface attached to
default route [GH-3546]
Expand Down
10 changes: 9 additions & 1 deletion client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ func (r *AllocRunner) Run() {
r.allocDirLock.Unlock()

if err != nil {
r.logger.Printf("[ERR] client: failed to build task directories: %v", err)
r.logger.Printf("[ERR] client: alloc %q failed to build task directories: %v", r.allocID, err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
return
}
Expand All @@ -841,6 +841,14 @@ func (r *AllocRunner) Run() {

// Soft-fail on migration errors
r.logger.Printf("[WARN] client: alloc %q error while migrating data from previous alloc: %v", r.allocID, err)

// Recreate alloc dir to ensure a clean slate
r.allocDir.Destroy()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can allocDir ever be nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, allocDir is initialized in NewAllocRunner and RestoreState includes (hopefully sufficient!) checks to prevent restoring a nil alloc dir. (Good question as I believe this did cause issues in the past!)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for error here. Probably can't continue if the destroy fails as well!

if err := r.allocDir.Build(); err != nil {
r.logger.Printf("[ERR] client: alloc %q failed to clean task directories after failed migration: %v", r.allocID, err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to rebuild task dirs for '%s'", alloc.TaskGroup))
return
}
}

// Check if the allocation is in a terminal status. In this case, we don't
Expand Down
15 changes: 15 additions & 0 deletions client/alloc_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser
}
}

// if we see this file, there was an error on the remote side
errorFilename := allocdir.SnapshotErrorFilename(p.prevAllocID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be missing something obvious, but I can't find where allocdir is initialized. Can it ever be nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allocdir is sneakily referring to a package here... kind of unfortunate I use allocDir a lot as a variable and allocdir a lot as a package... 😬


buf := make([]byte, 1024)
for !canceled() {
// Get the next header
Expand All @@ -478,6 +481,18 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser
p.prevAllocID, p.allocID, err)
}

if hdr.Name == errorFilename {
// Error snapshotting on the remote side, try to read
// the message out of the file and return it.
errBuf := make([]byte, int(hdr.Size))
if _, err := tr.Read(errBuf); err != nil {
return fmt.Errorf("error streaming previous alloc %q for new alloc %q; failed reading error message: %v",
p.prevAllocID, p.allocID, err)
}
return fmt.Errorf("error streaming previous alloc %q for new alloc %q: %s",
p.prevAllocID, p.allocID, string(errBuf))
}

// If the header is for a directory we create the directory
if hdr.Typeflag == tar.TypeDir {
os.MkdirAll(filepath.Join(dest, hdr.Name), os.FileMode(hdr.Mode))
Expand Down
81 changes: 79 additions & 2 deletions client/alloc_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/mock"
)
Expand Down Expand Up @@ -68,9 +70,9 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) {
}
}

// TestPrevAlloc_StreamAllocDir asserts that streaming a tar to an alloc dir
// TestPrevAlloc_StreamAllocDir_Ok asserts that streaming a tar to an alloc dir
// works.
func TestPrevAlloc_StreamAllocDir(t *testing.T) {
func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) {
t.Parallel()
dir, err := ioutil.TempDir("", "")
if err != nil {
Expand Down Expand Up @@ -196,3 +198,78 @@ func TestPrevAlloc_StreamAllocDir(t *testing.T) {
t.Fatalf("mode: %v", fi2.Mode())
}
}

// TestPrevAlloc_StreamAllocDir_Error asserts that errors encountered while
// streaming a tar cause the migration to be cancelled and no files are written
// (migrations are atomic).
func TestPrevAlloc_StreamAllocDir_Error(t *testing.T) {
t.Parallel()
dest, err := ioutil.TempDir("", "nomadtest-")
if err != nil {
t.Fatalf("err: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use assert test package- same for all other tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With that said, the tests look great!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually prefer this style as assert uses t.Error not t.Fatal so your test continues and usually panics due to the non-error value being nil.

...but maybe I'm the only one who gets confused by this? 😕

}
defer os.RemoveAll(dest)

// This test only unit tests streamAllocDir so we only need a partially
// complete remotePrevAlloc
prevAlloc := &remotePrevAlloc{
logger: testLogger(),
allocID: "123",
prevAllocID: "abc",
migrate: true,
}

tarBuf := bytes.NewBuffer(nil)
tw := tar.NewWriter(tarBuf)
fooHdr := tar.Header{
Name: "foo.txt",
Mode: 0666,
Size: 1,
ModTime: time.Now(),
Typeflag: tar.TypeReg,
}
err = tw.WriteHeader(&fooHdr)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use assert library.

t.Fatalf("error writing file header: %v", err)
}
if _, err := tw.Write([]byte{'a'}); err != nil {
t.Fatalf("error writing file: %v", err)
}

// Now write the error file
contents := []byte("SENTINEL ERROR")
err = tw.WriteHeader(&tar.Header{
Name: allocdir.SnapshotErrorFilename(prevAlloc.prevAllocID),
Mode: 0666,
Size: int64(len(contents)),
AccessTime: allocdir.SnapshotErrorTime,
ChangeTime: allocdir.SnapshotErrorTime,
ModTime: allocdir.SnapshotErrorTime,
Typeflag: tar.TypeReg,
})
if err != nil {
t.Fatalf("error writing sentinel file header: %v", err)
}
if _, err := tw.Write(contents); err != nil {
t.Fatalf("error writing sentinel file: %v", err)
}

// Assert streamAllocDir fails
err = prevAlloc.streamAllocDir(context.Background(), ioutil.NopCloser(tarBuf), dest)
if err == nil {
t.Fatalf("expected an error from streamAllocDir")
}
if !strings.HasSuffix(err.Error(), string(contents)) {
t.Fatalf("expected error to end with %q but found: %v", string(contents), err)
}

// streamAllocDir leaves cleanup to the caller on error, so assert
// "foo.txt" was written
fi, err := os.Stat(filepath.Join(dest, "foo.txt"))
if err != nil {
t.Fatalf("error reading foo.txt: %v", err)
}
if fi.Size() != fooHdr.Size {
t.Fatalf("expected foo.txt to be size 1 but found %d", fi.Size())
}
}
57 changes: 55 additions & 2 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ const (
)

var (
// SnapshotErrorTime is the sentinel time that will be used on the
// error file written by Snapshot when it encounters as error.
SnapshotErrorTime = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably should have a IsSnapshotErrorFile method that checks this as well as the file name. Right now this is unused on the validation path


// The name of the directory that is shared across tasks in a task group.
SharedAllocName = "alloc"

Expand Down Expand Up @@ -128,6 +132,10 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir {

// Snapshot creates an archive of the files and directories in the data dir of
// the allocation and the task local directories
//
// Since a valid tar may have been written even when an error occurs, a special
// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the
// error message as the contents.
func (d *AllocDir) Snapshot(w io.Writer) error {
allocDataDir := filepath.Join(d.SharedDir, SharedDataDir)
rootPaths := []string{allocDataDir}
Expand All @@ -139,6 +147,10 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
defer tw.Close()

walkFn := func(path string, fileInfo os.FileInfo, err error) error {
if err != nil {
return err
}

// Include the path of the file name relative to the alloc dir
// so that we can put the files in the right directories
relPath, err := filepath.Rel(d.AllocDir, path)
Expand All @@ -158,7 +170,9 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
return fmt.Errorf("error creating file header: %v", err)
}
hdr.Name = relPath
tw.WriteHeader(hdr)
if err := tw.WriteHeader(hdr); err != nil {
return err
}

// If it's a directory or symlink we just write the header into the tar
if fileInfo.IsDir() || (fileInfo.Mode()&os.ModeSymlink != 0) {
Expand All @@ -182,7 +196,16 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
// directories in the archive
for _, path := range rootPaths {
if err := filepath.Walk(path, walkFn); err != nil {
return err
allocID := filepath.Base(d.AllocDir)
if writeErr := writeError(tw, allocID, err); writeErr != nil {
// This could be bad; other side won't know
// snapshotting failed. It could also just mean
// the snapshotting side closed the connect
// prematurely and won't try to use the tar
// anyway.
d.logger.Printf("[WARN] client: snapshotting failed and unable to write error marker: %v", writeErr)
}
return fmt.Errorf("failed to snapshot %s: %v", path, err)
}
}

Expand Down Expand Up @@ -242,6 +265,8 @@ func (d *AllocDir) Destroy() error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("failed to remove alloc dir %q: %v", d.AllocDir, err))
}

// Unset built since the alloc dir has been destroyed.
d.built = false
return mErr.ErrorOrNil()
}

Expand Down Expand Up @@ -557,3 +582,31 @@ func splitPath(path string) ([]fileInfo, error) {
}
return dirs, nil
}

// SnapshotErrorFilename returns the filename which will exist if there was an
// error snapshotting a tar.
func SnapshotErrorFilename(allocID string) string {
return fmt.Sprintf("NOMAD-%s-ERROR.log", allocID)
}

// writeError writes a special file to a tar archive with the error encountered
// during snapshotting. See Snapshot().
func writeError(tw *tar.Writer, allocID string, err error) error {
contents := []byte(fmt.Sprintf("Error snapshotting: %v", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be at the beginning of the function? Maybe put it closer to where it is actually used.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to put its length into the Header, so it has to stay here.

hdr := tar.Header{
Name: SnapshotErrorFilename(allocID),
Mode: 0666,
Size: int64(len(contents)),
AccessTime: SnapshotErrorTime,
ChangeTime: SnapshotErrorTime,
ModTime: SnapshotErrorTime,
Typeflag: tar.TypeReg,
}

if err := tw.WriteHeader(&hdr); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can tw ever be nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's always initialized in the single caller to writeError.

return err
}

_, err = tw.Write(contents)
return err
}
Loading