From 4a493759884ee59b3f7caa595679114ebcbbf30c Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 6 Nov 2017 11:26:29 -0800 Subject: [PATCH 1/5] Handle errors during snapshotting If an alloc dir is being GC'd (removed) during snapshotting the walk func will be passed an error. Previously we didn't check for an error so a panic would occur when we'd try to use a nil `fileInfo`. --- client/allocdir/alloc_dir.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 780e1b8ccc3..19c53cbcb87 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -139,6 +139,10 @@ func (d *AllocDir) Snapshot(w io.Writer) error { defer tw.Close() walkFn := func(path string, fileInfo os.FileInfo, err error) error { + if err != nil { + return err + } + // Include the path of the file name relative to the alloc dir // so that we can put the files in the right directories relPath, err := filepath.Rel(d.AllocDir, path) @@ -182,7 +186,7 @@ func (d *AllocDir) Snapshot(w io.Writer) error { // directories in the archive for _, path := range rootPaths { if err := filepath.Walk(path, walkFn); err != nil { - return err + return fmt.Errorf("failed to snapshot %s: %v", path, err) } } From a05862dbdf154c7d046d4462237ce647b2a66d26 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 15 Nov 2017 17:44:57 -0800 Subject: [PATCH 2/5] Destroy partially migrated alloc dirs Test that snapshot errors don't return a valid tar currently fails. --- client/alloc_runner.go | 10 +++- client/allocdir/alloc_dir.go | 1 + command/agent/alloc_endpoint_test.go | 85 ++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 1 deletion(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index b45a553a617..1595050ac49 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -819,7 +819,7 @@ func (r *AllocRunner) Run() { r.allocDirLock.Unlock() if err != nil { - r.logger.Printf("[ERR] client: failed to build task directories: %v", err) + r.logger.Printf("[ERR] client: alloc %q failed to build task directories: %v", r.allocID, err) r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup)) return } @@ -841,6 +841,14 @@ func (r *AllocRunner) Run() { // Soft-fail on migration errors r.logger.Printf("[WARN] client: alloc %q error while migrating data from previous alloc: %v", r.allocID, err) + + // Recreate alloc dir to ensure a clean slate + r.allocDir.Destroy() + if err := r.allocDir.Build(); err != nil { + r.logger.Printf("[ERR] client: alloc %q failed to clean task directories after failed migration: %v", r.allocID, err) + r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to rebuild task dirs for '%s'", alloc.TaskGroup)) + return + } } // Check if the allocation is in a terminal status. In this case, we don't diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 19c53cbcb87..16d1dec7d7c 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -246,6 +246,7 @@ func (d *AllocDir) Destroy() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("failed to remove alloc dir %q: %v", d.AllocDir, err)) } + d.built = false return mErr.ErrorOrNil() } diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index e81c897c9f1..9ef08d69af3 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -1,18 +1,24 @@ package agent import ( + "archive/tar" "fmt" + "io" + "io/ioutil" "net/http" "net/http/httptest" + "os" "reflect" "strings" "testing" "github.com/golang/snappy" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" ) @@ -379,6 +385,85 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { }) } +// TestHTTP_AllocSnapshot_Atomic ensures that when a client encounters an error +// snapshotting a valid tar is not returned. +func TestHTTP_AllocSnapshot_Atomic(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + // Create an alloc + state := s.server.State() + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "30s" + alloc.NodeID = s.client.NodeID() + state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) + if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc.Copy()}); err != nil { + t.Fatalf("error upserting alloc: %v", err) + } + + // Wait for the client to run it + testutil.WaitForResult(func() (bool, error) { + if _, err := s.client.GetClientAlloc(alloc.ID); err != nil { + return false, err + } + + serverAlloc, err := state.AllocByID(nil, alloc.ID) + if err != nil { + return false, err + } + + return serverAlloc.ClientStatus == structs.AllocClientStatusRunning, fmt.Errorf(serverAlloc.ClientStatus) + }, func(err error) { + t.Fatalf("client not running alloc: %v", err) + }) + + // Now write to its shared dir + allocDirI, err := s.client.GetAllocFS(alloc.ID) + if err != nil { + t.Fatalf("unable to find alloc dir: %v", err) + } + allocDir := allocDirI.(*allocdir.AllocDir) + + // Remove the task dir to break Snapshot + os.RemoveAll(allocDir.TaskDirs["web"].LocalDir) + + // Assert Snapshot fails + if err := allocDir.Snapshot(ioutil.Discard); err == nil { + t.Errorf("expected Snapshot() to fail but it did not") + } else { + s.logger.Printf("[DEBUG] agent.test: snapshot returned error: %v", err) + } + + // Make the HTTP request to ensure the Snapshot error is + // propagated through to the HTTP layer and that a valid tar + // just isn't emitted. + respW := httptest.NewRecorder() + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID), nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make the request via the mux to make sure the error returned + // by Snapshot is properly propogated via HTTP + s.Server.mux.ServeHTTP(respW, req) + resp := respW.Result() + t.Logf("HTTP Response Status Code: %d", resp.StatusCode) + r := tar.NewReader(resp.Body) + for { + header, err := r.Next() + if err != nil { + if err == io.EOF { + t.Fatalf("Looks like a valid tar file to me?") + } + t.Logf("Yay! An error: %v", err) + return + } + + t.Logf("Valid file returned: %s", header.Name) + } + }) +} + func TestHTTP_AllocGC(t *testing.T) { t.Parallel() httpTest(t, nil, func(s *TestAgent) { From e3256ec4ee077a953e88bc94093ce818e0cb36db Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 16 Nov 2017 17:48:06 -0800 Subject: [PATCH 3/5] Check for error file when receiving snapshots --- client/alloc_watcher.go | 15 ++++++ client/alloc_watcher_test.go | 81 +++++++++++++++++++++++++++- client/allocdir/alloc_dir.go | 49 ++++++++++++++++- command/agent/alloc_endpoint_test.go | 43 ++++++++++++--- 4 files changed, 177 insertions(+), 11 deletions(-) diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 4cf8bab7268..03aa948f22b 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -463,6 +463,9 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser } } + // if we see this file, there was an error on the remote side + errorFilename := allocdir.SnapshotErrorFilename(p.prevAllocID) + buf := make([]byte, 1024) for !canceled() { // Get the next header @@ -478,6 +481,18 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser p.prevAllocID, p.allocID, err) } + if hdr.Name == errorFilename { + // Error snapshotting on the remote side, try to read + // the message out of the file and return it. + errBuf := make([]byte, int(hdr.Size)) + if _, err := tr.Read(errBuf); err != nil { + return fmt.Errorf("error streaming previous alloc %q for new alloc %q; failed reading error message: %v", + p.prevAllocID, p.allocID, err) + } + return fmt.Errorf("error streaming previous alloc %q for new alloc %q: %s", + p.prevAllocID, p.allocID, string(errBuf)) + } + // If the header is for a directory we create the directory if hdr.Typeflag == tar.TypeDir { os.MkdirAll(filepath.Join(dest, hdr.Name), os.FileMode(hdr.Mode)) diff --git a/client/alloc_watcher_test.go b/client/alloc_watcher_test.go index fb7e938191b..0ecb4686510 100644 --- a/client/alloc_watcher_test.go +++ b/client/alloc_watcher_test.go @@ -9,9 +9,11 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "testing" "time" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/mock" ) @@ -68,9 +70,9 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { } } -// TestPrevAlloc_StreamAllocDir asserts that streaming a tar to an alloc dir +// TestPrevAlloc_StreamAllocDir_Ok asserts that streaming a tar to an alloc dir // works. -func TestPrevAlloc_StreamAllocDir(t *testing.T) { +func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) { t.Parallel() dir, err := ioutil.TempDir("", "") if err != nil { @@ -196,3 +198,78 @@ func TestPrevAlloc_StreamAllocDir(t *testing.T) { t.Fatalf("mode: %v", fi2.Mode()) } } + +// TestPrevAlloc_StreamAllocDir_Error asserts that errors encountered while +// streaming a tar cause the migration to be cancelled and no files are written +// (migrations are atomic). +func TestPrevAlloc_StreamAllocDir_Error(t *testing.T) { + t.Parallel() + dest, err := ioutil.TempDir("", "nomadtest-") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dest) + + // This test only unit tests streamAllocDir so we only need a partially + // complete remotePrevAlloc + prevAlloc := &remotePrevAlloc{ + logger: testLogger(), + allocID: "123", + prevAllocID: "abc", + migrate: true, + } + + tarBuf := bytes.NewBuffer(nil) + tw := tar.NewWriter(tarBuf) + fooHdr := tar.Header{ + Name: "foo.txt", + Mode: 0666, + Size: 1, + ModTime: time.Now(), + Typeflag: tar.TypeReg, + } + err = tw.WriteHeader(&fooHdr) + if err != nil { + t.Fatalf("error writing file header: %v", err) + } + if _, err := tw.Write([]byte{'a'}); err != nil { + t.Fatalf("error writing file: %v", err) + } + + // Now write the error file + contents := []byte("SENTINEL ERROR") + err = tw.WriteHeader(&tar.Header{ + Name: allocdir.SnapshotErrorFilename(prevAlloc.prevAllocID), + Mode: 0666, + Size: int64(len(contents)), + AccessTime: allocdir.SnapshotErrorTime, + ChangeTime: allocdir.SnapshotErrorTime, + ModTime: allocdir.SnapshotErrorTime, + Typeflag: tar.TypeReg, + }) + if err != nil { + t.Fatalf("error writing sentinel file header: %v", err) + } + if _, err := tw.Write(contents); err != nil { + t.Fatalf("error writing sentinel file: %v", err) + } + + // Assert streamAllocDir fails + err = prevAlloc.streamAllocDir(context.Background(), ioutil.NopCloser(tarBuf), dest) + if err == nil { + t.Fatalf("expected an error from streamAllocDir") + } + if !strings.HasSuffix(err.Error(), string(contents)) { + t.Fatalf("expected error to end with %q but found: %v", string(contents), err) + } + + // streamAllocDir leaves cleanup to the caller on error, so assert + // "foo.txt" was written + fi, err := os.Stat(filepath.Join(dest, "foo.txt")) + if err != nil { + t.Fatalf("error reading foo.txt: %v", err) + } + if fi.Size() != fooHdr.Size { + t.Fatalf("expected foo.txt to be size 1 but found %d", fi.Size()) + } +} diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 16d1dec7d7c..8026b9e66b0 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -24,6 +24,10 @@ const ( ) var ( + // SnapshotErrorTime is the sentinel time that will be used on the + // error file written by Snapshot when it encounters as error. + SnapshotErrorTime = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC) + // The name of the directory that is shared across tasks in a task group. SharedAllocName = "alloc" @@ -128,6 +132,10 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir { // Snapshot creates an archive of the files and directories in the data dir of // the allocation and the task local directories +// +// Since a valid tar may have been written even when an error occurs, a special +// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the +// error message as the contents. func (d *AllocDir) Snapshot(w io.Writer) error { allocDataDir := filepath.Join(d.SharedDir, SharedDataDir) rootPaths := []string{allocDataDir} @@ -162,7 +170,9 @@ func (d *AllocDir) Snapshot(w io.Writer) error { return fmt.Errorf("error creating file header: %v", err) } hdr.Name = relPath - tw.WriteHeader(hdr) + if err := tw.WriteHeader(hdr); err != nil { + return err + } // If it's a directory or symlink we just write the header into the tar if fileInfo.IsDir() || (fileInfo.Mode()&os.ModeSymlink != 0) { @@ -186,6 +196,15 @@ func (d *AllocDir) Snapshot(w io.Writer) error { // directories in the archive for _, path := range rootPaths { if err := filepath.Walk(path, walkFn); err != nil { + allocID := filepath.Base(d.AllocDir) + if writeErr := writeError(tw, allocID, err); writeErr != nil { + // This could be bad; other side won't know + // snapshotting failed. It could also just mean + // the snapshotting side closed the connect + // prematurely and won't try to use the tar + // anyway. + d.logger.Printf("[WARN] client: snapshotting failed and unable to write error marker: %v", writeErr) + } return fmt.Errorf("failed to snapshot %s: %v", path, err) } } @@ -562,3 +581,31 @@ func splitPath(path string) ([]fileInfo, error) { } return dirs, nil } + +// SnapshotErrorFilename returns the filename which will exist if there was an +// error snapshotting a tar. +func SnapshotErrorFilename(allocID string) string { + return fmt.Sprintf("NOMAD-%s-ERROR.log", allocID) +} + +// writeError writes a special file to a tar archive with the error encountered +// during snapshotting. See Snapshot(). +func writeError(tw *tar.Writer, allocID string, err error) error { + contents := []byte(fmt.Sprintf("Error snapshotting: %v", err)) + hdr := tar.Header{ + Name: SnapshotErrorFilename(allocID), + Mode: 0666, + Size: int64(len(contents)), + AccessTime: SnapshotErrorTime, + ChangeTime: SnapshotErrorTime, + ModTime: SnapshotErrorTime, + Typeflag: tar.TypeReg, + } + + if err := tw.WriteHeader(&hdr); err != nil { + return err + } + + _, err = tw.Write(contents) + return err +} diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 9ef08d69af3..e1ead2ed719 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -435,8 +435,9 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) { } // Make the HTTP request to ensure the Snapshot error is - // propagated through to the HTTP layer and that a valid tar - // just isn't emitted. + // propagated through to the HTTP layer. Since the tar is + // streamed over a 200 HTTP response the only way to signal an + // error is by writing a marker file. respW := httptest.NewRecorder() req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID), nil) if err != nil { @@ -447,19 +448,45 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) { // by Snapshot is properly propogated via HTTP s.Server.mux.ServeHTTP(respW, req) resp := respW.Result() - t.Logf("HTTP Response Status Code: %d", resp.StatusCode) r := tar.NewReader(resp.Body) + errorFilename := allocdir.SnapshotErrorFilename(alloc.ID) + markerFound := false + markerContents := "" for { header, err := r.Next() if err != nil { - if err == io.EOF { - t.Fatalf("Looks like a valid tar file to me?") + if err != io.EOF { + // Huh, I wonder how a non-EOF error can happen? + t.Errorf("Unexpected error while streaming: %v", err) } - t.Logf("Yay! An error: %v", err) - return + break } - t.Logf("Valid file returned: %s", header.Name) + if markerFound { + // No more files should be found after the failure marker + t.Errorf("Next file found after error marker: %s", header.Name) + break + } + + if header.Name == errorFilename { + // Found it! + markerFound = true + buf := make([]byte, int(header.Size)) + if _, err := r.Read(buf); err != nil { + t.Errorf("Unexpected error reading error marker %s: %v", errorFilename, err) + } else { + markerContents = string(buf) + } + } + } + + if !markerFound { + t.Fatalf("marker file %s not written; bad tar will be treated as good!", errorFilename) + } + if markerContents == "" { + t.Fatalf("marker file %s empty", markerContents) + } else { + t.Logf("EXPECTED snapshot error: %s", markerContents) } }) } From 50b335f3f025c2679616045b14c13f7c7033143a Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 29 Nov 2017 17:24:58 -0800 Subject: [PATCH 4/5] Add comment and normalize err check ordering as per PR comments --- client/allocdir/alloc_dir.go | 1 + command/agent/alloc_endpoint_test.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 8026b9e66b0..d34c8b8ea1c 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -265,6 +265,7 @@ func (d *AllocDir) Destroy() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("failed to remove alloc dir %q: %v", d.AllocDir, err)) } + // Unset built since the alloc dir has been destroyed. d.built = false return mErr.ErrorOrNil() } diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index e1ead2ed719..957c354d8f0 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -428,10 +428,10 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) { os.RemoveAll(allocDir.TaskDirs["web"].LocalDir) // Assert Snapshot fails - if err := allocDir.Snapshot(ioutil.Discard); err == nil { - t.Errorf("expected Snapshot() to fail but it did not") - } else { + if err := allocDir.Snapshot(ioutil.Discard); err != nil { s.logger.Printf("[DEBUG] agent.test: snapshot returned error: %v", err) + } else { + t.Errorf("expected Snapshot() to fail but it did not") } // Make the HTTP request to ensure the Snapshot error is From 1ffb189ed2a1c7bf82c2890048cd1759562b62b6 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 29 Nov 2017 17:25:24 -0800 Subject: [PATCH 5/5] Add atomic migrations to changelog and docs --- CHANGELOG.md | 1 + website/source/docs/job-specification/ephemeral_disk.html.md | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c06034d4d1..ce8c49d28c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ IMPROVEMENTS: * api: Environment variables are ignored during service name validation [GH-3532] * cli: Allocation create and modify times are displayed in a human readable relative format like `6 h ago` [GH-3449] + * client: Sticky volume migrations are now atomic. [GH-3563] * client: Added metrics to track state transitions of allocations [GH-3061] * client: When `network_interface` is unspecified use interface attached to default route [GH-3546] diff --git a/website/source/docs/job-specification/ephemeral_disk.html.md b/website/source/docs/job-specification/ephemeral_disk.html.md index 6f630c3b96c..c3e8fe2074c 100644 --- a/website/source/docs/job-specification/ephemeral_disk.html.md +++ b/website/source/docs/job-specification/ephemeral_disk.html.md @@ -41,7 +41,9 @@ job "docs" { - `migrate` `(bool: false)` - When `sticky` is true, this specifies that the Nomad client should make a best-effort attempt to migrate the data from a remote machine if placement cannot be made on the original node. During data - migration, the task will block starting until the data migration has completed. + migration, the task will block starting until the data migration has + completed. Migration is atomic and any partially migrated data will be + removed if an error is encountered. - `size` `(int: 300)` - Specifies the size of the ephemeral disk in MB. The current Nomad ephemeral storage implementation does not enforce this limit;