Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.17](backport #33568) handle EOF on single line content #34991

Merged
merged 13 commits into from
Jun 5, 2023
5 changes: 4 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Winlogbeat*


*Functionbeat*

==== Bugfixes

*Affecting all Beats*
- Fix panics when a processor is closed twice {pull}34647[34647]

*Filebeat*

- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]

*Auditbeat*


Expand Down
5 changes: 3 additions & 2 deletions libbeat/reader/readfile/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package readfile
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -39,7 +40,7 @@ func BenchmarkEncoderReader(b *testing.B) {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
for bN := 0; bN < b.N; bN++ {
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit})
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit, false})
if err != nil {
b.Fatal("failed to initialize reader:", err)
}
Expand All @@ -48,7 +49,7 @@ func BenchmarkEncoderReader(b *testing.B) {
for i := 0; ; i++ {
msg, err := reader.Next()
if err != nil {
if err == io.EOF {
if errors.Is(err, io.EOF) {
b.ReportMetric(float64(i), "processed_lines")
break
} else {
Expand Down
8 changes: 7 additions & 1 deletion libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ type Config struct {
BufferSize int
Terminator LineTerminator
MaxBytes int
// If CollectOnEOF is set to true (default false), the line reader returns the buffered content when EOF is
// reached: this ensures that the full content, including a last line with no EOL, is returned for fully
// retrieved content that is no longer appended to between reads.
// If CollectOnEOF is set to false, the line reader returns no content on EOF and keeps the buffer in its
// current state, ready for more data to be appended after a temporary EOF.
CollectOnEOF bool
}

// New creates a new Encode reader from input reader by applying
// NewEncodeReader creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
Expand Down
103 changes: 73 additions & 30 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package readfile

import (
"bytes"
"errors"
"fmt"
"io"

Expand All @@ -33,18 +34,22 @@ const unlimited = 0
// LineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
// If collectOnEOF is set to true (default false), it returns the buffered content when EOF is reached.
// If collectOnEOF is set to false, it returns no content on EOF and keeps the buffer in its current
// state, ready for more data to be appended after a temporary EOF.
type LineReader struct {
reader io.ReadCloser
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
reader io.ReadCloser
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
collectOnEOF bool
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
}

// NewLineReader creates a new reader object
Expand All @@ -63,15 +68,16 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
}

return &LineReader{
reader: input,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
reader: input,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
collectOnEOF: config.CollectOnEOF,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
}, nil
}

Expand All @@ -88,9 +94,46 @@ func (r *LineReader) Next() (b []byte, n int, err error) {
// read next 'potential' line from input buffer/reader
err := r.advance()
if err != nil {
return nil, 0, err
}
if errors.Is(err, io.EOF) && r.collectOnEOF {
// Found EOF and collectOnEOF is true
// -> decode input sequence into outBuffer
// let's take whole buffer len without len(nl) if it ends with it
end := r.inBuffer.Len()
if bytes.HasSuffix(r.inBuffer.Bytes(), r.decodedNl) {
end -= len(r.nl)
}

sz, err := r.decode(end)
if err != nil {
r.logger.Errorf("Error decoding line: %s", err)
// In case of error increase size by unencoded length
sz = r.inBuffer.Len()
}

// Consume transformed bytes from input buffer
_ = r.inBuffer.Advance(sz)
r.inBuffer.Reset()

// output buffer contains the content up to EOF. Extract
// byte slice from buffer and reset output buffer.
bytes, err := r.outBuffer.Collect(r.outBuffer.Len())
r.outBuffer.Reset()
if err != nil {
// This should never happen as otherwise we have a broken state
panic(err)
}

// return and reset consumed bytes count
sz = r.byteCount
r.byteCount = 0
return bytes, sz, io.EOF
}

// return and reset consumed bytes count
sz := r.byteCount
r.byteCount = 0
return nil, sz, err
}
// Check last decoded byte really being newline also unencoded
// if not, continue reading
buf := r.outBuffer.Bytes()
Expand Down Expand Up @@ -141,13 +184,13 @@ func (r *LineReader) advance() error {
// Try to read more bytes into buffer
n, err := r.reader.Read(r.tempBuffer)

if err == io.EOF && n > 0 {
if errors.Is(err, io.EOF) && n > 0 {
// Continue processing the returned bytes. The next call will yield EOF with 0 bytes.
err = nil
}

// Write to buffer also in case of err
r.inBuffer.Write(r.tempBuffer[:n])
_, _ = r.inBuffer.Write(r.tempBuffer[:n])

if err != nil {
return err
Expand All @@ -166,7 +209,7 @@ func (r *LineReader) advance() error {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
_ = r.inBuffer.Advance(idx + len(r.nl))
r.byteCount += idx + len(r.nl)
r.inBuffer.Reset()
r.inOffset = 0
Expand Down Expand Up @@ -234,8 +277,8 @@ func (r *LineReader) skipUntilNewLine() (int, error) {
idx = bytes.Index(r.tempBuffer[:n], r.nl)

if idx != -1 {
r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
skipped += idx
_, _ = r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
skipped += idx + len(r.nl)
} else {
skipped += n
}
Expand Down Expand Up @@ -264,8 +307,8 @@ func (r *LineReader) decode(end int) (int, error) {
nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false)
if err != nil {
// Check if error is different from destination buffer too short
if err != transform.ErrShortDst {
r.outBuffer.Write(inBytes[0:end])
if !(errors.Is(err, transform.ErrShortDst)) {
_, _ = r.outBuffer.Write(inBytes[0:end])
start = end
break
}
Expand All @@ -275,7 +318,7 @@ func (r *LineReader) decode(end int) (int, error) {
}

start += nSrc
r.outBuffer.Write(r.tempBuffer[:nDst])
_, _ = r.outBuffer.Write(r.tempBuffer[:nDst])
}

r.byteCount += start
Expand Down
Loading