diff --git a/CHANGELOG.md b/CHANGELOG.md index 655373ce393..4bc44f67542 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ IMPROVEMENTS: BUG FIXES: * agent/config: Fix use of IPv6 addresses [GH-2036] + * api: Fix file descriptor leak and high CPU usage when using the logs + endpoint [GH-2079] * cli: Improve parsing error when a job without a name is specified [GH-2030] * client: Fixed permissions of migrated allocation directory [GH-2061] * client: Ensuring allocations are not blocked more than once [GH-2040] diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 48cc5a16e78..d84411e2b75 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -1,11 +1,12 @@ package agent +//go:generate codecgen -o fs_endpoint.generated.go fs_endpoint.go + import ( "bytes" "fmt" "io" "math" - "net" "net/http" "os" "path/filepath" @@ -204,6 +205,12 @@ func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request) return nil, r.Close() } +var ( + // HeartbeatStreamFrame is the StreamFrame to send as a heartbeat, avoiding + // creating many instances of the empty StreamFrame + HeartbeatStreamFrame = &StreamFrame{} +) + // StreamFrame is used to frame data of a file when streaming type StreamFrame struct { // Offset is the offset the data was read from @@ -225,6 +232,27 @@ func (s *StreamFrame) IsHeartbeat() bool { return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == "" } +func (s *StreamFrame) Clear() { + s.Offset = 0 + s.Data = nil + s.File = "" + s.FileEvent = "" +} + +func (s *StreamFrame) IsCleared() bool { + if s.Offset != 0 { + return false + } else if s.Data != nil { + return false + } else if s.File != "" { + return false + } else if s.FileEvent != "" { + return false + } else { + return true + } +} + // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { out io.WriteCloser @@ -243,7 +271,7 @@ type StreamFramer struct { l sync.Mutex // The current working frame - f *StreamFrame + f StreamFrame data *bytes.Buffer // Captures whether the framer is running and any error that occurred to @@ -328,32 +356,32 @@ OUTER: case <-s.flusher.C: // Skip if there is nothing to flush s.l.Lock() - if s.f == nil { + if s.f.IsCleared() { s.l.Unlock() continue } // Read the data for the frame, and send it s.f.Data = s.readData() - err = s.send(s.f) - s.f = nil + err = s.send(&s.f) + s.f.Clear() s.l.Unlock() if err != nil { return } case <-s.heartbeat.C: // Send a heartbeat frame - if err = s.send(&StreamFrame{}); err != nil { + if err = s.send(HeartbeatStreamFrame); err != nil { return } } } s.l.Lock() - if s.f != nil { + if !s.f.IsCleared() { s.f.Data = s.readData() - err = s.send(s.f) - s.f = nil + err = s.send(&s.f) + s.f.Clear() } s.l.Unlock() } @@ -378,9 +406,7 @@ func (s *StreamFramer) readData() []byte { return nil } d := s.data.Next(size) - b := make([]byte, size) - copy(b, d) - return b + return d } // Send creates and sends a StreamFrame based on the passed parameters. An error @@ -401,75 +427,62 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e } // Check if not mergeable - if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) { + if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) { // Flush the old frame - f := *s.f - f.Data = s.readData() + s.f.Data = s.readData() select { case <-s.exitCh: return nil default: } - err := s.send(&f) - s.f = nil + err := s.send(&s.f) + s.f.Clear() if err != nil { return err } } // Store the new data as the current frame. - if s.f == nil { - s.f = &StreamFrame{ - Offset: offset, - File: file, - FileEvent: fileEvent, - } + if s.f.IsCleared() { + s.f.Offset = offset + s.f.File = file + s.f.FileEvent = fileEvent } // Write the data to the buffer s.data.Write(data) // Handle the delete case in which there is no data + force := false if s.data.Len() == 0 && s.f.FileEvent != "" { - select { - case <-s.exitCh: - return nil - default: - } - - f := &StreamFrame{ - Offset: s.f.Offset, - File: s.f.File, - FileEvent: s.f.FileEvent, - } - if err := s.send(f); err != nil { - return err - } + force = true } // Flush till we are under the max frame size - for s.data.Len() >= s.frameSize { + for s.data.Len() >= s.frameSize || force { + // Clear + if force { + force = false + } + // Create a new frame to send it - d := s.readData() + s.f.Data = s.readData() select { case <-s.exitCh: return nil default: } - f := &StreamFrame{ - Offset: s.f.Offset, - File: s.f.File, - FileEvent: s.f.FileEvent, - Data: d, - } - if err := s.send(f); err != nil { + if err := s.send(&s.f); err != nil { return err } + + // Update the offset + s.f.Offset += int64(len(s.f.Data)) } if s.data.Len() == 0 { - s.f = nil + s.f.Clear() } return nil @@ -569,6 +582,26 @@ func (s *HTTPServer) stream(offset int64, path string, t.Done() }() + // parseFramerErr takes an error and returns an error. The error will + // potentially change if it was caused by the connection being closed. + parseFramerErr := func(e error) error { + if e == nil { + return nil + } + + if strings.Contains(e.Error(), io.ErrClosedPipe.Error()) { + // The pipe check is for tests + return syscall.EPIPE + } + + // The connection was closed by our peer + if strings.Contains(e.Error(), syscall.EPIPE.Error()) || strings.Contains(e.Error(), syscall.ECONNRESET.Error()) { + return syscall.EPIPE + } + + return err + } + // Create a variable to allow setting the last event var lastEvent string @@ -594,23 +627,7 @@ OUTER: // Send the frame if n != 0 { if err := framer.Send(path, lastEvent, data[:n], offset); err != nil { - - // Check if the connection has been closed - if err == io.ErrClosedPipe { - // The pipe check is for tests - return syscall.EPIPE - } - - operr, ok := err.(*net.OpError) - if ok { - // The connection was closed by our peer - e := operr.Err.Error() - if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) { - return syscall.EPIPE - } - } - - return err + return parseFramerErr(err) } } @@ -637,7 +654,7 @@ OUTER: case <-changes.Modified: continue OUTER case <-changes.Deleted: - return framer.Send(path, deleteEvent, nil, offset) + return parseFramerErr(framer.Send(path, deleteEvent, nil, offset)) case <-changes.Truncated: // Close the current reader if err := f.Close(); err != nil { @@ -657,7 +674,7 @@ OUTER: lastEvent = truncateEvent continue OUTER case <-framer.ExitCh(): - return nil + return parseFramerErr(framer.Err) case err, ok := <-eofCancelCh: if !ok { return nil @@ -850,7 +867,9 @@ func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logT return } - scanCh := time.Tick(nextLogCheckRate) + ticker := time.NewTicker(nextLogCheckRate) + defer ticker.Stop() + scanCh := ticker.C for { select { case <-t.Dead(): diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 152832e99f9..bf01c779434 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "reflect" + "runtime" "strconv" "testing" "time" @@ -438,7 +439,7 @@ func TestHTTP_Stream_MissingParams(t *testing.T) { // tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller // should destroy the temp dir. -func tempAllocDir(t *testing.T) *allocdir.AllocDir { +func tempAllocDir(t testing.TB) *allocdir.AllocDir { dir, err := ioutil.TempDir("", "") if err != nil { t.Fatalf("TempDir() failed: %v", err) @@ -925,6 +926,112 @@ func TestHTTP_Logs_Follow(t *testing.T) { }) } +func BenchmarkHTTP_Logs_Follow(t *testing.B) { + runtime.MemProfileRate = 1 + + s := makeHTTPServer(t, nil) + defer s.Cleanup() + testutil.WaitForLeader(t, s.Agent.RPC) + + // Get a temp alloc dir and create the log dir + ad := tempAllocDir(t) + s.Agent.logger.Printf("ALEX: LOG DIR: %q", ad.SharedDir) + //defer os.RemoveAll(ad.AllocDir) + + logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName) + if err := os.MkdirAll(logDir, 0777); err != nil { + t.Fatalf("Failed to make log dir: %v", err) + } + + // Create a series of log files in the temp dir + task := "foo" + logType := "stdout" + expected := make([]byte, 1024*1024*100) + initialWrites := 3 + + writeToFile := func(index int, data []byte) { + logFile := fmt.Sprintf("%s.%s.%d", task, logType, index) + logFilePath := filepath.Join(logDir, logFile) + err := ioutil.WriteFile(logFilePath, data, 777) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + } + + part := (len(expected) / 3) - 50 + goodEnough := (8 * len(expected)) / 10 + for i := 0; i < initialWrites; i++ { + writeToFile(i, expected[i*part:(i+1)*part]) + } + + t.ResetTimer() + for i := 0; i < t.N; i++ { + s.Agent.logger.Printf("BENCHMARK %d", i) + + // Create a decoder + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + defer r.Close() + defer w.Close() + dec := codec.NewDecoder(r, jsonHandle) + + var received []byte + + // Start the reader + fullResultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + if err == io.EOF { + t.Logf("EOF") + return + } + + t.Fatalf("failed to decode: %v", err) + } + + if frame.IsHeartbeat() { + continue + } + + received = append(received, frame.Data...) + if len(received) > goodEnough { + close(fullResultCh) + return + } + } + }() + + // Start streaming logs + go func() { + if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + t.Fatalf("logs() failed: %v", err) + } + }() + + select { + case <-fullResultCh: + case <-time.After(time.Duration(60 * time.Second)): + t.Fatalf("did not receive data: %d < %d", len(received), goodEnough) + } + + s.Agent.logger.Printf("ALEX: CLOSING") + + // Close the reader + r.Close() + s.Agent.logger.Printf("ALEX: CLOSED") + + s.Agent.logger.Printf("ALEX: WAITING FOR WRITER TO CLOSE") + testutil.WaitForResult(func() (bool, error) { + return wrappedW.Closed, nil + }, func(err error) { + t.Fatalf("connection not closed") + }) + s.Agent.logger.Printf("ALEX: WRITER CLOSED") + } +} + func TestLogs_findClosest(t *testing.T) { task := "foo" entries := []*allocdir.AllocFileInfo{ diff --git a/command/agent/http_test.go b/command/agent/http_test.go index aa71add7208..34f37dcdf07 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -381,7 +381,7 @@ func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 { return uint64(val) } -func httpTest(t *testing.T, cb func(c *Config), f func(srv *TestServer)) { +func httpTest(t testing.TB, cb func(c *Config), f func(srv *TestServer)) { s := makeHTTPServer(t, cb) defer s.Cleanup() testutil.WaitForLeader(t, s.Agent.RPC) diff --git a/testutil/wait.go b/testutil/wait.go index 23c88c4977f..bdac812d0d4 100644 --- a/testutil/wait.go +++ b/testutil/wait.go @@ -54,7 +54,7 @@ func IsTravis() bool { type rpcFn func(string, interface{}, interface{}) error -func WaitForLeader(t *testing.T, rpc rpcFn) { +func WaitForLeader(t testing.TB, rpc rpcFn) { WaitForResult(func() (bool, error) { args := &structs.GenericRequest{} var leader string