Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #24426 to 7.x: Add tests for encoding setting of filestream input #24629

Merged
merged 1 commit into from
Mar 18, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ import (

// Config stores the options of a file stream.
type config struct {
readerConfig
Reader readerConfig `config:",inline"`

Paths []string `config:"paths"`
Close closerConfig `config:"close"`
@@ -79,7 +79,7 @@ type backoffConfig struct {

func defaultConfig() config {
return config{
readerConfig: defaultReaderConfig(),
Reader: defaultReaderConfig(),
Paths: []string{},
Close: defaultCloserConfig(),
CleanInactive: 0,
37 changes: 13 additions & 24 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
@@ -53,14 +53,8 @@ type fileMeta struct {
// are actively written by other applications.
type filestream struct {
readerConfig readerConfig
bufferSize int
tailFile bool // TODO
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
lineTerminator readfile.LineTerminator
excludeLines []match.Matcher
includeLines []match.Matcher
maxBytes int
closerConfig closerConfig
}

@@ -97,9 +91,9 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
return nil, nil, fmt.Errorf("error while creating file identifier: %v", err)
}

encodingFactory, ok := encoding.FindEncoding(config.Encoding)
encodingFactory, ok := encoding.FindEncoding(config.Reader.Encoding)
if !ok || encodingFactory == nil {
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Encoding)
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding)
}

prospector := &fileProspector{
@@ -111,13 +105,8 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
}

filestream := &filestream{
readerConfig: config.readerConfig,
bufferSize: config.BufferSize,
readerConfig: config.Reader,
encodingFactory: encodingFactory,
lineTerminator: config.LineTerminator,
excludeLines: config.ExcludeLines,
includeLines: config.IncludeLines,
maxBytes: config.MaxBytes,
closerConfig: config.Close,
}

@@ -191,7 +180,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
return nil, err
}

log.Debug("newLogFileReader with config.MaxBytes:", inp.maxBytes)
log.Debug("newLogFileReader with config.MaxBytes:", inp.readerConfig.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
@@ -211,22 +200,22 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
// for the worst case scenario where incoming UTF32 characters are decoded to single-byte UTF-8 characters.
// This limit serves primarily to avoid memory bloat or potential OOM with unexpectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := inp.maxBytes * 4
encReaderMaxBytes := inp.readerConfig.MaxBytes * 4

var r reader.Reader
r, err = readfile.NewEncodeReader(dbgReader, readfile.Config{
Codec: inp.encoding,
BufferSize: inp.bufferSize,
Terminator: inp.lineTerminator,
BufferSize: inp.readerConfig.BufferSize,
Terminator: inp.readerConfig.LineTerminator,
MaxBytes: encReaderMaxBytes,
})
if err != nil {
f.Close()
return nil, err
}

r = readfile.NewStripNewline(r, inp.lineTerminator)
r = readfile.NewLimitReader(r, inp.maxBytes)
r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)
r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

return r, nil
}
@@ -335,14 +324,14 @@ func (inp *filestream) readFromSource(
// isDroppedLine decides if the line is exported or not based on
// the include_lines and exclude_lines options.
func (inp *filestream) isDroppedLine(log *logp.Logger, line string) bool {
if len(inp.includeLines) > 0 {
if !matchAny(inp.includeLines, line) {
if len(inp.readerConfig.IncludeLines) > 0 {
if !matchAny(inp.readerConfig.IncludeLines, line) {
log.Debug("Drop line as it does not match any of the include patterns %s", line)
return true
}
}
if len(inp.excludeLines) > 0 {
if matchAny(inp.excludeLines, line) {
if len(inp.readerConfig.ExcludeLines) > 0 {
if matchAny(inp.readerConfig.ExcludeLines, line) {
log.Debug("Drop line as it does match one of the exclude patterns%s", line)
return true
}
137 changes: 137 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
@@ -20,9 +20,15 @@
package filestream

import (
"bytes"
"context"
"runtime"
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"
)

// test_close_renamed from test_harvester.py
@@ -100,3 +106,134 @@ func TestFilestreamCloseEOF(t *testing.T) {

env.requireOffsetInRegistry(testlogName, expectedOffset)
}

// TestFilestreamEmptyLine is a port of test_empty_lines from test_harvester.py.
//
// It writes two lines, then appends an empty line followed by one more line,
// and expects three events (none for the empty line) while the offset in the
// registry accounts for every byte written, including the empty line.
func TestFilestreamEmptyLine(t *testing.T) {
	const logFile = "test.log"

	var (
		initial  = []byte("first log line\nnext is an empty line\n")
		appended = []byte("\nafter an empty line\n")
		expected = []string{
			"first log line",
			"next is an empty line",
			"after an empty line",
		}
	)

	env := newInputTestingEnvironment(t)
	settings := map[string]interface{}{
		"paths":                             []string{env.abspath(logFile)},
		"prospector.scanner.check_interval": "1ms",
	}
	fsInput := env.mustCreateInput(settings)

	ctx, stop := context.WithCancel(context.Background())
	env.startInput(ctx, fsInput)

	// First batch: two regular lines.
	env.mustWriteLinesToFile(logFile, initial)
	env.waitUntilEventCount(2)
	env.requireOffsetInRegistry(logFile, len(initial))

	// Second batch: starts with an empty line, so only one more event is expected.
	env.mustAppendLinesToFile(logFile, appended)
	env.waitUntilEventCount(3)
	env.requireEventsReceived(expected)

	stop()
	env.waitUntilInputStops()

	env.requireOffsetInRegistry(logFile, len(initial)+len(appended))
}

// TestFilestreamEmptyLinesOnly is a port of test_empty_lines_only from
// test_harvester.py.
//
// Unlike the original test, it expects no registry entry for the file,
// because in the filestream input the offset is no longer persisted when
// the line is empty.
func TestFilestreamEmptyLinesOnly(t *testing.T) {
	const logFile = "test.log"

	env := newInputTestingEnvironment(t)
	settings := map[string]interface{}{
		"paths":                             []string{env.abspath(logFile)},
		"prospector.scanner.check_interval": "1ms",
	}
	fsInput := env.mustCreateInput(settings)

	ctx, stop := context.WithCancel(context.Background())
	env.startInput(ctx, fsInput)

	// The file consists of line terminators only.
	env.mustWriteLinesToFile(logFile, []byte("\n\n\n"))

	stop()
	env.waitUntilInputStops()

	env.requireNoEntryInRegistry(logFile)
}

// TestFilestreamBOMUTF8 is a port of test_bom_utf8 from test_harvester.py.
//
// It writes a seven-line file that starts with the UTF-8 byte order mark,
// waits for seven events and checks that the first published message equals
// the first line without the BOM bytes.
func TestFilestreamBOMUTF8(t *testing.T) {
	env := newInputTestingEnvironment(t)

	testlogName := "test.log"
	inp := env.mustCreateInput(map[string]interface{}{
		"paths": []string{env.abspath(testlogName)},
	})

	ctx, cancelInput := context.WithCancel(context.Background())
	env.startInput(ctx, inp)

	// BOM: 0xEF,0xBB,0xBF
	lines := append([]byte{0xEF, 0xBB, 0xBF}, []byte(`#Software: Microsoft Exchange Server
#Version: 14.0.0.0
#Log-type: Message Tracking Log
#Date: 2016-04-05T00:00:02.052Z
#Fields: date-time,client-ip,client-hostname,server-ip,server-hostname,source-context,connector-id,source,event-id,internal-message-id,message-id,recipient-address,recipient-status,total-bytes,recipient-count,related-recipient-address,reference,message-subject,sender-address,return-path,message-info,directionality,tenant-id,original-client-ip,original-server-ip,custom-data
2016-04-05T00:00:02.052Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:279f077c-216f-4323-a9ee-48e50ffd3cad, Event:269492708, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.022Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-37-DB-F9-F9-B5-F2-42-4F-86-62-E6-5D-FC-0C-A1-41-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-00-1F-D4-B5-0E-00-00-2E-EF-F2-59-0E-E8-2D-46-BC-31-02-85-0D-67-98-43-00-00-37-4A-A3-B3-00-00
2016-04-05T00:00:02.145Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:49cb09c6-5b76-415d-a085-da0ad9079682, Event:269492711, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.038Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-97-8F-07-43-51-44-61-4A-AD-BD-29-D4-97-4E-20-A0-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-8E-8F-BD-EB-57-00-00-3D-FB-CE-26-A4-8D-46-4C-A4-35-0F-A7-9B-FA-D7-B9-00-00-37-44-2F-CA-00-00
`)...)
	env.mustWriteLinesToFile(testlogName, lines)

	// One event is expected per line of the file above.
	env.waitUntilEventCount(7)

	cancelInput()
	env.waitUntilInputStops()

	messages := env.getOutputMessages()
	// Fail with a readable message instead of an index-out-of-range panic.
	require.NotEmpty(t, messages)
	// require.Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	require.Equal(t, "#Software: Microsoft Exchange Server", messages[0])
}

// TestFilestreamUTF16BOMs is a port of test_boms from test_harvester.py.
//
// For each BOM-aware UTF-16 encoding it encodes a single line with that
// encoding, writes it to a file read by an input configured with the
// matching "encoding" setting, and expects the decoded line as the only event.
func TestFilestreamUTF16BOMs(t *testing.T) {
	encodings := map[string]encoding.Encoding{
		"utf-16be-bom": unicode.UTF16(unicode.BigEndian, unicode.UseBOM),
		"utf-16le-bom": unicode.UTF16(unicode.LittleEndian, unicode.UseBOM),
	}

	for name, enc := range encodings {
		name := name
		encoder := enc.NewEncoder()
		t.Run(name, func(t *testing.T) {
			// Encode the payload before the input is started so that an
			// encoding failure aborts the subtest without leaving a running
			// input behind.
			line := []byte("first line\n")
			buf := bytes.NewBuffer(nil)
			writer := transform.NewWriter(buf, encoder)
			_, err := writer.Write(line)
			require.NoError(t, err)
			// Close flushes any pending transformed bytes into buf.
			require.NoError(t, writer.Close())

			env := newInputTestingEnvironment(t)

			testlogName := "test.log"
			inp := env.mustCreateInput(map[string]interface{}{
				"paths":    []string{env.abspath(testlogName)},
				"encoding": name,
			})

			ctx, cancelInput := context.WithCancel(context.Background())
			env.startInput(ctx, inp)

			env.mustWriteLinesToFile(testlogName, buf.Bytes())

			env.waitUntilEventCount(1)

			env.requireEventsReceived([]string{"first line"})

			cancelInput()
			env.waitUntilInputStops()
		})
	}
}