diff --git a/api/fs_test.go b/api/fs_test.go index e1101c92d7d..790d2ca7c79 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -145,7 +146,7 @@ func TestFS_Logs(t *testing.T) { } // Check length - require.Equal(input.Len(), result.Len()) + assert.Equal(t, input.Len(), result.Len(), "file size mismatch") // Check complete ordering for i := 0; i < lines; i++ { diff --git a/client/lib/streamframer/framer.go b/client/lib/streamframer/framer.go index 5f2eeb626c2..812f775c643 100644 --- a/client/lib/streamframer/framer.go +++ b/client/lib/streamframer/framer.go @@ -65,6 +65,8 @@ func (s *StreamFrame) Copy() *StreamFrame { // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { + // out is where frames are sent and is closed when no more frames will + // be sent. out chan<- *StreamFrame frameSize int @@ -75,10 +77,12 @@ type StreamFramer struct { // shutdown is true when a shutdown is triggered shutdown bool - // shutdownCh is closed when a shutdown is triggered + // shutdownCh is closed when no more Send()s will be called and run() + // should flush pending frames before closing exitCh shutdownCh chan struct{} - // exitCh is closed when the run() goroutine exits + // exitCh is closed when the run() goroutine exits and no more frames + // will be sent. exitCh chan struct{} // The mutex protects everything below @@ -122,7 +126,6 @@ func (s *StreamFramer) Destroy() { if !wasShutdown { close(s.shutdownCh) - close(s.out) } s.heartbeat.Stop() @@ -134,6 +137,11 @@ func (s *StreamFramer) Destroy() { if running { <-s.exitCh } + + // Close out chan only after exitCh has exited + if !wasShutdown { + close(s.out) + } } // Run starts a long lived goroutine that handles sending data as well as @@ -163,11 +171,6 @@ func (s *StreamFramer) run() { s.l.Unlock() close(s.exitCh) }() - defer func() { - if r := recover(); r != nil { - fmt.Println("Recovered in f", r) - } - }() OUTER: for { @@ -195,8 +198,16 @@ OUTER: } s.l.Lock() + // Send() may have left a partial frame. Send it now. if !s.f.IsCleared() { - s.send() + s.f.Data = s.readData() + + // Only send if there's actually data left + if len(s.f.Data) > 0 { + // Cannot select on shutdownCh as it's already closed + // Cannot select on exitCh as it's only closed after this exits + s.out <- s.f.Copy() + } } s.l.Unlock() } @@ -205,7 +216,7 @@ OUTER: func (s *StreamFramer) send() { // Ensure s.out has not already been closd by Destroy select { - case <-s.shutdownCh: + case <-s.exitCh: return default: } @@ -214,7 +225,7 @@ func (s *StreamFramer) send() { select { case s.out <- s.f.Copy(): s.f.Clear() - case <-s.shutdownCh: + case <-s.exitCh: } } @@ -274,9 +285,9 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e force = false } - // Ensure s.out has not already been closd by Destroy + // Ensure s.out has not already been closed by Destroy select { - case <-s.shutdownCh: + case <-s.exitCh: return nil default: } @@ -285,7 +296,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e s.f.Data = s.readData() select { case s.out <- s.f.Copy(): - case <-s.shutdownCh: + case <-s.exitCh: return nil } diff --git a/client/lib/streamframer/framer_test.go b/client/lib/streamframer/framer_test.go index 13be32141ab..12d5fefaa88 100644 --- a/client/lib/streamframer/framer_test.go +++ b/client/lib/streamframer/framer_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/nomad/testutil" + "github.com/kr/pretty" ) // This test checks, that even if the frame size has not been hit, a flush will @@ -132,8 +133,8 @@ func TestStreamFramer_Batch(t *testing.T) { t.Fatalf("exit channel should close") } - if _, ok := <-frames; ok { - t.Fatal("out channel should be closed") + if f, ok := <-frames; ok { + t.Fatalf("out channel should be closed. recv: %s", pretty.Sprint(f)) } } diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index d8b7877b1f2..9da2c4f6c47 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -363,7 +363,6 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, errCh <- CodedError(500, err.Error()) return } - encoder.Reset(httpPipe) for { select { @@ -397,6 +396,8 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, handler(handlerPipe) cancel() codedErr := <-errCh + + // Ignore EOF and ErrClosedPipe errors. if codedErr != nil && (codedErr == io.EOF || strings.Contains(codedErr.Error(), "closed") ||