From e2972ced4e4b51d1a8d63dd5dfceda56c34d15c3 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 16 Nov 2017 17:48:06 -0800 Subject: [PATCH] wip - still needs alloc watcher tests --- client/alloc_watcher.go | 15 +++++++++ client/allocdir/alloc_dir.go | 49 +++++++++++++++++++++++++++- command/agent/alloc_endpoint_test.go | 43 +++++++++++++++++++----- 3 files changed, 98 insertions(+), 9 deletions(-) diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 4cf8bab7268..03aa948f22b 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -463,6 +463,9 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser } } + // if we see this file, there was an error on the remote side + errorFilename := allocdir.SnapshotErrorFilename(p.prevAllocID) + buf := make([]byte, 1024) for !canceled() { // Get the next header @@ -478,6 +481,18 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser p.prevAllocID, p.allocID, err) } + if hdr.Name == errorFilename { + // Error snapshotting on the remote side, try to read + // the message out of the file and return it. + errBuf := make([]byte, int(hdr.Size)) + if _, err := tr.Read(errBuf); err != nil { + return fmt.Errorf("error streaming previous alloc %q for new alloc %q; failed reading error message: %v", + p.prevAllocID, p.allocID, err) + } + return fmt.Errorf("error streaming previous alloc %q for new alloc %q: %s", + p.prevAllocID, p.allocID, string(errBuf)) + } + // If the header is for a directory we create the directory if hdr.Typeflag == tar.TypeDir { os.MkdirAll(filepath.Join(dest, hdr.Name), os.FileMode(hdr.Mode)) diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 16d1dec7d7c..8026b9e66b0 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -24,6 +24,10 @@ const ( ) var ( + // SnapshotErrorTime is the sentinel time that will be used on the + // error file written by Snapshot when it encounters as error. + SnapshotErrorTime = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC) + // The name of the directory that is shared across tasks in a task group. SharedAllocName = "alloc" @@ -128,6 +132,10 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir { // Snapshot creates an archive of the files and directories in the data dir of // the allocation and the task local directories +// +// Since a valid tar may have been written even when an error occurs, a special +// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the +// error message as the contents. func (d *AllocDir) Snapshot(w io.Writer) error { allocDataDir := filepath.Join(d.SharedDir, SharedDataDir) rootPaths := []string{allocDataDir} @@ -162,7 +170,9 @@ func (d *AllocDir) Snapshot(w io.Writer) error { return fmt.Errorf("error creating file header: %v", err) } hdr.Name = relPath - tw.WriteHeader(hdr) + if err := tw.WriteHeader(hdr); err != nil { + return err + } // If it's a directory or symlink we just write the header into the tar if fileInfo.IsDir() || (fileInfo.Mode()&os.ModeSymlink != 0) { @@ -186,6 +196,15 @@ func (d *AllocDir) Snapshot(w io.Writer) error { // directories in the archive for _, path := range rootPaths { if err := filepath.Walk(path, walkFn); err != nil { + allocID := filepath.Base(d.AllocDir) + if writeErr := writeError(tw, allocID, err); writeErr != nil { + // This could be bad; other side won't know + // snapshotting failed. It could also just mean + // the snapshotting side closed the connect + // prematurely and won't try to use the tar + // anyway. + d.logger.Printf("[WARN] client: snapshotting failed and unable to write error marker: %v", writeErr) + } return fmt.Errorf("failed to snapshot %s: %v", path, err) } } @@ -562,3 +581,31 @@ func splitPath(path string) ([]fileInfo, error) { } return dirs, nil } + +// SnapshotErrorFilename returns the filename which will exist if there was an +// error snapshotting a tar. +func SnapshotErrorFilename(allocID string) string { + return fmt.Sprintf("NOMAD-%s-ERROR.log", allocID) +} + +// writeError writes a special file to a tar archive with the error encountered +// during snapshotting. See Snapshot(). +func writeError(tw *tar.Writer, allocID string, err error) error { + contents := []byte(fmt.Sprintf("Error snapshotting: %v", err)) + hdr := tar.Header{ + Name: SnapshotErrorFilename(allocID), + Mode: 0666, + Size: int64(len(contents)), + AccessTime: SnapshotErrorTime, + ChangeTime: SnapshotErrorTime, + ModTime: SnapshotErrorTime, + Typeflag: tar.TypeReg, + } + + if err := tw.WriteHeader(&hdr); err != nil { + return err + } + + _, err = tw.Write(contents) + return err +} diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 79d5e2899ed..e4426530be2 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -408,8 +408,9 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) { } // Make the HTTP request to ensure the Snapshot error is - // propagated through to the HTTP layer and that a valid tar - // just isn't emitted. + // propagated through to the HTTP layer. Since the tar is + // streamed over a 200 HTTP response the only way to signal an + // error is by writing a marker file. respW := httptest.NewRecorder() req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID), nil) if err != nil { @@ -420,19 +421,45 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) { // by Snapshot is properly propogated via HTTP s.Server.mux.ServeHTTP(respW, req) resp := respW.Result() - t.Logf("HTTP Response Status Code: %d", resp.StatusCode) r := tar.NewReader(resp.Body) + errorFilename := allocdir.SnapshotErrorFilename(alloc.ID) + markerFound := false + markerContents := "" for { header, err := r.Next() if err != nil { - if err == io.EOF { - t.Fatalf("Looks like a valid tar file to me?") + if err != io.EOF { + // Huh, I wonder how a non-EOF error can happen? + t.Errorf("Unexpected error while streaming: %v", err) } - t.Logf("Yay! An error: %v", err) - return + break } - t.Logf("Valid file returned: %s", header.Name) + if markerFound { + // No more files should be found after the failure marker + t.Errorf("Next file found after error marker: %s", header.Name) + break + } + + if header.Name == errorFilename { + // Found it! + markerFound = true + buf := make([]byte, int(header.Size)) + if _, err := r.Read(buf); err != nil { + t.Errorf("Unexpected error reading error marker %s: %v", errorFilename, err) + } else { + markerContents = string(buf) + } + } + } + + if !markerFound { + t.Fatalf("marker file %s not written; bad tar will be treated as good!", errorFilename) + } + if markerContents == "" { + t.Fatalf("marker file %s empty", markerContents) + } else { + t.Logf("EXPECTED snapshot error: %s", markerContents) } }) }