From 478a734bfd9101ad40ffd0720e3b767ccd737d10 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 22 Jul 2016 16:02:46 -0700 Subject: [PATCH] better flush and connection closed handling --- command/agent/fs_endpoint.go | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index f0f7dd0abaa..3a1ca361a97 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "math" + "net" "net/http" "os" "path/filepath" @@ -312,9 +313,9 @@ func (s *StreamFramer) run() { var err error defer func() { close(s.exitCh) - close(s.outbound) s.l.Lock() + close(s.outbound) s.Err = err s.running = false s.l.Unlock() @@ -342,7 +343,7 @@ func (s *StreamFramer) run() { select { case s.outbound <- s.f: s.f = nil - default: + case <-s.exitCh: } s.l.Unlock() case <-s.heartbeat.C: @@ -369,13 +370,17 @@ OUTER: } // Flush any existing frames - select { - case o := <-s.outbound: - // Send the frame and then clear the current working frame - if err = s.enc.Encode(o); err != nil { - return +FLUSH: + for { + select { + case o := <-s.outbound: + // Send the frame and then clear the current working frame + if err = s.enc.Encode(o); err != nil { + return + } + default: + break FLUSH } - default: } s.l.Lock() @@ -602,10 +607,20 @@ OUTER: if err := framer.Send(path, lastEvent, data[:n], offset); err != nil { // Check if the connection has been closed - if err == io.ErrClosedPipe || strings.Contains(err.Error(), syscall.EPIPE.Error()) { + if err == io.ErrClosedPipe { + // The pipe check is for tests return syscall.EPIPE } + operr, ok := err.(*net.OpError) + if ok { + // The connection was closed by our peer + e := operr.Err.Error() + if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) { + return syscall.EPIPE + } + } + return err } } @@ -813,7 +828,7 @@ func (s *HTTPServer) logs(follow bool, offset int64, } // Check if the connection was closed - if strings.Contains(err.Error(), syscall.EPIPE.Error()) { + if err == syscall.EPIPE { return nil }