-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Atomic Snapshotting / Sticky Volume Migration #3563
Changes from all commits
4a49375
a05862d
e3256ec
50b335f
1ffb189
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -819,7 +819,7 @@ func (r *AllocRunner) Run() { | |
r.allocDirLock.Unlock() | ||
|
||
if err != nil { | ||
r.logger.Printf("[ERR] client: failed to build task directories: %v", err) | ||
r.logger.Printf("[ERR] client: alloc %q failed to build task directories: %v", r.allocID, err) | ||
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup)) | ||
return | ||
} | ||
|
@@ -841,6 +841,14 @@ func (r *AllocRunner) Run() { | |
|
||
// Soft-fail on migration errors | ||
r.logger.Printf("[WARN] client: alloc %q error while migrating data from previous alloc: %v", r.allocID, err) | ||
|
||
// Recreate alloc dir to ensure a clean slate | ||
r.allocDir.Destroy() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check for error here. Probably can't continue if the destroy fails as well! |
||
if err := r.allocDir.Build(); err != nil { | ||
r.logger.Printf("[ERR] client: alloc %q failed to clean task directories after failed migration: %v", r.allocID, err) | ||
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to rebuild task dirs for '%s'", alloc.TaskGroup)) | ||
return | ||
} | ||
} | ||
|
||
// Check if the allocation is in a terminal status. In this case, we don't | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -463,6 +463,9 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser | |
} | ||
} | ||
|
||
// if we see this file, there was an error on the remote side | ||
errorFilename := allocdir.SnapshotErrorFilename(p.prevAllocID) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I might be missing something obvious, but I can't find where allocdir is initialized. Can it ever be nil? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
buf := make([]byte, 1024) | ||
for !canceled() { | ||
// Get the next header | ||
|
@@ -478,6 +481,18 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser | |
p.prevAllocID, p.allocID, err) | ||
} | ||
|
||
if hdr.Name == errorFilename { | ||
// Error snapshotting on the remote side, try to read | ||
// the message out of the file and return it. | ||
errBuf := make([]byte, int(hdr.Size)) | ||
if _, err := tr.Read(errBuf); err != nil { | ||
return fmt.Errorf("error streaming previous alloc %q for new alloc %q; failed reading error message: %v", | ||
p.prevAllocID, p.allocID, err) | ||
} | ||
return fmt.Errorf("error streaming previous alloc %q for new alloc %q: %s", | ||
p.prevAllocID, p.allocID, string(errBuf)) | ||
} | ||
|
||
// If the header is for a directory we create the directory | ||
if hdr.Typeflag == tar.TypeDir { | ||
os.MkdirAll(filepath.Join(dest, hdr.Name), os.FileMode(hdr.Mode)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,9 +9,11 @@ import ( | |
"io/ioutil" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
"github.com/hashicorp/nomad/client/allocdir" | ||
"github.com/hashicorp/nomad/client/config" | ||
"github.com/hashicorp/nomad/nomad/mock" | ||
) | ||
|
@@ -68,9 +70,9 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { | |
} | ||
} | ||
|
||
// TestPrevAlloc_StreamAllocDir asserts that streaming a tar to an alloc dir | ||
// TestPrevAlloc_StreamAllocDir_Ok asserts that streaming a tar to an alloc dir | ||
// works. | ||
func TestPrevAlloc_StreamAllocDir(t *testing.T) { | ||
func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) { | ||
t.Parallel() | ||
dir, err := ioutil.TempDir("", "") | ||
if err != nil { | ||
|
@@ -196,3 +198,78 @@ func TestPrevAlloc_StreamAllocDir(t *testing.T) { | |
t.Fatalf("mode: %v", fi2.Mode()) | ||
} | ||
} | ||
|
||
// TestPrevAlloc_StreamAllocDir_Error asserts that errors encountered while | ||
// streaming a tar cause the migration to be cancelled and no files are written | ||
// (migrations are atomic). | ||
func TestPrevAlloc_StreamAllocDir_Error(t *testing.T) { | ||
t.Parallel() | ||
dest, err := ioutil.TempDir("", "nomadtest-") | ||
if err != nil { | ||
t.Fatalf("err: %v", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use assert test package- same for all other tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With that said, the tests look great! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually prefer this style as ...but maybe I'm the only one who gets confused by this? 😕 |
||
} | ||
defer os.RemoveAll(dest) | ||
|
||
// This test only unit tests streamAllocDir so we only need a partially | ||
// complete remotePrevAlloc | ||
prevAlloc := &remotePrevAlloc{ | ||
logger: testLogger(), | ||
allocID: "123", | ||
prevAllocID: "abc", | ||
migrate: true, | ||
} | ||
|
||
tarBuf := bytes.NewBuffer(nil) | ||
tw := tar.NewWriter(tarBuf) | ||
fooHdr := tar.Header{ | ||
Name: "foo.txt", | ||
Mode: 0666, | ||
Size: 1, | ||
ModTime: time.Now(), | ||
Typeflag: tar.TypeReg, | ||
} | ||
err = tw.WriteHeader(&fooHdr) | ||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use assert library. |
||
t.Fatalf("error writing file header: %v", err) | ||
} | ||
if _, err := tw.Write([]byte{'a'}); err != nil { | ||
t.Fatalf("error writing file: %v", err) | ||
} | ||
|
||
// Now write the error file | ||
contents := []byte("SENTINEL ERROR") | ||
err = tw.WriteHeader(&tar.Header{ | ||
Name: allocdir.SnapshotErrorFilename(prevAlloc.prevAllocID), | ||
Mode: 0666, | ||
Size: int64(len(contents)), | ||
AccessTime: allocdir.SnapshotErrorTime, | ||
ChangeTime: allocdir.SnapshotErrorTime, | ||
ModTime: allocdir.SnapshotErrorTime, | ||
Typeflag: tar.TypeReg, | ||
}) | ||
if err != nil { | ||
t.Fatalf("error writing sentinel file header: %v", err) | ||
} | ||
if _, err := tw.Write(contents); err != nil { | ||
t.Fatalf("error writing sentinel file: %v", err) | ||
} | ||
|
||
// Assert streamAllocDir fails | ||
err = prevAlloc.streamAllocDir(context.Background(), ioutil.NopCloser(tarBuf), dest) | ||
if err == nil { | ||
t.Fatalf("expected an error from streamAllocDir") | ||
} | ||
if !strings.HasSuffix(err.Error(), string(contents)) { | ||
t.Fatalf("expected error to end with %q but found: %v", string(contents), err) | ||
} | ||
|
||
// streamAllocDir leaves cleanup to the caller on error, so assert | ||
// "foo.txt" was written | ||
fi, err := os.Stat(filepath.Join(dest, "foo.txt")) | ||
if err != nil { | ||
t.Fatalf("error reading foo.txt: %v", err) | ||
} | ||
if fi.Size() != fooHdr.Size { | ||
t.Fatalf("expected foo.txt to be size 1 but found %d", fi.Size()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,10 @@ const ( | |
) | ||
|
||
var ( | ||
// SnapshotErrorTime is the sentinel time that will be used on the | ||
// error file written by Snapshot when it encounters as error. | ||
SnapshotErrorTime = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably should have a |
||
|
||
// The name of the directory that is shared across tasks in a task group. | ||
SharedAllocName = "alloc" | ||
|
||
|
@@ -128,6 +132,10 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir { | |
|
||
// Snapshot creates an archive of the files and directories in the data dir of | ||
// the allocation and the task local directories | ||
// | ||
// Since a valid tar may have been written even when an error occurs, a special | ||
// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the | ||
// error message as the contents. | ||
func (d *AllocDir) Snapshot(w io.Writer) error { | ||
allocDataDir := filepath.Join(d.SharedDir, SharedDataDir) | ||
rootPaths := []string{allocDataDir} | ||
|
@@ -139,6 +147,10 @@ func (d *AllocDir) Snapshot(w io.Writer) error { | |
defer tw.Close() | ||
|
||
walkFn := func(path string, fileInfo os.FileInfo, err error) error { | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Include the path of the file name relative to the alloc dir | ||
// so that we can put the files in the right directories | ||
relPath, err := filepath.Rel(d.AllocDir, path) | ||
|
@@ -158,7 +170,9 @@ func (d *AllocDir) Snapshot(w io.Writer) error { | |
return fmt.Errorf("error creating file header: %v", err) | ||
} | ||
hdr.Name = relPath | ||
tw.WriteHeader(hdr) | ||
if err := tw.WriteHeader(hdr); err != nil { | ||
return err | ||
} | ||
|
||
// If it's a directory or symlink we just write the header into the tar | ||
if fileInfo.IsDir() || (fileInfo.Mode()&os.ModeSymlink != 0) { | ||
|
@@ -182,7 +196,16 @@ func (d *AllocDir) Snapshot(w io.Writer) error { | |
// directories in the archive | ||
for _, path := range rootPaths { | ||
if err := filepath.Walk(path, walkFn); err != nil { | ||
return err | ||
allocID := filepath.Base(d.AllocDir) | ||
if writeErr := writeError(tw, allocID, err); writeErr != nil { | ||
// This could be bad; other side won't know | ||
// snapshotting failed. It could also just mean | ||
// the snapshotting side closed the connect | ||
// prematurely and won't try to use the tar | ||
// anyway. | ||
d.logger.Printf("[WARN] client: snapshotting failed and unable to write error marker: %v", writeErr) | ||
} | ||
return fmt.Errorf("failed to snapshot %s: %v", path, err) | ||
} | ||
} | ||
|
||
|
@@ -242,6 +265,8 @@ func (d *AllocDir) Destroy() error { | |
mErr.Errors = append(mErr.Errors, fmt.Errorf("failed to remove alloc dir %q: %v", d.AllocDir, err)) | ||
} | ||
|
||
// Unset built since the alloc dir has been destroyed. | ||
d.built = false | ||
return mErr.ErrorOrNil() | ||
} | ||
|
||
|
@@ -557,3 +582,31 @@ func splitPath(path string) ([]fileInfo, error) { | |
} | ||
return dirs, nil | ||
} | ||
|
||
// SnapshotErrorFilename returns the filename which will exist if there was an | ||
// error snapshotting a tar. | ||
func SnapshotErrorFilename(allocID string) string { | ||
return fmt.Sprintf("NOMAD-%s-ERROR.log", allocID) | ||
} | ||
|
||
// writeError writes a special file to a tar archive with the error encountered | ||
// during snapshotting. See Snapshot(). | ||
func writeError(tw *tar.Writer, allocID string, err error) error { | ||
contents := []byte(fmt.Sprintf("Error snapshotting: %v", err)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need to be at the beginning of the function? Maybe put it closer to where it is actually used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to put its length into the Header, so it has to stay here. |
||
hdr := tar.Header{ | ||
Name: SnapshotErrorFilename(allocID), | ||
Mode: 0666, | ||
Size: int64(len(contents)), | ||
AccessTime: SnapshotErrorTime, | ||
ChangeTime: SnapshotErrorTime, | ||
ModTime: SnapshotErrorTime, | ||
Typeflag: tar.TypeReg, | ||
} | ||
|
||
if err := tw.WriteHeader(&hdr); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can tw ever be nil? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, it's always initialized in the single caller to writeError. |
||
return err | ||
} | ||
|
||
_, err = tw.Write(contents) | ||
return err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can allocDir ever be nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, allocDir is initialized in NewAllocRunner and RestoreState includes (hopefully sufficient!) checks to prevent restoring a nil alloc dir. (Good question as I believe this did cause issues in the past!)