diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index 5b582ccf6e8..e199a6e6870 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -30,7 +30,7 @@ import ( // Config stores the options of a file stream. type config struct { - readerConfig + Reader readerConfig `config:",inline"` Paths []string `config:"paths"` Close closerConfig `config:"close"` @@ -79,7 +79,7 @@ type backoffConfig struct { func defaultConfig() config { return config{ - readerConfig: defaultReaderConfig(), + Reader: defaultReaderConfig(), Paths: []string{}, Close: defaultCloserConfig(), CleanInactive: 0, diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 8f21cb505ce..9c23e18473a 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -53,14 +53,8 @@ type fileMeta struct { // are actively written by other applications. type filestream struct { readerConfig readerConfig - bufferSize int - tailFile bool // TODO encodingFactory encoding.EncodingFactory encoding encoding.Encoding - lineTerminator readfile.LineTerminator - excludeLines []match.Matcher - includeLines []match.Matcher - maxBytes int closerConfig closerConfig } @@ -97,9 +91,9 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) return nil, nil, fmt.Errorf("error while creating file identifier: %v", err) } - encodingFactory, ok := encoding.FindEncoding(config.Encoding) + encodingFactory, ok := encoding.FindEncoding(config.Reader.Encoding) if !ok || encodingFactory == nil { - return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Encoding) + return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding) } prospector := &fileProspector{ @@ -111,13 +105,8 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) } filestream := &filestream{ - readerConfig: config.readerConfig, - bufferSize: config.BufferSize, + readerConfig: config.Reader, encodingFactory: encodingFactory, - lineTerminator: config.LineTerminator, - excludeLines: config.ExcludeLines, - includeLines: config.IncludeLines, - maxBytes: config.MaxBytes, closerConfig: config.Close, } @@ -191,7 +180,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri return nil, err } - log.Debug("newLogFileReader with config.MaxBytes:", inp.maxBytes) + log.Debug("newLogFileReader with config.MaxBytes:", inp.readerConfig.MaxBytes) // TODO: NewLineReader uses additional buffering to deal with encoding and testing // for new lines in input stream. Simple 8-bit based encodings, or plain @@ -211,13 +200,13 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri // for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters. // This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file. // The further size limiting is performed by LimitReader at the end of the readers pipeline as needed. - encReaderMaxBytes := inp.maxBytes * 4 + encReaderMaxBytes := inp.readerConfig.MaxBytes * 4 var r reader.Reader r, err = readfile.NewEncodeReader(dbgReader, readfile.Config{ Codec: inp.encoding, - BufferSize: inp.bufferSize, - Terminator: inp.lineTerminator, + BufferSize: inp.readerConfig.BufferSize, + Terminator: inp.readerConfig.LineTerminator, MaxBytes: encReaderMaxBytes, }) if err != nil { @@ -225,8 +214,8 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri return nil, err } - r = readfile.NewStripNewline(r, inp.lineTerminator) - r = readfile.NewLimitReader(r, inp.maxBytes) + r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator) + r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes) return r, nil } @@ -335,14 +324,14 @@ func (inp *filestream) readFromSource( // isDroppedLine decides if the line is exported or not based on // the include_lines and exclude_lines options. func (inp *filestream) isDroppedLine(log *logp.Logger, line string) bool { - if len(inp.includeLines) > 0 { - if !matchAny(inp.includeLines, line) { + if len(inp.readerConfig.IncludeLines) > 0 { + if !matchAny(inp.readerConfig.IncludeLines, line) { log.Debug("Drop line as it does not match any of the include patterns %s", line) return true } } - if len(inp.excludeLines) > 0 { - if matchAny(inp.excludeLines, line) { + if len(inp.readerConfig.ExcludeLines) > 0 { + if matchAny(inp.readerConfig.ExcludeLines, line) { log.Debug("Drop line as it does match one of the exclude patterns%s", line) return true } diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index e3c08b434ac..71da3182852 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -20,9 +20,15 @@ package filestream import ( + "bytes" "context" "runtime" "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/unicode" + "golang.org/x/text/transform" ) // test_close_renamed from test_harvester.py @@ -100,3 +106,134 @@ func TestFilestreamCloseEOF(t *testing.T) { env.requireOffsetInRegistry(testlogName, expectedOffset) } + +// test_empty_lines from test_harvester.py +func TestFilestreamEmptyLine(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + }) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + testlines := []byte("first log line\nnext is an empty line\n") + env.mustWriteLinesToFile(testlogName, testlines) + + env.waitUntilEventCount(2) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + moreTestlines := []byte("\nafter an empty line\n") + env.mustAppendLinesToFile(testlogName, moreTestlines) + + env.waitUntilEventCount(3) + env.requireEventsReceived([]string{ + "first log line", + "next is an empty line", + "after an empty line", + }) + + cancelInput() + env.waitUntilInputStops() + + env.requireOffsetInRegistry(testlogName, len(testlines)+len(moreTestlines)) +} + +// test_empty_lines_only from test_harvester.py +// This test differs from the original because in filestream +// input offset is no longer persisted when the line is empty. +func TestFilestreamEmptyLinesOnly(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + }) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + testlines := []byte("\n\n\n") + env.mustWriteLinesToFile(testlogName, testlines) + + cancelInput() + env.waitUntilInputStops() + + env.requireNoEntryInRegistry(testlogName) +} + +// test_bom_utf8 from test_harvester.py +func TestFilestreamBOMUTF8(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + }) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + // BOM: 0xEF,0xBB,0xBF + lines := append([]byte{0xEF, 0xBB, 0xBF}, []byte(`#Software: Microsoft Exchange Server +#Version: 14.0.0.0 +#Log-type: Message Tracking Log +#Date: 2016-04-05T00:00:02.052Z +#Fields: date-time,client-ip,client-hostname,server-ip,server-hostname,source-context,connector-id,source,event-id,internal-message-id,message-id,recipient-address,recipient-status,total-bytes,recipient-count,related-recipient-address,reference,message-subject,sender-address,return-path,message-info,directionality,tenant-id,original-client-ip,original-server-ip,custom-data +2016-04-05T00:00:02.052Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:279f077c-216f-4323-a9ee-48e50ffd3cad, Event:269492708, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.022Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-37-DB-F9-F9-B5-F2-42-4F-86-62-E6-5D-FC-0C-A1-41-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-00-1F-D4-B5-0E-00-00-2E-EF-F2-59-0E-E8-2D-46-BC-31-02-85-0D-67-98-43-00-00-37-4A-A3-B3-00-00 +2016-04-05T00:00:02.145Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:49cb09c6-5b76-415d-a085-da0ad9079682, Event:269492711, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.038Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-97-8F-07-43-51-44-61-4A-AD-BD-29-D4-97-4E-20-A0-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-8E-8F-BD-EB-57-00-00-3D-FB-CE-26-A4-8D-46-4C-A4-35-0F-A7-9B-FA-D7-B9-00-00-37-44-2F-CA-00-00 +`)...) + env.mustWriteLinesToFile(testlogName, lines) + + env.waitUntilEventCount(7) + + cancelInput() + env.waitUntilInputStops() + + messages := env.getOutputMessages() + require.Equal(t, messages[0], "#Software: Microsoft Exchange Server") +} + +// test_boms from test_harvester.py +func TestFilestreamUTF16BOMs(t *testing.T) { + encodings := map[string]encoding.Encoding{ + "utf-16be-bom": unicode.UTF16(unicode.BigEndian, unicode.UseBOM), + "utf-16le-bom": unicode.UTF16(unicode.LittleEndian, unicode.UseBOM), + } + + for name, enc := range encodings { + name := name + encoder := enc.NewEncoder() + t.Run(name, func(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "encoding": name, + }) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + line := []byte("first line\n") + buf := bytes.NewBuffer(nil) + writer := transform.NewWriter(buf, encoder) + writer.Write(line) + writer.Close() + + env.mustWriteLinesToFile(testlogName, buf.Bytes()) + + env.waitUntilEventCount(1) + + env.requireEventsReceived([]string{"first line"}) + + cancelInput() + env.waitUntilInputStops() + }) + } +}