Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed permissions of migrated allocation directories #2061

Merged
merged 1 commit into from
Dec 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,18 @@ func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string
return nil, fmt.Errorf("error getting snapshot for alloc %v: %v", alloc.ID, err)
}

if err := c.unarchiveAllocDir(resp, allocID, pathToAllocDir); err != nil {
return nil, err
}

// If there were no errors then we create the allocdir
prevAllocDir := allocdir.NewAllocDir(pathToAllocDir)
return prevAllocDir, nil
}

// unarchiveAllocDir reads the stream of a compressed allocation directory and
// writes them to the disk.
func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAllocDir string) error {
tr := tar.NewReader(resp)
defer resp.Close()

Expand All @@ -1586,40 +1598,38 @@ func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string
stopMigrating, ok := c.migratingAllocs[allocID]
if !ok {
os.RemoveAll(pathToAllocDir)
return nil, fmt.Errorf("couldn't find a migration validity notifier for alloc: %v", alloc.ID)
return fmt.Errorf("Allocation %q is not marked for remote migration: %v", allocID)
}
for {
// See if the alloc still needs migration
select {
case <-stopMigrating:
os.RemoveAll(pathToAllocDir)
c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", alloc.ID)
return nil, nil
c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", allocID)
return nil
case <-c.shutdownCh:
os.RemoveAll(pathToAllocDir)
c.logger.Printf("[INFO] client: stopping migration of alloc %q since client is shutting down", alloc.ID)
return nil, nil
c.logger.Printf("[INFO] client: stopping migration of alloc %q since client is shutting down", allocID)
return nil
default:
}

// Get the next header
hdr, err := tr.Next()

// If the snapshot has ended then we create the previous
// allocdir
// Snapshot has ended
if err == io.EOF {
prevAllocDir := allocdir.NewAllocDir(pathToAllocDir)
return prevAllocDir, nil
return nil
}
// If there is an error then we avoid creating the alloc dir
if err != nil {
os.RemoveAll(pathToAllocDir)
return nil, fmt.Errorf("error creating alloc dir for alloc %q: %v", alloc.ID, err)
return fmt.Errorf("error creating alloc dir for alloc %q: %v", allocID, err)
}

// If the header is for a directory we create the directory
if hdr.Typeflag == tar.TypeDir {
os.MkdirAll(filepath.Join(pathToAllocDir, hdr.Name), 0777)
os.MkdirAll(filepath.Join(pathToAllocDir, hdr.Name), os.FileMode(hdr.Mode))
continue
}
// If the header is a file, we write to a file
Expand All @@ -1630,33 +1640,47 @@ func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string
continue
}

// Setting the permissions of the file as the origin.
if err := f.Chmod(os.FileMode(hdr.Mode)); err != nil {
f.Close()
c.logger.Printf("[ERR] client: error chmod-ing file %s: %v", f.Name(), err)
return fmt.Errorf("error chmoding file %v", err)
}
if err := f.Chown(hdr.Uid, hdr.Gid); err != nil {
f.Close()
c.logger.Printf("[ERR] client: error chown-ing file %s: %v", f.Name(), err)
return fmt.Errorf("error chowning file %v", err)
}

// We write in chunks of 32 bytes so that we can test if
// the client is still alive
for {
if c.shutdown {
f.Close()
os.RemoveAll(pathToAllocDir)
c.logger.Printf("[INFO] client: stopping migration of alloc %q because client is shutting down", alloc.ID)
return nil, nil
c.logger.Printf("[INFO] client: stopping migration of alloc %q because client is shutting down", allocID)
return nil
}

n, err := tr.Read(buf)
if err != nil {
f.Close()
if err != io.EOF {
return nil, fmt.Errorf("error reading snapshot: %v", err)
return fmt.Errorf("error reading snapshot: %v", err)
}
break
}
if _, err := f.Write(buf[:n]); err != nil {
f.Close()
os.RemoveAll(pathToAllocDir)
return nil, fmt.Errorf("error writing to file %q: %v", f.Name(), err)
return fmt.Errorf("error writing to file %q: %v", f.Name(), err)
}
}

}
}

return nil
}

// getNode gets the node from the server with the given Node ID
Expand Down
111 changes: 111 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package client

import (
"archive/tar"
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net"
Expand Down Expand Up @@ -804,3 +807,111 @@ func TestClient_BlockedAllocations(t *testing.T) {
c1.allocLock.Unlock()

}

func TestClient_UnarchiveAllocDir(t *testing.T) {
dir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir)

if err := os.Mkdir(filepath.Join(dir, "foo"), 0777); err != nil {
t.Fatalf("err: %v", err)
}
dirInfo, err := os.Stat(filepath.Join(dir, "foo"))
if err != nil {
t.Fatalf("err: %v", err)
}
f, err := os.Create(filepath.Join(dir, "foo", "bar"))
if err != nil {
t.Fatalf("err: %v", err)
}
if _, err := f.WriteString("foo"); err != nil {
t.Fatalf("err: %v", err)
}
if err := f.Chmod(0644); err != nil {
t.Fatalf("err: %v", err)
}
fInfo, err := f.Stat()
if err != nil {
t.Fatalf("err: %v", err)
}
f.Close()

buf := new(bytes.Buffer)
tw := tar.NewWriter(buf)

walkFn := func(path string, fileInfo os.FileInfo, err error) error {
// Ignore if the file is a symlink
if fileInfo.Mode() == os.ModeSymlink {
return nil
}

// Include the path of the file name relative to the alloc dir
// so that we can put the files in the right directories
hdr, err := tar.FileInfoHeader(fileInfo, "")
if err != nil {
return fmt.Errorf("error creating file header: %v", err)
}
hdr.Name = fileInfo.Name()
tw.WriteHeader(hdr)

// If it's a directory we just write the header into the tar
if fileInfo.IsDir() {
return nil
}

// Write the file into the archive
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()

if _, err := io.Copy(tw, file); err != nil {
return err
}

return nil
}

if err := filepath.Walk(dir, walkFn); err != nil {
t.Fatalf("err: %v", err)
}
tw.Close()

dir1, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir1)

c1 := testClient(t, func(c *config.Config) {
c.RPCHandler = nil
})
defer c1.Shutdown()

rc := ioutil.NopCloser(buf)

c1.migratingAllocs["123"] = make(chan struct{})
if err := c1.unarchiveAllocDir(rc, "123", dir1); err != nil {
t.Fatalf("err: %v", err)
}

// Ensure foo is present
fi, err := os.Stat(filepath.Join(dir1, "foo"))
if err != nil {
t.Fatalf("err: %v", err)
}
if fi.Mode() != dirInfo.Mode() {
t.Fatalf("mode: %v", fi.Mode())
}

fi1, err := os.Stat(filepath.Join(dir1, "bar"))
if err != nil {
t.Fatalf("err: %v", err)
}
if fi1.Mode() != fInfo.Mode() {
t.Fatalf("mode: %v", fi1.Mode())
}
}