From ac4be1f7aa7b9f74be371bd55796284fdd0a9b44 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 13 Jan 2017 13:12:36 -0800 Subject: [PATCH 1/5] plain w --- command/agent/fs_endpoint.go | 50 ++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index d84411e2b75..10111553439 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -255,6 +255,8 @@ func (s *StreamFrame) IsCleared() bool { // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { + plainTxt bool + out io.WriteCloser enc *codec.Encoder encLock sync.Mutex @@ -282,15 +284,20 @@ type StreamFramer struct { // NewStreamFramer creates a new stream framer that will output StreamFrames to // the passed output. -func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { +func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { // Create a JSON encoder enc := codec.NewEncoder(out, jsonHandle) // Create the heartbeat and flush ticker - heartbeat := time.NewTicker(heartbeatRate) + var heartbeat *time.Ticker + if !plainTxt { + heartbeat = time.NewTicker(heartbeatRate) + } + flusher := time.NewTicker(batchWindow) return &StreamFramer{ + plainTxt: plainTxt, out: out, enc: enc, frameSize: frameSize, @@ -306,7 +313,9 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio func (s *StreamFramer) Destroy() { s.l.Lock() close(s.shutdownCh) - s.heartbeat.Stop() + if s.heartbeat != nil { + s.heartbeat.Stop() + } s.flusher.Stop() running := s.running s.l.Unlock() @@ -348,6 +357,11 @@ func (s *StreamFramer) run() { s.l.Unlock() }() + var heartbeat <-chan time.Time + if s.heartbeat != nil { + heartbeat = s.heartbeat.C + } + OUTER: for { select { @@ -369,7 +383,7 @@ OUTER: if err != nil { return } - case <-s.heartbeat.C: + case <-heartbeat: // Send a heartbeat frame if err = s.send(HeartbeatStreamFrame); err != nil { return @@ -390,6 +404,10 @@ OUTER: func (s *StreamFramer) send(f *StreamFrame) error { s.encLock.Lock() defer s.encLock.Unlock() + if s.plainTxt { + _, err := io.Copy(s.out, bytes.NewReader(f.Data)) + return err + } return s.enc.Encode(f) } @@ -549,7 +567,7 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf output := ioutils.NewWriteFlusher(resp) // Create the framer - framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(output, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -697,7 +715,7 @@ OUTER: // applied. Defaults to "start". func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, task, logType string - var follow bool + var plain, follow bool var err error q := req.URL.Query() @@ -710,8 +728,18 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac return nil, taskNotPresentErr } - if follow, err = strconv.ParseBool(q.Get("follow")); err != nil { - return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) + followStr := q.Get("follow") + if followStr != "" { + if follow, err = strconv.ParseBool(followStr); err != nil { + return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) + } + } + + plainStr := q.Get("plain") + if plainStr != "" { + if plain, err = strconv.ParseBool(plainStr); err != nil { + return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err) + } } logType = q.Get("type") @@ -747,15 +775,15 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) - return nil, s.logs(follow, offset, origin, task, logType, fs, output) + return nil, s.logs(follow, plain, offset, origin, task, logType, fs, output) } -func (s *HTTPServer) logs(follow bool, offset int64, +func (s *HTTPServer) logs(follow, plain bool, offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error { // Create the framer - framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(output, plain, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() From 28e0718d83f9eaff56350f098d53c4f68316dcb2 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 23 Jan 2017 15:12:09 -0800 Subject: [PATCH 2/5] disallow follow --- command/agent/fs_endpoint.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 10111553439..48adb310e07 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -740,6 +740,10 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac if plain, err = strconv.ParseBool(plainStr); err != nil { return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err) } + + if plain { + follow = false + } } logType = q.Get("type") From 1960070a24fb4a6797ac66508398956ca13b229e Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 23 Jan 2017 16:04:50 -0800 Subject: [PATCH 3/5] test --- command/agent/fs_endpoint.go | 17 +---- command/agent/fs_endpoint_test.go | 119 +++++++++++++++++++++++++++--- 2 files changed, 111 insertions(+), 25 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 48adb310e07..4fa0f6362e1 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -289,11 +289,7 @@ func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWind enc := codec.NewEncoder(out, jsonHandle) // Create the heartbeat and flush ticker - var heartbeat *time.Ticker - if !plainTxt { - heartbeat = time.NewTicker(heartbeatRate) - } - + heartbeat := time.NewTicker(heartbeatRate) flusher := time.NewTicker(batchWindow) return &StreamFramer{ @@ -313,9 +309,7 @@ func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWind func (s *StreamFramer) Destroy() { s.l.Lock() close(s.shutdownCh) - if s.heartbeat != nil { - s.heartbeat.Stop() - } + s.heartbeat.Stop() s.flusher.Stop() running := s.running s.l.Unlock() @@ -357,11 +351,6 @@ func (s *StreamFramer) run() { s.l.Unlock() }() - var heartbeat <-chan time.Time - if s.heartbeat != nil { - heartbeat = s.heartbeat.C - } - OUTER: for { select { @@ -383,7 +372,7 @@ OUTER: if err != nil { return } - case <-heartbeat: + case <-s.heartbeat.C: // Send a heartbeat frame if err = s.send(HeartbeatStreamFrame); err != nil { return diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index abb9a974b33..0e9b732547c 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -14,6 +14,7 @@ import ( "reflect" "runtime" "strconv" + "strings" "testing" "time" @@ -118,7 +119,7 @@ func TestStreamFramer_Flush(t *testing.T) { r, w := io.Pipe() wrappedW := &WriteCloseChecker{WriteCloser: w} hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100) sf.Run() // Create a decoder @@ -186,7 +187,7 @@ func TestStreamFramer_Batch(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} // Ensure the batch window doesn't get hit hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 3) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 3) sf.Run() // Create a decoder @@ -263,7 +264,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { r, w := io.Pipe() wrappedW := &WriteCloseChecker{WriteCloser: w} hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100) sf.Run() // Create a decoder @@ -315,7 +316,7 @@ func TestStreamFramer_Order(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} // Ensure the batch window doesn't get hit hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 10) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 10) sf.Run() // Create a decoder @@ -401,6 +402,102 @@ func TestStreamFramer_Order(t *testing.T) { } } +// This test checks that frames are received in order +func TestStreamFramer_Order_PlainText(t *testing.T) { + // Create the stream framer + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + // Ensure the batch window doesn't get hit + hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond + sf := NewStreamFramer(wrappedW, true, hRate, bWindow, 10) + sf.Run() + + files := []string{"1", "2", "3", "4", "5"} + input := bytes.NewBuffer(make([]byte, 0, 100000)) + for i := 0; i <= 1000; i++ { + str := strconv.Itoa(i) + "," + input.WriteString(str) + } + + expected := bytes.NewBuffer(make([]byte, 0, 100000)) + for _, _ = range files { + expected.Write(input.Bytes()) + } + receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000)) + + // Start the reader + resultCh := make(chan struct{}) + go func() { + OUTER: + for { + if _, err := receivedBuf.ReadFrom(r); err != nil { + if strings.Contains(err.Error(), "closed pipe") { + resultCh <- struct{}{} + return + } + t.Fatalf("bad read: %v", err) + } + + if expected.Len() != receivedBuf.Len() { + continue + } + expectedBytes := expected.Bytes() + actualBytes := receivedBuf.Bytes() + for i, e := range expectedBytes { + if a := actualBytes[i]; a != e { + continue OUTER + } + } + resultCh <- struct{}{} + return + + } + }() + + // Send the data + b := input.Bytes() + shards := 10 + each := len(b) / shards + for _, f := range files { + for i := 0; i < shards; i++ { + l, r := each*i, each*(i+1) + if i == shards-1 { + r = len(b) + } + + if err := sf.Send(f, "", b[l:r], 0); err != nil { + t.Fatalf("Send() failed %v", err) + } + } + } + + // Ensure we get data + select { + case <-resultCh: + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow): + if expected.Len() != receivedBuf.Len() { + t.Fatalf("Got %v; want %v", expected.Len(), receivedBuf.Len()) + } + expectedBytes := expected.Bytes() + actualBytes := receivedBuf.Bytes() + for i, e := range expectedBytes { + if a := actualBytes[i]; a != e { + t.Fatalf("Index %d; Got %q; want %q", i, a, e) + } + } + } + + // Close the reader and wait. This should cause the runner to exit + if err := r.Close(); err != nil { + t.Fatalf("failed to close reader") + } + + sf.Destroy() + if !wrappedW.Closed { + t.Fatalf("writer not closed") + } +} + func TestHTTP_Stream_MissingParams(t *testing.T) { httpTest(t, nil, func(s *TestServer) { req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil) @@ -467,7 +564,7 @@ func TestHTTP_Stream_NoFile(t *testing.T) { ad := tempAllocDir(t) defer os.RemoveAll(ad.AllocDir) - framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -526,7 +623,7 @@ func TestHTTP_Stream_Modify(t *testing.T) { t.Fatalf("write failed: %v", err) } - framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -607,7 +704,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) { t.Fatalf("write failed: %v", err) } - framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -710,7 +807,7 @@ func TestHTTP_Stream_Delete(t *testing.T) { t.Fatalf("write failed: %v", err) } - framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(wrappedW, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() // Start streaming @@ -804,7 +901,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) { // Start streaming logs go func() { - if err := s.Server.logs(false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + if err := s.Server.logs(false, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { t.Fatalf("logs() failed: %v", err) } }() @@ -893,7 +990,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { // Start streaming logs go func() { - if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { t.Fatalf("logs() failed: %v", err) } }() @@ -1006,7 +1103,7 @@ func BenchmarkHTTP_Logs_Follow(t *testing.B) { // Start streaming logs go func() { - if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { t.Fatalf("logs() failed: %v", err) } }() From d276b3fcc1b1b4a1a316468e8f0da6316af0f381 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 23 Jan 2017 16:06:31 -0800 Subject: [PATCH 4/5] docs --- command/agent/fs_endpoint.go | 8 ++++++-- website/source/docs/http/client-fs.html.md | 6 ++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 4fa0f6362e1..638072f89c8 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -255,6 +255,7 @@ func (s *StreamFrame) IsCleared() bool { // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { + // plainTxt determines whether we frame or just send plain text data. plainTxt bool out io.WriteCloser @@ -283,8 +284,11 @@ type StreamFramer struct { } // NewStreamFramer creates a new stream framer that will output StreamFrames to -// the passed output. -func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { +// the passed output. If plainTxt is set we do not frame and just batch plain +// text data. +func NewStreamFramer(out io.WriteCloser, plainTxt bool, + heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { + // Create a JSON encoder enc := codec.NewEncoder(out, jsonHandle) diff --git a/website/source/docs/http/client-fs.html.md b/website/source/docs/http/client-fs.html.md index 2278c6d1a84..26d22e89753 100644 --- a/website/source/docs/http/client-fs.html.md +++ b/website/source/docs/http/client-fs.html.md @@ -227,6 +227,12 @@ allocation was placed. Origin can be either "start" or "end" and applies the offset relative to either the start or end of the logs respectively. Defaults to "start". +
  • + plain + A boolean of whether to return just the plain text without framing. If + set, follow is automatically unset. This can be usef when viewing logs + in a browser. +
  • From 6082b4619e9746b919c043057038cfcb4efa0af1 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 23 Jan 2017 16:58:53 -0800 Subject: [PATCH 5/5] Review feedback --- command/agent/fs_endpoint.go | 10 ++-------- website/source/docs/http/client-fs.html.md | 5 ++--- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 638072f89c8..379d737e7ec 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -721,22 +721,16 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac return nil, taskNotPresentErr } - followStr := q.Get("follow") - if followStr != "" { + if followStr := q.Get("follow"); followStr != "" { if follow, err = strconv.ParseBool(followStr); err != nil { return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) } } - plainStr := q.Get("plain") - if plainStr != "" { + if plainStr := q.Get("plain"); plainStr != "" { if plain, err = strconv.ParseBool(plainStr); err != nil { return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err) } - - if plain { - follow = false - } } logType = q.Get("type") diff --git a/website/source/docs/http/client-fs.html.md b/website/source/docs/http/client-fs.html.md index 26d22e89753..0c6c7140b1e 100644 --- a/website/source/docs/http/client-fs.html.md +++ b/website/source/docs/http/client-fs.html.md @@ -229,9 +229,8 @@ allocation was placed.
  • plain - A boolean of whether to return just the plain text without framing. If - set, follow is automatically unset. This can be usef when viewing logs - in a browser. + A boolean of whether to return just the plain text without framing. + This can be usef when viewing logs in a browser.