From e7191e27e773928c5cbd283adb81835060553332 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 16 Nov 2017 17:48:06 -0800 Subject: [PATCH] Check for error file when receiving snapshots --- client/alloc_watcher.go | 15 ++++++ client/alloc_watcher_test.go | 81 +++++++++++++++++++++++++++- client/allocdir/alloc_dir.go | 49 ++++++++++++++++- command/agent/alloc_endpoint_test.go | 43 ++++++++++++--- 4 files changed, 177 insertions(+), 11 deletions(-) diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 4cf8bab7268..03aa948f22b 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -463,6 +463,9 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser } } + // if we see this file, there was an error on the remote side + errorFilename := allocdir.SnapshotErrorFilename(p.prevAllocID) + buf := make([]byte, 1024) for !canceled() { // Get the next header @@ -478,6 +481,18 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser p.prevAllocID, p.allocID, err) } + if hdr.Name == errorFilename { + // Error snapshotting on the remote side, try to read + // the message out of the file and return it. + errBuf := make([]byte, int(hdr.Size)) + if _, err := tr.Read(errBuf); err != nil { + return fmt.Errorf("error streaming previous alloc %q for new alloc %q; failed reading error message: %v", + p.prevAllocID, p.allocID, err) + } + return fmt.Errorf("error streaming previous alloc %q for new alloc %q: %s", + p.prevAllocID, p.allocID, string(errBuf)) + } + // If the header is for a directory we create the directory if hdr.Typeflag == tar.TypeDir { os.MkdirAll(filepath.Join(dest, hdr.Name), os.FileMode(hdr.Mode)) diff --git a/client/alloc_watcher_test.go b/client/alloc_watcher_test.go index fb7e938191b..0ecb4686510 100644 --- a/client/alloc_watcher_test.go +++ b/client/alloc_watcher_test.go @@ -9,9 +9,11 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "testing" "time" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/mock" ) @@ -68,9 +70,9 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { } } -// TestPrevAlloc_StreamAllocDir asserts that streaming a tar to an alloc dir +// TestPrevAlloc_StreamAllocDir_Ok asserts that streaming a tar to an alloc dir // works. -func TestPrevAlloc_StreamAllocDir(t *testing.T) { +func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) { t.Parallel() dir, err := ioutil.TempDir("", "") if err != nil { @@ -196,3 +198,78 @@ func TestPrevAlloc_StreamAllocDir(t *testing.T) { t.Fatalf("mode: %v", fi2.Mode()) } } + +// TestPrevAlloc_StreamAllocDir_Error asserts that errors encountered while +// streaming a tar cause the migration to be cancelled and no files are written +// (migrations are atomic). +func TestPrevAlloc_StreamAllocDir_Error(t *testing.T) { + t.Parallel() + dest, err := ioutil.TempDir("", "nomadtest-") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dest) + + // This test only unit tests streamAllocDir so we only need a partially + // complete remotePrevAlloc + prevAlloc := &remotePrevAlloc{ + logger: testLogger(), + allocID: "123", + prevAllocID: "abc", + migrate: true, + } + + tarBuf := bytes.NewBuffer(nil) + tw := tar.NewWriter(tarBuf) + fooHdr := tar.Header{ + Name: "foo.txt", + Mode: 0666, + Size: 1, + ModTime: time.Now(), + Typeflag: tar.TypeReg, + } + err = tw.WriteHeader(&fooHdr) + if err != nil { + t.Fatalf("error writing file header: %v", err) + } + if _, err := tw.Write([]byte{'a'}); err != nil { + t.Fatalf("error writing file: %v", err) + } + + // Now write the error file + contents := []byte("SENTINEL ERROR") + err = tw.WriteHeader(&tar.Header{ + Name: allocdir.SnapshotErrorFilename(prevAlloc.prevAllocID), + Mode: 0666, + Size: int64(len(contents)), + AccessTime: allocdir.SnapshotErrorTime, + ChangeTime: allocdir.SnapshotErrorTime, + ModTime: allocdir.SnapshotErrorTime, + Typeflag: tar.TypeReg, + }) + if err != nil { + t.Fatalf("error writing sentinel file header: %v", err) + } + if _, err := tw.Write(contents); err != nil { + t.Fatalf("error writing sentinel file: %v", err) + } + + // Assert streamAllocDir fails + err = prevAlloc.streamAllocDir(context.Background(), ioutil.NopCloser(tarBuf), dest) + if err == nil { + t.Fatalf("expected an error from streamAllocDir") + } + if !strings.HasSuffix(err.Error(), string(contents)) { + t.Fatalf("expected error to end with %q but found: %v", string(contents), err) + } + + // streamAllocDir leaves cleanup to the caller on error, so assert + // "foo.txt" was written + fi, err := os.Stat(filepath.Join(dest, "foo.txt")) + if err != nil { + t.Fatalf("error reading foo.txt: %v", err) + } + if fi.Size() != fooHdr.Size { + t.Fatalf("expected foo.txt to be size 1 but found %d", fi.Size()) + } +} diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 16d1dec7d7c..8026b9e66b0 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -24,6 +24,10 @@ const ( ) var ( + // SnapshotErrorTime is the sentinel time that will be used on the + // error file written by Snapshot when it encounters as error. + SnapshotErrorTime = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC) + // The name of the directory that is shared across tasks in a task group. SharedAllocName = "alloc" @@ -128,6 +132,10 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir { // Snapshot creates an archive of the files and directories in the data dir of // the allocation and the task local directories +// +// Since a valid tar may have been written even when an error occurs, a special +// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the +// error message as the contents. func (d *AllocDir) Snapshot(w io.Writer) error { allocDataDir := filepath.Join(d.SharedDir, SharedDataDir) rootPaths := []string{allocDataDir} @@ -162,7 +170,9 @@ func (d *AllocDir) Snapshot(w io.Writer) error { return fmt.Errorf("error creating file header: %v", err) } hdr.Name = relPath - tw.WriteHeader(hdr) + if err := tw.WriteHeader(hdr); err != nil { + return err + } // If it's a directory or symlink we just write the header into the tar if fileInfo.IsDir() || (fileInfo.Mode()&os.ModeSymlink != 0) { @@ -186,6 +196,15 @@ func (d *AllocDir) Snapshot(w io.Writer) error { // directories in the archive for _, path := range rootPaths { if err := filepath.Walk(path, walkFn); err != nil { + allocID := filepath.Base(d.AllocDir) + if writeErr := writeError(tw, allocID, err); writeErr != nil { + // This could be bad; other side won't know + // snapshotting failed. It could also just mean + // the snapshotting side closed the connect + // prematurely and won't try to use the tar + // anyway. + d.logger.Printf("[WARN] client: snapshotting failed and unable to write error marker: %v", writeErr) + } return fmt.Errorf("failed to snapshot %s: %v", path, err) } } @@ -562,3 +581,31 @@ func splitPath(path string) ([]fileInfo, error) { } return dirs, nil } + +// SnapshotErrorFilename returns the filename which will exist if there was an +// error snapshotting a tar. +func SnapshotErrorFilename(allocID string) string { + return fmt.Sprintf("NOMAD-%s-ERROR.log", allocID) +} + +// writeError writes a special file to a tar archive with the error encountered +// during snapshotting. See Snapshot(). +func writeError(tw *tar.Writer, allocID string, err error) error { + contents := []byte(fmt.Sprintf("Error snapshotting: %v", err)) + hdr := tar.Header{ + Name: SnapshotErrorFilename(allocID), + Mode: 0666, + Size: int64(len(contents)), + AccessTime: SnapshotErrorTime, + ChangeTime: SnapshotErrorTime, + ModTime: SnapshotErrorTime, + Typeflag: tar.TypeReg, + } + + if err := tw.WriteHeader(&hdr); err != nil { + return err + } + + _, err = tw.Write(contents) + return err +} diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 79d5e2899ed..e4426530be2 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -408,8 +408,9 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) { } // Make the HTTP request to ensure the Snapshot error is - // propagated through to the HTTP layer and that a valid tar - // just isn't emitted. + // propagated through to the HTTP layer. Since the tar is + // streamed over a 200 HTTP response the only way to signal an + // error is by writing a marker file. respW := httptest.NewRecorder() req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID), nil) if err != nil { @@ -420,19 +421,45 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) { // by Snapshot is properly propogated via HTTP s.Server.mux.ServeHTTP(respW, req) resp := respW.Result() - t.Logf("HTTP Response Status Code: %d", resp.StatusCode) r := tar.NewReader(resp.Body) + errorFilename := allocdir.SnapshotErrorFilename(alloc.ID) + markerFound := false + markerContents := "" for { header, err := r.Next() if err != nil { - if err == io.EOF { - t.Fatalf("Looks like a valid tar file to me?") + if err != io.EOF { + // Huh, I wonder how a non-EOF error can happen? + t.Errorf("Unexpected error while streaming: %v", err) } - t.Logf("Yay! An error: %v", err) - return + break } - t.Logf("Valid file returned: %s", header.Name) + if markerFound { + // No more files should be found after the failure marker + t.Errorf("Next file found after error marker: %s", header.Name) + break + } + + if header.Name == errorFilename { + // Found it! + markerFound = true + buf := make([]byte, int(header.Size)) + if _, err := r.Read(buf); err != nil { + t.Errorf("Unexpected error reading error marker %s: %v", errorFilename, err) + } else { + markerContents = string(buf) + } + } + } + + if !markerFound { + t.Fatalf("marker file %s not written; bad tar will be treated as good!", errorFilename) + } + if markerContents == "" { + t.Fatalf("marker file %s empty", markerContents) + } else { + t.Logf("EXPECTED snapshot error: %s", markerContents) } }) }