Skip to content

Commit

Permalink
Merge pull request #2235 from hashicorp/f-http-logs
Browse files Browse the repository at this point in the history
Logs via a browser
  • Loading branch information
dadgar authored Jan 24, 2017
2 parents 0b46f7c + 6082b46 commit 0be1b32
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 20 deletions.
37 changes: 28 additions & 9 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ func (s *StreamFrame) IsCleared() bool {

// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
// plainTxt determines whether we frame or just send plain text data.
plainTxt bool

out io.WriteCloser
enc *codec.Encoder
encLock sync.Mutex
Expand All @@ -281,8 +284,11 @@ type StreamFramer struct {
}

// NewStreamFramer creates a new stream framer that will output StreamFrames to
// the passed output.
func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer {
// the passed output. If plainTxt is set we do not frame and just batch plain
// text data.
func NewStreamFramer(out io.WriteCloser, plainTxt bool,
heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer {

// Create a JSON encoder
enc := codec.NewEncoder(out, jsonHandle)

Expand All @@ -291,6 +297,7 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio
flusher := time.NewTicker(batchWindow)

return &StreamFramer{
plainTxt: plainTxt,
out: out,
enc: enc,
frameSize: frameSize,
Expand Down Expand Up @@ -390,6 +397,10 @@ OUTER:
func (s *StreamFramer) send(f *StreamFrame) error {
s.encLock.Lock()
defer s.encLock.Unlock()
if s.plainTxt {
_, err := io.Copy(s.out, bytes.NewReader(f.Data))
return err
}
return s.enc.Encode(f)
}

Expand Down Expand Up @@ -549,7 +560,7 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
output := ioutils.NewWriteFlusher(resp)

// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(output, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

Expand Down Expand Up @@ -697,7 +708,7 @@ OUTER:
// applied. Defaults to "start".
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, task, logType string
var follow bool
var plain, follow bool
var err error

q := req.URL.Query()
Expand All @@ -710,8 +721,16 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
return nil, taskNotPresentErr
}

if follow, err = strconv.ParseBool(q.Get("follow")); err != nil {
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
if followStr := q.Get("follow"); followStr != "" {
if follow, err = strconv.ParseBool(followStr); err != nil {
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
}
}

if plainStr := q.Get("plain"); plainStr != "" {
if plain, err = strconv.ParseBool(plainStr); err != nil {
return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err)
}
}

logType = q.Get("type")
Expand Down Expand Up @@ -747,15 +766,15 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)

return nil, s.logs(follow, offset, origin, task, logType, fs, output)
return nil, s.logs(follow, plain, offset, origin, task, logType, fs, output)
}

func (s *HTTPServer) logs(follow bool, offset int64,
func (s *HTTPServer) logs(follow, plain bool, offset int64,
origin, task, logType string,
fs allocdir.AllocDirFS, output io.WriteCloser) error {

// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(output, plain, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

Expand Down
119 changes: 108 additions & 11 deletions command/agent/fs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"reflect"
"runtime"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -118,7 +119,7 @@ func TestStreamFramer_Flush(t *testing.T) {
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 100)
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
sf.Run()

// Create a decoder
Expand Down Expand Up @@ -186,7 +187,7 @@ func TestStreamFramer_Batch(t *testing.T) {
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 3)
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 3)
sf.Run()

// Create a decoder
Expand Down Expand Up @@ -263,7 +264,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 100)
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
sf.Run()

// Create a decoder
Expand Down Expand Up @@ -315,7 +316,7 @@ func TestStreamFramer_Order(t *testing.T) {
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 10)
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 10)
sf.Run()

// Create a decoder
Expand Down Expand Up @@ -401,6 +402,102 @@ func TestStreamFramer_Order(t *testing.T) {
}
}

// This test checks that frames are received in order
func TestStreamFramer_Order_PlainText(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
sf := NewStreamFramer(wrappedW, true, hRate, bWindow, 10)
sf.Run()

files := []string{"1", "2", "3", "4", "5"}
input := bytes.NewBuffer(make([]byte, 0, 100000))
for i := 0; i <= 1000; i++ {
str := strconv.Itoa(i) + ","
input.WriteString(str)
}

expected := bytes.NewBuffer(make([]byte, 0, 100000))
for _, _ = range files {
expected.Write(input.Bytes())
}
receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000))

// Start the reader
resultCh := make(chan struct{})
go func() {
OUTER:
for {
if _, err := receivedBuf.ReadFrom(r); err != nil {
if strings.Contains(err.Error(), "closed pipe") {
resultCh <- struct{}{}
return
}
t.Fatalf("bad read: %v", err)
}

if expected.Len() != receivedBuf.Len() {
continue
}
expectedBytes := expected.Bytes()
actualBytes := receivedBuf.Bytes()
for i, e := range expectedBytes {
if a := actualBytes[i]; a != e {
continue OUTER
}
}
resultCh <- struct{}{}
return

}
}()

// Send the data
b := input.Bytes()
shards := 10
each := len(b) / shards
for _, f := range files {
for i := 0; i < shards; i++ {
l, r := each*i, each*(i+1)
if i == shards-1 {
r = len(b)
}

if err := sf.Send(f, "", b[l:r], 0); err != nil {
t.Fatalf("Send() failed %v", err)
}
}
}

// Ensure we get data
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
if expected.Len() != receivedBuf.Len() {
t.Fatalf("Got %v; want %v", expected.Len(), receivedBuf.Len())
}
expectedBytes := expected.Bytes()
actualBytes := receivedBuf.Bytes()
for i, e := range expectedBytes {
if a := actualBytes[i]; a != e {
t.Fatalf("Index %d; Got %q; want %q", i, a, e)
}
}
}

// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}

sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}

func TestHTTP_Stream_MissingParams(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
Expand Down Expand Up @@ -467,7 +564,7 @@ func TestHTTP_Stream_NoFile(t *testing.T) {
ad := tempAllocDir(t)
defer os.RemoveAll(ad.AllocDir)

framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

Expand Down Expand Up @@ -526,7 +623,7 @@ func TestHTTP_Stream_Modify(t *testing.T) {
t.Fatalf("write failed: %v", err)
}

framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

Expand Down Expand Up @@ -607,7 +704,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) {
t.Fatalf("write failed: %v", err)
}

framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

Expand Down Expand Up @@ -710,7 +807,7 @@ func TestHTTP_Stream_Delete(t *testing.T) {
t.Fatalf("write failed: %v", err)
}

framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(wrappedW, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()

// Start streaming
Expand Down Expand Up @@ -804,7 +901,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) {

// Start streaming logs
go func() {
if err := s.Server.logs(false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
if err := s.Server.logs(false, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
t.Fatalf("logs() failed: %v", err)
}
}()
Expand Down Expand Up @@ -893,7 +990,7 @@ func TestHTTP_Logs_Follow(t *testing.T) {

// Start streaming logs
go func() {
if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
t.Fatalf("logs() failed: %v", err)
}
}()
Expand Down Expand Up @@ -1006,7 +1103,7 @@ func BenchmarkHTTP_Logs_Follow(t *testing.B) {

// Start streaming logs
go func() {
if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
t.Fatalf("logs() failed: %v", err)
}
}()
Expand Down
5 changes: 5 additions & 0 deletions website/source/docs/http/client-fs.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ allocation was placed.
Origin can be either "start" or "end" and applies the offset relative to
either the start or end of the logs respectively. Defaults to "start".
</li>
<li>
<span class="param">plain</span>
A boolean of whether to return just the plain text without framing.
This can be usef when viewing logs in a browser.
</li>
</ul>
</dd>

Expand Down

0 comments on commit 0be1b32

Please sign in to comment.