Skip to content

Commit

Permalink
framer: fix early exit/truncation in framer
Browse files Browse the repository at this point in the history
  • Loading branch information
schmichael committed May 2, 2018
1 parent 361db26 commit 6858c52
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 18 deletions.
3 changes: 2 additions & 1 deletion api/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -145,7 +146,7 @@ func TestFS_Logs(t *testing.T) {
}

// Check length
require.Equal(input.Len(), result.Len())
assert.Equal(t, input.Len(), result.Len(), "file size mismatch")

// Check complete ordering
for i := 0; i < lines; i++ {
Expand Down
39 changes: 25 additions & 14 deletions client/lib/streamframer/framer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func (s *StreamFrame) Copy() *StreamFrame {

// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
// out is where frames are sent and is closed when no more frames will
// be sent.
out chan<- *StreamFrame

frameSize int
Expand All @@ -75,10 +77,12 @@ type StreamFramer struct {
// shutdown is true when a shutdown is triggered
shutdown bool

// shutdownCh is closed when a shutdown is triggered
// shutdownCh is closed when no more Send()s will be called and run()
// should flush pending frames before closing exitCh
shutdownCh chan struct{}

// exitCh is closed when the run() goroutine exits
// exitCh is closed when the run() goroutine exits and no more frames
// will be sent.
exitCh chan struct{}

// The mutex protects everything below
Expand Down Expand Up @@ -122,7 +126,6 @@ func (s *StreamFramer) Destroy() {

if !wasShutdown {
close(s.shutdownCh)
close(s.out)
}

s.heartbeat.Stop()
Expand All @@ -134,6 +137,11 @@ func (s *StreamFramer) Destroy() {
if running {
<-s.exitCh
}

// Close out chan only after exitCh has exited
if !wasShutdown {
close(s.out)
}
}

// Run starts a long lived goroutine that handles sending data as well as
Expand Down Expand Up @@ -163,11 +171,6 @@ func (s *StreamFramer) run() {
s.l.Unlock()
close(s.exitCh)
}()
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered in f", r)
}
}()

OUTER:
for {
Expand Down Expand Up @@ -195,8 +198,16 @@ OUTER:
}

s.l.Lock()
// Send() may have left a partial frame. Send it now.
if !s.f.IsCleared() {
s.send()
s.f.Data = s.readData()

// Only send if there's actually data left
if len(s.f.Data) > 0 {
// Cannot select on shutdownCh as it's already closed
// Cannot select on exitCh as it's only closed after this exits
s.out <- s.f.Copy()
}
}
s.l.Unlock()
}
Expand All @@ -205,7 +216,7 @@ OUTER:
func (s *StreamFramer) send() {
// Ensure s.out has not already been closd by Destroy
select {
case <-s.shutdownCh:
case <-s.exitCh:
return
default:
}
Expand All @@ -214,7 +225,7 @@ func (s *StreamFramer) send() {
select {
case s.out <- s.f.Copy():
s.f.Clear()
case <-s.shutdownCh:
case <-s.exitCh:
}
}

Expand Down Expand Up @@ -274,9 +285,9 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
force = false
}

// Ensure s.out has not already been closd by Destroy
// Ensure s.out has not already been closed by Destroy
select {
case <-s.shutdownCh:
case <-s.exitCh:
return nil
default:
}
Expand All @@ -285,7 +296,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
s.f.Data = s.readData()
select {
case s.out <- s.f.Copy():
case <-s.shutdownCh:
case <-s.exitCh:
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions client/lib/streamframer/framer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
)

// This test checks, that even if the frame size has not been hit, a flush will
Expand Down Expand Up @@ -132,8 +133,8 @@ func TestStreamFramer_Batch(t *testing.T) {
t.Fatalf("exit channel should close")
}

if _, ok := <-frames; ok {
t.Fatal("out channel should be closed")
if f, ok := <-frames; ok {
t.Fatalf("out channel should be closed. recv: %s", pretty.Sprint(f))
}
}

Expand Down
3 changes: 2 additions & 1 deletion command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
errCh <- CodedError(500, err.Error())
return
}
encoder.Reset(httpPipe)

for {
select {
Expand Down Expand Up @@ -397,6 +396,8 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
handler(handlerPipe)
cancel()
codedErr := <-errCh

// Ignore EOF and ErrClosedPipe errors.
if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), "closed") ||
Expand Down

0 comments on commit 6858c52

Please sign in to comment.