Skip to content

Commit

Permalink
Merge pull request #1404 from hashicorp/f-streaming
Browse files Browse the repository at this point in the history
Implement a streaming API and tail in the fs command
  • Loading branch information
dadgar authored Jul 12, 2016
2 parents 1d717ea + 19ca6e7 commit 887a2e9
Show file tree
Hide file tree
Showing 30 changed files with 4,342 additions and 22 deletions.
100 changes: 99 additions & 1 deletion api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ import (
"time"
)

const (
// OriginStart and OriginEnd are the available parameters for the origin
// argument when streaming a file. They respectively offset from the start
// and end of a file.
OriginStart = "start"
OriginEnd = "end"
)

// AllocFileInfo holds information about a file inside the AllocDir
type AllocFileInfo struct {
Name string
Expand All @@ -20,6 +28,19 @@ type AllocFileInfo struct {
ModTime time.Time
}

// StreamFrame is used to frame data of a file when streaming
type StreamFrame struct {
Offset int64 `json:",omitempty"`
Data []byte `json:",omitempty"`
File string `json:",omitempty"`
FileEvent string `json:",omitempty"`
}

// IsHeartbeat returns if the frame is a heartbeat frame
func (s *StreamFrame) IsHeartbeat() bool {
return len(s.Data) == 0 && s.FileEvent == "" && s.File == "" && s.Offset == 0
}

// AllocFS is used to introspect an allocation directory on a Nomad client
type AllocFS struct {
client *Client
Expand Down Expand Up @@ -107,7 +128,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
}

// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.Reader, *QueryMeta, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, &QueryOptions{})
if err != nil {
Expand Down Expand Up @@ -177,3 +198,80 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error {
return err
}
}

// Stream streams the content of a file blocking on EOF.
// The parameters are:
// * path: path to file to stream.
// * offset: The offset to start streaming data at.
// * origin: Either "start" or "end" and defines from where the offset is applied.
// * cancel: A channel which when closed will stop streaming.
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {

node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
return nil, nil, err
}

if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID),
}
v := url.Values{}
v.Set("path", path)
v.Set("origin", origin)
v.Set("offset", strconv.FormatInt(offset, 10))
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
Cancel: cancel,
}
c := http.Client{}
resp, err := c.Do(req)
if err != nil {
return nil, nil, err
}

// Create the output channel
frames := make(chan *StreamFrame, 10)

go func() {
// Close the body
defer resp.Body.Close()

// Create a decoder
dec := json.NewDecoder(resp.Body)

for {
// Check if we have been cancelled
select {
case <-cancel:
return
default:
}

// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
close(frames)
return
}

// Discard heartbeat frames
if frame.IsHeartbeat() {
continue
}

frames <- &frame
}
}()

return frames, nil, nil
}
47 changes: 37 additions & 10 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ import (
"io/ioutil"
"os"
"path/filepath"
"runtime"
"time"

"gopkg.in/tomb.v1"

"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hpcloud/tail/watch"
)

var (
Expand Down Expand Up @@ -56,7 +60,9 @@ type AllocFileInfo struct {
type AllocDirFS interface {
List(path string) ([]*AllocFileInfo, error)
Stat(path string) (*AllocFileInfo, error)
ReadAt(path string, offset int64, limit int64) (io.ReadCloser, error)
ReadAt(path string, offset int64) (io.ReadCloser, error)
BlockUntilExists(path string, t *tomb.Tomb) error
ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error)
}

func NewAllocDir(allocDir string) *AllocDir {
Expand Down Expand Up @@ -322,9 +328,8 @@ func (d *AllocDir) Stat(path string) (*AllocFileInfo, error) {
}, nil
}

// ReadAt returns a reader for a file at the path relative to the alloc dir
// which will read a chunk of bytes at a particular offset
func (d *AllocDir) ReadAt(path string, offset int64, limit int64) (io.ReadCloser, error) {
// ReadAt returns a reader for a file at the path relative to the alloc dir
func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
p := filepath.Join(d.AllocDir, path)
f, err := os.Open(p)
if err != nil {
Expand All @@ -333,14 +338,36 @@ func (d *AllocDir) ReadAt(path string, offset int64, limit int64) (io.ReadCloser
if _, err := f.Seek(offset, 0); err != nil {
return nil, fmt.Errorf("can't seek to offset %q: %v", offset, err)
}
return &ReadCloserWrapper{Reader: io.LimitReader(f, limit), Closer: f}, nil
return f, nil
}

// BlockUntilExists blocks until the passed file relative the allocation
// directory exists. The block can be cancelled with the passed tomb.
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) error {
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
watcher := getFileWatcher(p)
return watcher.BlockUntilExists(t)
}

// ReadCloserWrapper wraps a LimitReader so that a file is closed once it has been
// read
type ReadCloserWrapper struct {
io.Reader
io.Closer
// ChangeEvents watches for changes to the passed path relative to the
// allocation directory. The offset should be the last read offset. The tomb is
// used to clean up the watch.
func (d *AllocDir) ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error) {
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
watcher := getFileWatcher(p)
return watcher.ChangeEvents(t, curOffset)
}

// getFileWatcher returns a FileWatcher for the given path.
func getFileWatcher(path string) watch.FileWatcher {
if runtime.GOOS == "windows" {
// There are some deadlock issues with the inotify implementation on
// windows. Use polling watcher for now.
return watch.NewPollingFileWatcher(path)
}
return watch.NewInotifyFileWatcher(path)
}

func fileCopy(src, dst string, perm os.FileMode) error {
Expand Down
Loading

0 comments on commit 887a2e9

Please sign in to comment.