Skip to content

Commit

Permalink
Merge pull request #3563 from hashicorp/b-snapshot-atomic
Browse files Browse the repository at this point in the history
Atomic Snapshotting / Sticky Volume Migration
  • Loading branch information
schmichael authored Dec 5, 2017
2 parents 104e0e5 + 1ffb189 commit b53f24b
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ IMPROVEMENTS:
* api: Environment variables are ignored during service name validation [GH-3532]
* cli: Allocation create and modify times are displayed in a human readable
relative format like `6 h ago` [GH-3449]
* client: Sticky volume migrations are now atomic. [GH-3563]
* client: Added metrics to track state transitions of allocations [GH-3061]
* client: When `network_interface` is unspecified use interface attached to
default route [GH-3546]
Expand Down
10 changes: 9 additions & 1 deletion client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ func (r *AllocRunner) Run() {
r.allocDirLock.Unlock()

if err != nil {
r.logger.Printf("[ERR] client: failed to build task directories: %v", err)
r.logger.Printf("[ERR] client: alloc %q failed to build task directories: %v", r.allocID, err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
return
}
Expand All @@ -841,6 +841,14 @@ func (r *AllocRunner) Run() {

// Soft-fail on migration errors
r.logger.Printf("[WARN] client: alloc %q error while migrating data from previous alloc: %v", r.allocID, err)

// Recreate alloc dir to ensure a clean slate
r.allocDir.Destroy()
if err := r.allocDir.Build(); err != nil {
r.logger.Printf("[ERR] client: alloc %q failed to clean task directories after failed migration: %v", r.allocID, err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to rebuild task dirs for '%s'", alloc.TaskGroup))
return
}
}

// Check if the allocation is in a terminal status. In this case, we don't
Expand Down
15 changes: 15 additions & 0 deletions client/alloc_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser
}
}

// if we see this file, there was an error on the remote side
errorFilename := allocdir.SnapshotErrorFilename(p.prevAllocID)

buf := make([]byte, 1024)
for !canceled() {
// Get the next header
Expand All @@ -478,6 +481,18 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser
p.prevAllocID, p.allocID, err)
}

if hdr.Name == errorFilename {
// Error snapshotting on the remote side, try to read
// the message out of the file and return it.
errBuf := make([]byte, int(hdr.Size))
if _, err := tr.Read(errBuf); err != nil {
return fmt.Errorf("error streaming previous alloc %q for new alloc %q; failed reading error message: %v",
p.prevAllocID, p.allocID, err)
}
return fmt.Errorf("error streaming previous alloc %q for new alloc %q: %s",
p.prevAllocID, p.allocID, string(errBuf))
}

// If the header is for a directory we create the directory
if hdr.Typeflag == tar.TypeDir {
os.MkdirAll(filepath.Join(dest, hdr.Name), os.FileMode(hdr.Mode))
Expand Down
81 changes: 79 additions & 2 deletions client/alloc_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/mock"
)
Expand Down Expand Up @@ -68,9 +70,9 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) {
}
}

// TestPrevAlloc_StreamAllocDir asserts that streaming a tar to an alloc dir
// TestPrevAlloc_StreamAllocDir_Ok asserts that streaming a tar to an alloc dir
// works.
func TestPrevAlloc_StreamAllocDir(t *testing.T) {
func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) {
t.Parallel()
dir, err := ioutil.TempDir("", "")
if err != nil {
Expand Down Expand Up @@ -196,3 +198,78 @@ func TestPrevAlloc_StreamAllocDir(t *testing.T) {
t.Fatalf("mode: %v", fi2.Mode())
}
}

// TestPrevAlloc_StreamAllocDir_Error asserts that errors encountered while
// streaming a tar cause the migration to be cancelled and no files are written
// (migrations are atomic).
func TestPrevAlloc_StreamAllocDir_Error(t *testing.T) {
t.Parallel()
dest, err := ioutil.TempDir("", "nomadtest-")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dest)

// This test only unit tests streamAllocDir so we only need a partially
// complete remotePrevAlloc
prevAlloc := &remotePrevAlloc{
logger: testLogger(),
allocID: "123",
prevAllocID: "abc",
migrate: true,
}

tarBuf := bytes.NewBuffer(nil)
tw := tar.NewWriter(tarBuf)
fooHdr := tar.Header{
Name: "foo.txt",
Mode: 0666,
Size: 1,
ModTime: time.Now(),
Typeflag: tar.TypeReg,
}
err = tw.WriteHeader(&fooHdr)
if err != nil {
t.Fatalf("error writing file header: %v", err)
}
if _, err := tw.Write([]byte{'a'}); err != nil {
t.Fatalf("error writing file: %v", err)
}

// Now write the error file
contents := []byte("SENTINEL ERROR")
err = tw.WriteHeader(&tar.Header{
Name: allocdir.SnapshotErrorFilename(prevAlloc.prevAllocID),
Mode: 0666,
Size: int64(len(contents)),
AccessTime: allocdir.SnapshotErrorTime,
ChangeTime: allocdir.SnapshotErrorTime,
ModTime: allocdir.SnapshotErrorTime,
Typeflag: tar.TypeReg,
})
if err != nil {
t.Fatalf("error writing sentinel file header: %v", err)
}
if _, err := tw.Write(contents); err != nil {
t.Fatalf("error writing sentinel file: %v", err)
}

// Assert streamAllocDir fails
err = prevAlloc.streamAllocDir(context.Background(), ioutil.NopCloser(tarBuf), dest)
if err == nil {
t.Fatalf("expected an error from streamAllocDir")
}
if !strings.HasSuffix(err.Error(), string(contents)) {
t.Fatalf("expected error to end with %q but found: %v", string(contents), err)
}

// streamAllocDir leaves cleanup to the caller on error, so assert
// "foo.txt" was written
fi, err := os.Stat(filepath.Join(dest, "foo.txt"))
if err != nil {
t.Fatalf("error reading foo.txt: %v", err)
}
if fi.Size() != fooHdr.Size {
t.Fatalf("expected foo.txt to be size 1 but found %d", fi.Size())
}
}
57 changes: 55 additions & 2 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ const (
)

var (
// SnapshotErrorTime is the sentinel time that will be used on the
// error file written by Snapshot when it encounters as error.
SnapshotErrorTime = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC)

// The name of the directory that is shared across tasks in a task group.
SharedAllocName = "alloc"

Expand Down Expand Up @@ -128,6 +132,10 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir {

// Snapshot creates an archive of the files and directories in the data dir of
// the allocation and the task local directories
//
// Since a valid tar may have been written even when an error occurs, a special
// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the
// error message as the contents.
func (d *AllocDir) Snapshot(w io.Writer) error {
allocDataDir := filepath.Join(d.SharedDir, SharedDataDir)
rootPaths := []string{allocDataDir}
Expand All @@ -139,6 +147,10 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
defer tw.Close()

walkFn := func(path string, fileInfo os.FileInfo, err error) error {
if err != nil {
return err
}

// Include the path of the file name relative to the alloc dir
// so that we can put the files in the right directories
relPath, err := filepath.Rel(d.AllocDir, path)
Expand All @@ -158,7 +170,9 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
return fmt.Errorf("error creating file header: %v", err)
}
hdr.Name = relPath
tw.WriteHeader(hdr)
if err := tw.WriteHeader(hdr); err != nil {
return err
}

// If it's a directory or symlink we just write the header into the tar
if fileInfo.IsDir() || (fileInfo.Mode()&os.ModeSymlink != 0) {
Expand All @@ -182,7 +196,16 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
// directories in the archive
for _, path := range rootPaths {
if err := filepath.Walk(path, walkFn); err != nil {
return err
allocID := filepath.Base(d.AllocDir)
if writeErr := writeError(tw, allocID, err); writeErr != nil {
// This could be bad; other side won't know
// snapshotting failed. It could also just mean
// the snapshotting side closed the connect
// prematurely and won't try to use the tar
// anyway.
d.logger.Printf("[WARN] client: snapshotting failed and unable to write error marker: %v", writeErr)
}
return fmt.Errorf("failed to snapshot %s: %v", path, err)
}
}

Expand Down Expand Up @@ -242,6 +265,8 @@ func (d *AllocDir) Destroy() error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("failed to remove alloc dir %q: %v", d.AllocDir, err))
}

// Unset built since the alloc dir has been destroyed.
d.built = false
return mErr.ErrorOrNil()
}

Expand Down Expand Up @@ -557,3 +582,31 @@ func splitPath(path string) ([]fileInfo, error) {
}
return dirs, nil
}

// SnapshotErrorFilename returns the filename which will exist if there was an
// error snapshotting a tar.
func SnapshotErrorFilename(allocID string) string {
return fmt.Sprintf("NOMAD-%s-ERROR.log", allocID)
}

// writeError writes a special file to a tar archive with the error encountered
// during snapshotting. See Snapshot().
func writeError(tw *tar.Writer, allocID string, err error) error {
contents := []byte(fmt.Sprintf("Error snapshotting: %v", err))
hdr := tar.Header{
Name: SnapshotErrorFilename(allocID),
Mode: 0666,
Size: int64(len(contents)),
AccessTime: SnapshotErrorTime,
ChangeTime: SnapshotErrorTime,
ModTime: SnapshotErrorTime,
Typeflag: tar.TypeReg,
}

if err := tw.WriteHeader(&hdr); err != nil {
return err
}

_, err = tw.Write(contents)
return err
}
Loading

0 comments on commit b53f24b

Please sign in to comment.