Skip to content

Commit

Permalink
Merge pull request #2007 from hashicorp/b-stream-framer-panic
Browse files Browse the repository at this point in the history
Fixes race on StreamFramer Destroy
  • Loading branch information
dadgar authored Nov 28, 2016
2 parents 19f2347 + 81e2d53 commit 2048fb2
Showing 1 changed file with 63 additions and 72 deletions.
135 changes: 63 additions & 72 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,18 @@ func (s *StreamFrame) IsHeartbeat() bool {

// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
out io.WriteCloser
enc *codec.Encoder
frameSize int
heartbeat *time.Ticker
flusher *time.Ticker
out io.WriteCloser
enc *codec.Encoder
encLock sync.Mutex

frameSize int

heartbeat *time.Ticker
flusher *time.Ticker

shutdownCh chan struct{}
exitCh chan struct{}

outbound chan *StreamFrame

// The mutex protects everything below
l sync.Mutex

Expand Down Expand Up @@ -266,7 +268,6 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio
frameSize: frameSize,
heartbeat: heartbeat,
flusher: flusher,
outbound: make(chan *StreamFrame),
data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)),
shutdownCh: make(chan struct{}),
exitCh: make(chan struct{}),
Expand All @@ -279,10 +280,11 @@ func (s *StreamFramer) Destroy() {
close(s.shutdownCh)
s.heartbeat.Stop()
s.flusher.Stop()
running := s.running
s.l.Unlock()

// Ensure things were flushed
if s.running {
if running {
<-s.exitCh
}
s.out.Close()
Expand All @@ -309,90 +311,60 @@ func (s *StreamFramer) ExitCh() <-chan struct{} {
// run is the internal run method. It exits if Destroy is called or an error
// occurs, in which case the exit channel is closed.
func (s *StreamFramer) run() {
// Store any error and mark it as not running
var err error
defer func() {
close(s.exitCh)

s.l.Lock()
close(s.outbound)
s.Err = err
s.running = false
s.Err = err
s.l.Unlock()
}()

// Start a heartbeat/flusher go-routine. This is done seprately to avoid blocking
// the outbound channel.
go func() {
for {
select {
case <-s.exitCh:
return
case <-s.shutdownCh:
return
case <-s.flusher.C:
// Skip if there is nothing to flush
s.l.Lock()
if s.f == nil {
s.l.Unlock()
continue
}

// Read the data for the frame, and send it
s.f.Data = s.readData()
select {
case s.outbound <- s.f:
s.f = nil
case <-s.exitCh:
}
s.l.Unlock()
case <-s.heartbeat.C:
// Send a heartbeat frame
s.l.Lock()
select {
case s.outbound <- &StreamFrame{}:
default:
}
s.l.Unlock()
}
}
}()

OUTER:
for {
select {
case <-s.shutdownCh:
break OUTER
case o := <-s.outbound:
// Send the frame
if err = s.enc.Encode(o); err != nil {
return
case <-s.flusher.C:
// Skip if there is nothing to flush
s.l.Lock()
if s.f == nil {
s.l.Unlock()
continue
}
}
}

// Flush any existing frames
FLUSH:
for {
select {
case o := <-s.outbound:
// Send the frame and then clear the current working frame
if err = s.enc.Encode(o); err != nil {
// Read the data for the frame, and send it
s.f.Data = s.readData()
err = s.send(s.f)
s.f = nil
s.l.Unlock()
if err != nil {
return
}
case <-s.heartbeat.C:
// Send a heartbeat frame
if err = s.send(&StreamFrame{}); err != nil {
return
}
default:
break FLUSH
}
}

s.l.Lock()
if s.f != nil {
s.f.Data = s.readData()
s.enc.Encode(s.f)
err = s.send(s.f)
s.f = nil
}
s.l.Unlock()
}

// send takes a StreamFrame, encodes and sends it
func (s *StreamFramer) send(f *StreamFrame) error {
s.encLock.Lock()
defer s.encLock.Unlock()
return s.enc.Encode(f)
}

// readData is a helper which reads the buffered data returning up to the frame
// size of data. Must be called with the lock held. The returned value is
// invalid on the next read or write into the StreamFramer buffer
Expand Down Expand Up @@ -424,6 +396,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
if s.Err != nil {
return s.Err
}

return fmt.Errorf("StreamFramer not running")
}

Expand All @@ -435,8 +408,12 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
select {
case <-s.exitCh:
return nil
case s.outbound <- &f:
s.f = nil
default:
}
err := s.send(&f)
s.f = nil
if err != nil {
return err
}
}

Expand All @@ -457,11 +434,16 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
select {
case <-s.exitCh:
return nil
case s.outbound <- &StreamFrame{
default:
}

f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
}:
}
if err := s.send(f); err != nil {
return err
}
}

Expand All @@ -472,12 +454,17 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
select {
case <-s.exitCh:
return nil
case s.outbound <- &StreamFrame{
default:
}

f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
Data: d,
}:
}
if err := s.send(f); err != nil {
return err
}
}

Expand Down Expand Up @@ -866,6 +853,10 @@ func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logT
scanCh := time.Tick(nextLogCheckRate)
for {
select {
case <-t.Dead():
next <- fmt.Errorf("shutdown triggered")
close(next)
return
case err := <-eofCancelCh:
next <- err
close(next)
Expand Down

0 comments on commit 2048fb2

Please sign in to comment.