Skip to content

Commit

Permalink
Make implementing Close required by reader.Reader (elastic#20455)
Browse files Browse the repository at this point in the history
This PR changes the `reader.Reader` interface to require readers implement a `Close` function.

This lets Filebeat clean-up the goroutine in `TimeoutReader`.

Closes elastic#19193
(cherry picked from commit 315a17e)
  • Loading branch information
kvch committed Aug 26, 2020
1 parent 1d08e2a commit 3808fce
Show file tree
Hide file tree
Showing 21 changed files with 107 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135]
- Replace `ACKCount`, `ACKEvents`, and `ACKLastEvent` callbacks with `ACKHandler` and interface in `beat.ClientConfig`. {pull}19632[19632]
- Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632]
- Make implementing `Close` required for `reader.Reader` interfaces. {pull}20455[20455]

==== Bugfixes

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,10 @@ field. You can revert this change by configuring tags for the module and omittin
- Add event.ingested for Suricata module {pull}20220[20220]
- Add support for custom header and headersecret for filebeat http_endpoint input {pull}20435[20435]
- Convert httpjson to v2 input {pull}20226[20226]
- Add event.ingested to all Filebeat modules. {pull}20386[20386]
- Return error when log harvester tries to open a named pipe. {issue}18682[18682] {pull}20450[20450]
- Avoid goroutine leaks in Filebeat readers. {issue}19193[19193] {pull}20455[20455]


*Heartbeat*

Expand Down
5 changes: 4 additions & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,10 @@ func (h *Harvester) Run() error {
}

h.stop()
h.log.Close()
err := h.reader.Close()
if err != nil {
logp.Err("Failed to stop harvester for file %s: %v", h.state.Source, err)
}
}(h.state.Source)

logp.Info("Harvester started for file: %s", h.state.Source)
Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ func (f *Log) wait() {
}

// Close closes the done channel but no th the file handler
func (f *Log) Close() {
func (f *Log) Close() error {
close(f.done)
// Note: File reader is not closed here because that leads to race conditions
return nil
}
10 changes: 7 additions & 3 deletions libbeat/reader/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type CheckFunc func(offset int64, buf []byte) bool
// Is is useful is you want to detect if you have received garbage from a network volume.
type Reader struct {
log *logp.Logger
reader io.Reader
reader io.ReadCloser
buffer bytes.Buffer
minBufferSize int
maxFailures int
Expand All @@ -59,7 +59,7 @@ type Reader struct {
// NewReader returns a debug reader.
func NewReader(
log *logp.Logger,
reader io.Reader,
reader io.ReadCloser,
minBufferSize int,
maxFailures int,
predicate CheckFunc,
Expand Down Expand Up @@ -115,6 +115,10 @@ func (r *Reader) Read(p []byte) (int, error) {
return n, err
}

func (r *Reader) Close() error {
return r.reader.Close()
}

func makeNullCheck(log *logp.Logger, minSize int) CheckFunc {
// create a slice with null bytes to match on the buffer.
pattern := make([]byte, minSize, minSize)
Expand Down Expand Up @@ -159,7 +163,7 @@ func summarizeBufferInfo(idx int, buf []byte) (int, []byte) {

// AppendReaders look into the current enabled log selector and will add any debug reader that match
// the selectors.
func AppendReaders(reader io.Reader) (io.Reader, error) {
func AppendReaders(reader io.ReadCloser) (io.ReadCloser, error) {
var err error

if logp.HasSelector("detect_null_bytes") || logp.HasSelector("*") {
Expand Down
13 changes: 8 additions & 5 deletions libbeat/reader/debug/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package debug
import (
"bytes"
"io"
"io/ioutil"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -78,8 +79,9 @@ func testCheckContent(t *testing.T) {
s.WriteString("hello world")
s.WriteByte(0x00)
s.WriteString("hello world")
r := ioutil.NopCloser(&s)

reader, _ := NewReader(logp.L(), &s, 5, 3, check)
reader, _ := NewReader(logp.L(), r, 5, 3, check)

_, err := reader.Read(make([]byte, 20))
if !assert.NoError(t, err) {
Expand All @@ -91,7 +93,7 @@ func testCheckContent(t *testing.T) {

func testConsumeAll(t *testing.T) {
c, _ := common.RandomBytes(2000)
reader := bytes.NewReader(c)
reader := ioutil.NopCloser(bytes.NewReader(c))
var buf bytes.Buffer
consumed := 0
debug, _ := NewReader(logp.L(), reader, 8, 20, makeNullCheck(logp.L(), 1))
Expand All @@ -106,8 +108,8 @@ func testConsumeAll(t *testing.T) {
}

func testEmptyBuffer(t *testing.T) {
var buf bytes.Buffer
debug, _ := NewReader(logp.L(), &buf, 8, 20, makeNullCheck(logp.L(), 1))
buf := ioutil.NopCloser(&bytes.Buffer{})
debug, _ := NewReader(logp.L(), buf, 8, 20, makeNullCheck(logp.L(), 1))
data := make([]byte, 33)
n, err := debug.Read(data)
assert.Equal(t, io.EOF, err)
Expand All @@ -134,8 +136,9 @@ func testSilent(t *testing.T) {
b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'})
b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'})
b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'})
r := ioutil.NopCloser(&b)

debug, _ := NewReader(logp.L(), &b, 3, 2, check)
debug, _ := NewReader(logp.L(), r, 3, 2, check)
consumed := 0
for consumed < b.Len() {
n, _ := debug.Read(make([]byte, 3))
Expand Down
11 changes: 11 additions & 0 deletions libbeat/reader/multiline/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package multiline

import (
"io"

"github.com/elastic/beats/v7/libbeat/reader"
)

Expand Down Expand Up @@ -131,3 +133,12 @@ func (cr *counterReader) resetState() {
func (cr *counterReader) setState(next func(cr *counterReader) (reader.Message, error)) {
cr.state = next
}

func (cr *counterReader) Close() error {
cr.setState((*counterReader).readClosed)
return cr.reader.Close()
}

func (cr *counterReader) readClosed() (reader.Message, error) {
return reader.Message{}, io.EOF
}
3 changes: 2 additions & 1 deletion libbeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package multiline
import (
"bytes"
"errors"
"io/ioutil"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -333,7 +334,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade
}

var r reader.Reader
r, err = readfile.NewEncodeReader(in, readfile.Config{
r, err = readfile.NewEncodeReader(ioutil.NopCloser(in), readfile.Config{
Codec: enc,
BufferSize: 4096,
Terminator: readfile.LineFeed,
Expand Down
10 changes: 10 additions & 0 deletions libbeat/reader/multiline/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package multiline
import (
"errors"
"fmt"
"io"
"time"

"github.com/elastic/beats/v7/libbeat/common/match"
Expand Down Expand Up @@ -254,6 +255,15 @@ func (pr *patternReader) setState(next func(pr *patternReader) (reader.Message,
pr.state = next
}

func (pr *patternReader) Close() error {
pr.setState((*patternReader).readClosed)
return pr.reader.Close()
}

func (pr *patternReader) readClosed() (reader.Message, error) {
return reader.Message{}, io.EOF
}

// matchers
func afterMatcher(pat match.Matcher) (matcher, error) {
return genPatternMatcher(pat, func(last, current []byte) []byte {
Expand Down
2 changes: 2 additions & 0 deletions libbeat/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package reader

import (
"errors"
"io"
)

// Reader is the interface that wraps the basic Next method for
// getting a new message.
// Next returns the message being read or and error. EOF is returned
// if reader will not return any new message on subsequent calls.
type Reader interface {
io.Closer
Next() (Message, error)
}

Expand Down
6 changes: 5 additions & 1 deletion libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Config struct {

// New creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(r io.Reader, config Config) (EncoderReader, error) {
func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
return EncoderReader{eReader}, err
}
Expand All @@ -59,3 +59,7 @@ func (r EncoderReader) Next() (reader.Message, error) {
Bytes: sz,
}, err
}

func (r EncoderReader) Close() error {
return r.reader.Close()
}
3 changes: 2 additions & 1 deletion libbeat/reader/readfile/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package readfile

import (
"bytes"
"io/ioutil"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -46,7 +47,7 @@ func TestEncodeLines(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
r := bytes.NewReader(testCase.Input)
r := ioutil.NopCloser(bytes.NewReader(testCase.Input))
codec, err := encFactory(r)
assert.Nil(t, err, "failed to initialize encoding: %v", err)

Expand Down
4 changes: 4 additions & 0 deletions libbeat/reader/readfile/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@ func (r *LimitReader) Next() (reader.Message, error) {
}
return message, err
}

func (r *LimitReader) Close() error {
return r.reader.Close()
}
2 changes: 2 additions & 0 deletions libbeat/reader/readfile/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (m *mockReader) Next() (reader.Message, error) {
}, nil
}

func (m *mockReader) Close() error { return nil }

var limitTests = []struct {
line string
maxBytes int
Expand Down
8 changes: 6 additions & 2 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const unlimited = 0
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
reader io.Reader
reader io.ReadCloser
bufferSize int
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
Expand All @@ -48,7 +48,7 @@ type LineReader struct {
}

// New creates a new reader object
func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
encoder := config.Codec.NewEncoder()

// Create newline char based on encoding
Expand Down Expand Up @@ -271,3 +271,7 @@ func (r *LineReader) decode(end int) (int, error) {
r.byteCount += start
return start, err
}

func (r *LineReader) Close() error {
return r.reader.Close()
}
9 changes: 5 additions & 4 deletions libbeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"encoding/hex"
"io"
"io/ioutil"
"math/rand"
"strings"
"testing"
Expand Down Expand Up @@ -97,7 +98,7 @@ func TestReaderEncodings(t *testing.T) {
}

// create line reader
reader, err := NewLineReader(buffer, Config{codec, 1024, LineFeed, unlimited})
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, LineFeed, unlimited})
if err != nil {
t.Fatal("failed to initialize reader:", err)
}
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestLineTerminators(t *testing.T) {
buffer.Write([]byte("this is my second line"))
buffer.Write(nl)

reader, err := NewLineReader(buffer, Config{codec, 1024, terminator, unlimited})
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, terminator, unlimited})
if err != nil {
t.Errorf("failed to initialize reader: %v", err)
continue
Expand Down Expand Up @@ -229,7 +230,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
// initialize reader
buffer := bytes.NewBuffer(inputStream)
codec, _ := encoding.Plain(buffer)
reader, err := NewLineReader(buffer, Config{codec, buffer.Len(), LineFeed, unlimited})
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, buffer.Len(), LineFeed, unlimited})
if err != nil {
t.Fatalf("Error initializing reader: %v", err)
}
Expand Down Expand Up @@ -349,7 +350,7 @@ func TestMaxBytesLimit(t *testing.T) {
}

// Create line reader
reader, err := NewLineReader(strings.NewReader(input), Config{codec, bufferSize, LineFeed, lineMaxLimit})
reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, LineFeed, lineMaxLimit})
if err != nil {
t.Fatal("failed to initialize reader:", err)
}
Expand Down
4 changes: 4 additions & 0 deletions libbeat/reader/readfile/strip_newline.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ func (p *StripNewline) autoLineEndingChars(l []byte) int {
}
return 1
}

func (p *StripNewline) Close() error {
return p.reader.Close()
}
21 changes: 18 additions & 3 deletions libbeat/reader/readfile/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package readfile

import (
"errors"
"io"
"time"

"github.com/elastic/beats/v7/libbeat/reader"
Expand All @@ -36,6 +37,7 @@ type TimeoutReader struct {
signal error
running bool
ch chan lineMessage
done chan struct{}
}

type lineMessage struct {
Expand All @@ -54,6 +56,7 @@ func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *Time
signal: signal,
timeout: t,
ch: make(chan lineMessage, 1),
done: make(chan struct{}),
}
}

Expand All @@ -68,9 +71,13 @@ func (r *TimeoutReader) Next() (reader.Message, error) {
go func() {
for {
message, err := r.reader.Next()
r.ch <- lineMessage{message, err}
if err != nil {
break
select {
case <-r.done:
return
case r.ch <- lineMessage{message, err}:
if err != nil {
return
}
}
}
}()
Expand All @@ -85,5 +92,13 @@ func (r *TimeoutReader) Next() (reader.Message, error) {
return msg.line, msg.err
case <-timer.C:
return reader.Message{}, r.signal
case <-r.done:
return reader.Message{}, io.EOF
}
}

func (r *TimeoutReader) Close() error {
close(r.done)

return r.reader.Close()
}
4 changes: 4 additions & 0 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,7 @@ func stripNewLineWin(msg *reader.Message) {
return r == '\n' || r == '\r'
})
}

func (p *DockerJSONReader) Close() error {
return p.reader.Close()
}
Loading

0 comments on commit 3808fce

Please sign in to comment.