Skip to content

Commit

Permalink
Merge pull request #1444 from hashicorp/f-log-streaming
Browse files Browse the repository at this point in the history
Nomad logs command to stream task logs
  • Loading branch information
dadgar authored Jul 25, 2016
2 parents d50167c + 0b9449f commit 4a4efdf
Show file tree
Hide file tree
Showing 15 changed files with 1,604 additions and 116 deletions.
147 changes: 120 additions & 27 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"
)

Expand Down Expand Up @@ -204,6 +205,7 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error {
// * path: path to file to stream.
// * offset: The offset to start streaming data at.
// * origin: Either "start" or "end" and defines from where the offset is applied.
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
Expand Down Expand Up @@ -275,19 +277,101 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
return frames, nil, nil
}

// Logs streams the content of a tasks logs blocking on EOF.
// The parameters are:
// * allocation: the allocation to stream from.
// * follow: Whether the logs should be followed.
// * task: the tasks name to stream logs for.
// * logType: Either "stdout" or "stderr"
// * origin: Either "start" or "end" and defines from where the offset is applied.
// * offset: The offset to start streaming data at.
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {

node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
return nil, nil, err
}

if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID),
}
v := url.Values{}
v.Set("follow", strconv.FormatBool(follow))
v.Set("task", task)
v.Set("type", logType)
v.Set("origin", origin)
v.Set("offset", strconv.FormatInt(offset, 10))
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
Cancel: cancel,
}
c := http.Client{}
resp, err := c.Do(req)
if err != nil {
return nil, nil, err
}

// Create the output channel
frames := make(chan *StreamFrame, 10)

go func() {
// Close the body
defer resp.Body.Close()

// Create a decoder
dec := json.NewDecoder(resp.Body)

for {
// Check if we have been cancelled
select {
case <-cancel:
return
default:
}

// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
close(frames)
return
}

// Discard heartbeat frames
if frame.IsHeartbeat() {
continue
}

frames <- &frame
}
}()

return frames, nil, nil
}

// FrameReader is used to convert a stream of frames into a read closer.
type FrameReader struct {
frames <-chan *StreamFrame
cancelCh chan struct{}
closed bool

closedLock sync.Mutex
closed bool

unblockTime time.Duration

frame *StreamFrame
frameOffset int

// To handle printing the file events
fileEventOffset int
fileEvent []byte

byteOffset int
}

Expand All @@ -300,6 +384,12 @@ func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameRe
}
}

// SetUnblockTime sets the time to unblock and return zero bytes read. If the
// duration is unset or is zero or less, the read will block til data is read.
func (f *FrameReader) SetUnblockTime(d time.Duration) {
f.unblockTime = d
}

// Offset returns the offset into the stream.
func (f *FrameReader) Offset() int {
return f.byteOffset
Expand All @@ -308,32 +398,33 @@ func (f *FrameReader) Offset() int {
// Read reads the data of the incoming frames into the bytes buffer. Returns EOF
// when there are no more frames.
func (f *FrameReader) Read(p []byte) (n int, err error) {
if f.frame == nil {
frame, ok := <-f.frames
if !ok {
return 0, io.EOF
}
f.frame = frame

// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
f.closedLock.Lock()
closed := f.closed
f.closedLock.Unlock()
if closed {
return 0, io.EOF
}

if f.frame.FileEvent != "" && len(f.fileEvent) == 0 {
f.fileEvent = []byte(fmt.Sprintf("\nnomad: %q\n", f.frame.FileEvent))
f.fileEventOffset = 0
}
if f.frame == nil {
var unblock <-chan time.Time
if f.unblockTime.Nanoseconds() > 0 {
unblock = time.After(f.unblockTime)
}

// If there is a file event we inject it into the read stream
if l := len(f.fileEvent); l != 0 && l != f.fileEventOffset {
n = copy(p, f.fileEvent[f.fileEventOffset:])
f.fileEventOffset += n
return n, nil
}
select {
case frame, ok := <-f.frames:
if !ok {
return 0, io.EOF
}
f.frame = frame

if len(f.fileEvent) == f.fileEventOffset {
f.fileEvent = nil
f.fileEventOffset = 0
// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
case <-unblock:
return 0, nil
case <-f.cancelCh:
return 0, io.EOF
}
}

// Copy the data out of the frame and update our offset
Expand All @@ -351,6 +442,8 @@ func (f *FrameReader) Read(p []byte) (n int, err error) {

// Close cancels the stream of frames
func (f *FrameReader) Close() error {
f.closedLock.Lock()
defer f.closedLock.Unlock()
if f.closed {
return nil
}
Expand Down
37 changes: 37 additions & 0 deletions api/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"reflect"
"testing"
"time"
)

func TestFS_FrameReader(t *testing.T) {
Expand Down Expand Up @@ -73,3 +74,39 @@ func TestFS_FrameReader(t *testing.T) {
t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected))
}
}

func TestFS_FrameReader_Unblock(t *testing.T) {
// Create a channel of the frames and a cancel channel
framesCh := make(chan *StreamFrame, 3)
cancelCh := make(chan struct{})

r := NewFrameReader(framesCh, cancelCh)
r.SetUnblockTime(10 * time.Millisecond)

// Read a little
p := make([]byte, 12)

n, err := r.Read(p)
if err != nil {
t.Fatalf("Read failed: %v", err)
}

if n != 0 {
t.Fatalf("should have unblocked")
}

// Unset the unblock
r.SetUnblockTime(0)

resultCh := make(chan struct{})
go func() {
r.Read(p)
close(resultCh)
}()

select {
case <-resultCh:
t.Fatalf("shouldn't have unblocked")
case <-time.After(300 * time.Millisecond):
}
}
11 changes: 8 additions & 3 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type AllocDirFS interface {
List(path string) ([]*AllocFileInfo, error)
Stat(path string) (*AllocFileInfo, error)
ReadAt(path string, offset int64) (io.ReadCloser, error)
BlockUntilExists(path string, t *tomb.Tomb) error
BlockUntilExists(path string, t *tomb.Tomb) chan error
ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error)
}

Expand Down Expand Up @@ -343,11 +343,16 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {

// BlockUntilExists blocks until the passed file relative the allocation
// directory exists. The block can be cancelled with the passed tomb.
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) error {
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) chan error {
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
watcher := getFileWatcher(p)
return watcher.BlockUntilExists(t)
returnCh := make(chan error, 1)
go func() {
returnCh <- watcher.BlockUntilExists(t)
close(returnCh)
}()
return returnCh
}

// ChangeEvents watches for changes to the passed path relative to the
Expand Down
Loading

0 comments on commit 4a4efdf

Please sign in to comment.