diff --git a/CHANGELOG.md b/CHANGELOG.md index bf4efeaef8a..49cf30441ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ IMPROVEMENTS: * api: Environment variables are ignored during service name validation [GH-3532] * cli: Allocation create and modify times are displayed in a human readable relative format like `6 h ago` [GH-3449] + * client: Sticky volume migrations are now atomic. [GH-3563] * client: Added metrics to track state transitions of allocations [GH-3061] * client: When `network_interface` is unspecified use interface attached to default route [GH-3546] diff --git a/client/alloc_runner.go b/client/alloc_runner.go index b45a553a617..1595050ac49 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -819,7 +819,7 @@ func (r *AllocRunner) Run() { r.allocDirLock.Unlock() if err != nil { - r.logger.Printf("[ERR] client: failed to build task directories: %v", err) + r.logger.Printf("[ERR] client: alloc %q failed to build task directories: %v", r.allocID, err) r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup)) return } @@ -841,6 +841,14 @@ func (r *AllocRunner) Run() { // Soft-fail on migration errors r.logger.Printf("[WARN] client: alloc %q error while migrating data from previous alloc: %v", r.allocID, err) + + // Recreate alloc dir to ensure a clean slate + r.allocDir.Destroy() + if err := r.allocDir.Build(); err != nil { + r.logger.Printf("[ERR] client: alloc %q failed to clean task directories after failed migration: %v", r.allocID, err) + r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to rebuild task dirs for '%s'", alloc.TaskGroup)) + return + } } // Check if the allocation is in a terminal status. 
In this case, we don't diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 4cf8bab7268..03aa948f22b 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -463,6 +463,9 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser } } + // if we see this file, there was an error on the remote side + errorFilename := allocdir.SnapshotErrorFilename(p.prevAllocID) + buf := make([]byte, 1024) for !canceled() { // Get the next header @@ -478,6 +481,18 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser p.prevAllocID, p.allocID, err) } + if hdr.Name == errorFilename { + // Error snapshotting on the remote side, try to read + // the message out of the file and return it. + errBuf := make([]byte, int(hdr.Size)) + if _, err := tr.Read(errBuf); err != nil { + return fmt.Errorf("error streaming previous alloc %q for new alloc %q; failed reading error message: %v", + p.prevAllocID, p.allocID, err) + } + return fmt.Errorf("error streaming previous alloc %q for new alloc %q: %s", + p.prevAllocID, p.allocID, string(errBuf)) + } + // If the header is for a directory we create the directory if hdr.Typeflag == tar.TypeDir { os.MkdirAll(filepath.Join(dest, hdr.Name), os.FileMode(hdr.Mode)) diff --git a/client/alloc_watcher_test.go b/client/alloc_watcher_test.go index fb7e938191b..0ecb4686510 100644 --- a/client/alloc_watcher_test.go +++ b/client/alloc_watcher_test.go @@ -9,9 +9,11 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "testing" "time" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/mock" ) @@ -68,9 +70,9 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { } } -// TestPrevAlloc_StreamAllocDir asserts that streaming a tar to an alloc dir +// TestPrevAlloc_StreamAllocDir_Ok asserts that streaming a tar to an alloc dir // works. 
-func TestPrevAlloc_StreamAllocDir(t *testing.T) { +func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) { t.Parallel() dir, err := ioutil.TempDir("", "") if err != nil { @@ -196,3 +198,78 @@ func TestPrevAlloc_StreamAllocDir(t *testing.T) { t.Fatalf("mode: %v", fi2.Mode()) } } + +// TestPrevAlloc_StreamAllocDir_Error asserts that errors encountered while +// streaming a tar cause the migration to be cancelled and no files are written +// (migrations are atomic). +func TestPrevAlloc_StreamAllocDir_Error(t *testing.T) { + t.Parallel() + dest, err := ioutil.TempDir("", "nomadtest-") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dest) + + // This test only unit tests streamAllocDir so we only need a partially + // complete remotePrevAlloc + prevAlloc := &remotePrevAlloc{ + logger: testLogger(), + allocID: "123", + prevAllocID: "abc", + migrate: true, + } + + tarBuf := bytes.NewBuffer(nil) + tw := tar.NewWriter(tarBuf) + fooHdr := tar.Header{ + Name: "foo.txt", + Mode: 0666, + Size: 1, + ModTime: time.Now(), + Typeflag: tar.TypeReg, + } + err = tw.WriteHeader(&fooHdr) + if err != nil { + t.Fatalf("error writing file header: %v", err) + } + if _, err := tw.Write([]byte{'a'}); err != nil { + t.Fatalf("error writing file: %v", err) + } + + // Now write the error file + contents := []byte("SENTINEL ERROR") + err = tw.WriteHeader(&tar.Header{ + Name: allocdir.SnapshotErrorFilename(prevAlloc.prevAllocID), + Mode: 0666, + Size: int64(len(contents)), + AccessTime: allocdir.SnapshotErrorTime, + ChangeTime: allocdir.SnapshotErrorTime, + ModTime: allocdir.SnapshotErrorTime, + Typeflag: tar.TypeReg, + }) + if err != nil { + t.Fatalf("error writing sentinel file header: %v", err) + } + if _, err := tw.Write(contents); err != nil { + t.Fatalf("error writing sentinel file: %v", err) + } + + // Assert streamAllocDir fails + err = prevAlloc.streamAllocDir(context.Background(), ioutil.NopCloser(tarBuf), dest) + if err == nil { + t.Fatalf("expected an error from 
streamAllocDir") + } + if !strings.HasSuffix(err.Error(), string(contents)) { + t.Fatalf("expected error to end with %q but found: %v", string(contents), err) + } + + // streamAllocDir leaves cleanup to the caller on error, so assert + // "foo.txt" was written + fi, err := os.Stat(filepath.Join(dest, "foo.txt")) + if err != nil { + t.Fatalf("error reading foo.txt: %v", err) + } + if fi.Size() != fooHdr.Size { + t.Fatalf("expected foo.txt to be size 1 but found %d", fi.Size()) + } +} diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 780e1b8ccc3..d34c8b8ea1c 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -24,6 +24,10 @@ const ( ) var ( + // SnapshotErrorTime is the sentinel time that will be used on the + // error file written by Snapshot when it encounters an error. + SnapshotErrorTime = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC) + // The name of the directory that is shared across tasks in a task group. SharedAllocName = "alloc" @@ -128,6 +132,10 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir { // Snapshot creates an archive of the files and directories in the data dir of // the allocation and the task local directories +// +// Since a valid tar may have been written even when an error occurs, a special +// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the +// error message as the contents. 
func (d *AllocDir) Snapshot(w io.Writer) error { allocDataDir := filepath.Join(d.SharedDir, SharedDataDir) rootPaths := []string{allocDataDir} @@ -139,6 +147,10 @@ func (d *AllocDir) Snapshot(w io.Writer) error { defer tw.Close() walkFn := func(path string, fileInfo os.FileInfo, err error) error { + if err != nil { + return err + } + // Include the path of the file name relative to the alloc dir // so that we can put the files in the right directories relPath, err := filepath.Rel(d.AllocDir, path) @@ -158,7 +170,9 @@ func (d *AllocDir) Snapshot(w io.Writer) error { return fmt.Errorf("error creating file header: %v", err) } hdr.Name = relPath - tw.WriteHeader(hdr) + if err := tw.WriteHeader(hdr); err != nil { + return err + } // If it's a directory or symlink we just write the header into the tar if fileInfo.IsDir() || (fileInfo.Mode()&os.ModeSymlink != 0) { @@ -182,7 +196,16 @@ func (d *AllocDir) Snapshot(w io.Writer) error { // directories in the archive for _, path := range rootPaths { if err := filepath.Walk(path, walkFn); err != nil { - return err + allocID := filepath.Base(d.AllocDir) + if writeErr := writeError(tw, allocID, err); writeErr != nil { + // This could be bad; other side won't know + // snapshotting failed. It could also just mean + // the snapshotting side closed the connection + // prematurely and won't try to use the tar + // anyway. + d.logger.Printf("[WARN] client: snapshotting failed and unable to write error marker: %v", writeErr) + } + return fmt.Errorf("failed to snapshot %s: %v", path, err) } } @@ -242,6 +265,8 @@ func (d *AllocDir) Destroy() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("failed to remove alloc dir %q: %v", d.AllocDir, err)) } + // Unset built since the alloc dir has been destroyed. 
+ d.built = false return mErr.ErrorOrNil() } @@ -557,3 +582,31 @@ func splitPath(path string) ([]fileInfo, error) { } return dirs, nil } + +// SnapshotErrorFilename returns the filename which will exist if there was an +// error snapshotting a tar. +func SnapshotErrorFilename(allocID string) string { + return fmt.Sprintf("NOMAD-%s-ERROR.log", allocID) +} + +// writeError writes a special file to a tar archive with the error encountered +// during snapshotting. See Snapshot(). +func writeError(tw *tar.Writer, allocID string, err error) error { + contents := []byte(fmt.Sprintf("Error snapshotting: %v", err)) + hdr := tar.Header{ + Name: SnapshotErrorFilename(allocID), + Mode: 0666, + Size: int64(len(contents)), + AccessTime: SnapshotErrorTime, + ChangeTime: SnapshotErrorTime, + ModTime: SnapshotErrorTime, + Typeflag: tar.TypeReg, + } + + if err := tw.WriteHeader(&hdr); err != nil { + return err + } + + _, err = tw.Write(contents) + return err +} diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index e81c897c9f1..957c354d8f0 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -1,18 +1,24 @@ package agent import ( + "archive/tar" "fmt" + "io" + "io/ioutil" "net/http" "net/http/httptest" + "os" "reflect" "strings" "testing" "github.com/golang/snappy" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" ) @@ -379,6 +385,112 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { }) } +// TestHTTP_AllocSnapshot_Atomic ensures that when a client encounters an error +// snapshotting a valid tar is not returned. 
+func TestHTTP_AllocSnapshot_Atomic(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + // Create an alloc + state := s.server.State() + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "30s" + alloc.NodeID = s.client.NodeID() + state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) + if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc.Copy()}); err != nil { + t.Fatalf("error upserting alloc: %v", err) + } + + // Wait for the client to run it + testutil.WaitForResult(func() (bool, error) { + if _, err := s.client.GetClientAlloc(alloc.ID); err != nil { + return false, err + } + + serverAlloc, err := state.AllocByID(nil, alloc.ID) + if err != nil { + return false, err + } + + return serverAlloc.ClientStatus == structs.AllocClientStatusRunning, fmt.Errorf(serverAlloc.ClientStatus) + }, func(err error) { + t.Fatalf("client not running alloc: %v", err) + }) + + // Now write to its shared dir + allocDirI, err := s.client.GetAllocFS(alloc.ID) + if err != nil { + t.Fatalf("unable to find alloc dir: %v", err) + } + allocDir := allocDirI.(*allocdir.AllocDir) + + // Remove the task dir to break Snapshot + os.RemoveAll(allocDir.TaskDirs["web"].LocalDir) + + // Assert Snapshot fails + if err := allocDir.Snapshot(ioutil.Discard); err != nil { + s.logger.Printf("[DEBUG] agent.test: snapshot returned error: %v", err) + } else { + t.Errorf("expected Snapshot() to fail but it did not") + } + + // Make the HTTP request to ensure the Snapshot error is + // propagated through to the HTTP layer. Since the tar is + // streamed over a 200 HTTP response the only way to signal an + // error is by writing a marker file. 
+ respW := httptest.NewRecorder() + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID), nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make the request via the mux to make sure the error returned + // by Snapshot is properly propagated via HTTP + s.Server.mux.ServeHTTP(respW, req) + resp := respW.Result() + r := tar.NewReader(resp.Body) + errorFilename := allocdir.SnapshotErrorFilename(alloc.ID) + markerFound := false + markerContents := "" + for { + header, err := r.Next() + if err != nil { + if err != io.EOF { + // Huh, I wonder how a non-EOF error can happen? + t.Errorf("Unexpected error while streaming: %v", err) + } + break + } + + if markerFound { + // No more files should be found after the failure marker + t.Errorf("Next file found after error marker: %s", header.Name) + break + } + + if header.Name == errorFilename { + // Found it! + markerFound = true + buf := make([]byte, int(header.Size)) + if _, err := r.Read(buf); err != nil { + t.Errorf("Unexpected error reading error marker %s: %v", errorFilename, err) + } else { + markerContents = string(buf) + } + } + } + + if !markerFound { + t.Fatalf("marker file %s not written; bad tar will be treated as good!", errorFilename) + } + if markerContents == "" { + t.Fatalf("marker file %s empty", markerContents) + } else { + t.Logf("EXPECTED snapshot error: %s", markerContents) + } + }) +} + func TestHTTP_AllocGC(t *testing.T) { t.Parallel() httpTest(t, nil, func(s *TestAgent) { diff --git a/website/source/docs/job-specification/ephemeral_disk.html.md b/website/source/docs/job-specification/ephemeral_disk.html.md index 6f630c3b96c..c3e8fe2074c 100644 --- a/website/source/docs/job-specification/ephemeral_disk.html.md +++ b/website/source/docs/job-specification/ephemeral_disk.html.md @@ -41,7 +41,9 @@ job "docs" { - `migrate` `(bool: false)` - When `sticky` is true, this specifies that the Nomad client should make a best-effort attempt to migrate the data from 
a remote machine if placement cannot be made on the original node. During data - migration, the task will block starting until the data migration has completed. + migration, the task will block starting until the data migration has + completed. Migration is atomic and any partially migrated data will be + removed if an error is encountered. - `size` `(int: 300)` - Specifies the size of the ephemeral disk in MB. The current Nomad ephemeral storage implementation does not enforce this limit;