diff --git a/client/client.go b/client/client.go index 06506fcf176..e975ea8b88e 100644 --- a/client/client.go +++ b/client/client.go @@ -1578,6 +1578,18 @@ func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string return nil, fmt.Errorf("error getting snapshot for alloc %v: %v", alloc.ID, err) } + if err := c.unarchiveAllocDir(resp, allocID, pathToAllocDir); err != nil { + return nil, err + } + + // If there were no errors then we create the allocdir + prevAllocDir := allocdir.NewAllocDir(pathToAllocDir) + return prevAllocDir, nil +} + +// unarchiveAllocDir reads the stream of a compressed allocation directory and +// writes them to the disk. +func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAllocDir string) error { tr := tar.NewReader(resp) defer resp.Close() @@ -1586,40 +1598,38 @@ func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string stopMigrating, ok := c.migratingAllocs[allocID] if !ok { os.RemoveAll(pathToAllocDir) - return nil, fmt.Errorf("couldn't find a migration validity notifier for alloc: %v", alloc.ID) + return fmt.Errorf("Allocation %q is not marked for remote migration: %v", allocID) } for { // See if the alloc still needs migration select { case <-stopMigrating: os.RemoveAll(pathToAllocDir) - c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", alloc.ID) - return nil, nil + c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", allocID) + return nil case <-c.shutdownCh: os.RemoveAll(pathToAllocDir) - c.logger.Printf("[INFO] client: stopping migration of alloc %q since client is shutting down", alloc.ID) - return nil, nil + c.logger.Printf("[INFO] client: stopping migration of alloc %q since client is shutting down", allocID) + return nil default: } // Get the next header hdr, err := tr.Next() - // If the snapshot has ended then we create the previous - // allocdir + // Snapshot has ended if err == io.EOF { - prevAllocDir := allocdir.NewAllocDir(pathToAllocDir) - return prevAllocDir, nil + return nil } // If there is an error then we avoid creating the alloc dir if err != nil { os.RemoveAll(pathToAllocDir) - return nil, fmt.Errorf("error creating alloc dir for alloc %q: %v", alloc.ID, err) + return fmt.Errorf("error creating alloc dir for alloc %q: %v", allocID, err) } // If the header is for a directory we create the directory if hdr.Typeflag == tar.TypeDir { - os.MkdirAll(filepath.Join(pathToAllocDir, hdr.Name), 0777) + os.MkdirAll(filepath.Join(pathToAllocDir, hdr.Name), os.FileMode(hdr.Mode)) continue } // If the header is a file, we write to a file @@ -1630,33 +1640,47 @@ func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string continue } + // Setting the permissions of the file as the origin. + if err := f.Chmod(os.FileMode(hdr.Mode)); err != nil { + f.Close() + c.logger.Printf("[ERR] client: error chmod-ing file %s: %v", f.Name(), err) + return fmt.Errorf("error chmoding file %v", err) + } + if err := f.Chown(hdr.Uid, hdr.Gid); err != nil { + f.Close() + c.logger.Printf("[ERR] client: error chown-ing file %s: %v", f.Name(), err) + return fmt.Errorf("error chowning file %v", err) + } + // We write in chunks of 32 bytes so that we can test if // the client is still alive for { if c.shutdown { f.Close() os.RemoveAll(pathToAllocDir) - c.logger.Printf("[INFO] client: stopping migration of alloc %q because client is shutting down", alloc.ID) - return nil, nil + c.logger.Printf("[INFO] client: stopping migration of alloc %q because client is shutting down", allocID) + return nil } n, err := tr.Read(buf) if err != nil { f.Close() if err != io.EOF { - return nil, fmt.Errorf("error reading snapshot: %v", err) + return fmt.Errorf("error reading snapshot: %v", err) } break } if _, err := f.Write(buf[:n]); err != nil { f.Close() os.RemoveAll(pathToAllocDir) - return nil, fmt.Errorf("error writing to file %q: %v", f.Name(), err) + return fmt.Errorf("error writing to file %q: %v", f.Name(), err) } } } } + + return nil } // getNode gets the node from the server with the given Node ID diff --git a/client/client_test.go b/client/client_test.go index 549f77e7aea..74fb63842cf 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,7 +1,10 @@ package client import ( + "archive/tar" + "bytes" "fmt" + "io" "io/ioutil" "log" "net" @@ -804,3 +807,111 @@ func TestClient_BlockedAllocations(t *testing.T) { c1.allocLock.Unlock() } + +func TestClient_UnarchiveAllocDir(t *testing.T) { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir) + + if err := os.Mkdir(filepath.Join(dir, "foo"), 0777); err != nil { + t.Fatalf("err: %v", err) + } + dirInfo, err := os.Stat(filepath.Join(dir, "foo")) + if err != nil { + t.Fatalf("err: %v", err) + } + f, err := os.Create(filepath.Join(dir, "foo", "bar")) + if err != nil { + t.Fatalf("err: %v", err) + } + if _, err := f.WriteString("foo"); err != nil { + t.Fatalf("err: %v", err) + } + if err := f.Chmod(0644); err != nil { + t.Fatalf("err: %v", err) + } + fInfo, err := f.Stat() + if err != nil { + t.Fatalf("err: %v", err) + } + f.Close() + + buf := new(bytes.Buffer) + tw := tar.NewWriter(buf) + + walkFn := func(path string, fileInfo os.FileInfo, err error) error { + // Ignore if the file is a symlink + if fileInfo.Mode() == os.ModeSymlink { + return nil + } + + // Include the path of the file name relative to the alloc dir + // so that we can put the files in the right directories + hdr, err := tar.FileInfoHeader(fileInfo, "") + if err != nil { + return fmt.Errorf("error creating file header: %v", err) + } + hdr.Name = fileInfo.Name() + tw.WriteHeader(hdr) + + // If it's a directory we just write the header into the tar + if fileInfo.IsDir() { + return nil + } + + // Write the file into the archive + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() + + if _, err := io.Copy(tw, file); err != nil { + return err + } + + return nil + } + + if err := filepath.Walk(dir, walkFn); err != nil { + t.Fatalf("err: %v", err) + } + tw.Close() + + dir1, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir1) + + c1 := testClient(t, func(c *config.Config) { + c.RPCHandler = nil + }) + defer c1.Shutdown() + + rc := ioutil.NopCloser(buf) + + c1.migratingAllocs["123"] = make(chan struct{}) + if err := c1.unarchiveAllocDir(rc, "123", dir1); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure foo is present + fi, err := os.Stat(filepath.Join(dir1, "foo")) + if err != nil { + t.Fatalf("err: %v", err) + } + if fi.Mode() != dirInfo.Mode() { + t.Fatalf("mode: %v", fi.Mode()) + } + + fi1, err := os.Stat(filepath.Join(dir1, "bar")) + if err != nil { + t.Fatalf("err: %v", err) + } + if fi1.Mode() != fInfo.Mode() { + t.Fatalf("mode: %v", fi1.Mode()) + } +}