diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index dde5c99c772..14236d71824 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -472,6 +472,7 @@ field. You can revert this change by configuring tags for the module and omittin - Add geoip AS lookup & improve ECS categorization in aws cloudtrail fileset. {issue}18644[18644] {pull}18958[18958] - Improved performance of PANW sample dashboards. {issue}19031[19031] {pull}19032[19032] - Add support for v1 consumer API in Cloud Foundry input, use it by default. {pull}19125[19125] +- Add new mode to multiline reader to aggregate constant number of lines {pull}18352[18352] *Heartbeat* diff --git a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl index db105f17ef0..ae4816f4f82 100644 --- a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl @@ -144,6 +144,15 @@ filebeat.inputs: # Default is 5s. #multiline.timeout: 5s + # To aggregate constant number of lines into a single event use the count mode of multiline. + #multiline.type: count + + # The number of lines to aggregate into a single event. + #multiline.count_lines: 3 + + # Do not add new line character when concatenating lines. + #multiline.skip_newline: false + # Setting tail_files to true means filebeat starts reading new files at the end # instead of the beginning. If this is used in combination with log rotation # this can mean that the first entries of a new file are skipped. 
diff --git a/filebeat/docs/multiline.asciidoc b/filebeat/docs/multiline.asciidoc index cc72366629a..546f71d3276 100644 --- a/filebeat/docs/multiline.asciidoc +++ b/filebeat/docs/multiline.asciidoc @@ -29,6 +29,7 @@ The following example shows how to configure {beatname_uc} to handle a multiline [source,yaml] ------------------------------------------------------------------------------------- +multiline.type: pattern multiline.pattern: '^\[' multiline.negate: true multiline.match: after @@ -46,6 +47,8 @@ multiline.match: after at org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction.checkBlock(TransportDeleteIndexAction.java:75) ------------------------------------------------------------------------------------- +*`multiline.type`*:: Defines which aggregation method to use. The default is `pattern`. The other option +is `count` which lets you aggregate a constant number of lines. *`multiline.pattern`*:: Specifies the regular expression pattern to match. Note that the regexp patterns supported by {beatname_uc} differ somewhat from the patterns supported by Logstash. See <<regexp-support>> for a list of supported regexp patterns. @@ -76,6 +79,10 @@ lines are discarded. The default is 500. *`multiline.timeout`*:: After the specified timeout, {beatname_uc} sends the multiline event even if no new pattern is found to start a new event. The default is 5s. +*`multiline.count_lines`*:: The number of lines to aggregate into a single event. + +*`multiline.skip_newline`*:: When set, multiline events are concatenated without a line separator. 
+ ==== Examples of multiline configuration @@ -103,6 +110,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi [source,yaml] ------------------------------------------------------------------------------------- +multiline.type: pattern multiline.pattern: '^[[:space:]]' multiline.negate: false multiline.match: after @@ -127,6 +135,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi [source,yaml] ------------------------------------------------------------------------------------- +multiline.type: pattern multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:' multiline.negate: false multiline.match: after @@ -153,6 +162,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi [source,yaml] ------------------------------------------------------------------------------------- +multiline.type: pattern multiline.pattern: '\\$' multiline.negate: false multiline.match: before @@ -176,6 +186,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi [source,yaml] ------------------------------------------------------------------------------------- +multiline.type: pattern multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2}' multiline.negate: true multiline.match: after @@ -200,6 +211,7 @@ To consolidate this as a single event in {beatname_uc}, use the following multil [source,yaml] ------------------------------------------------------------------------------------- +multiline.type: pattern multiline.pattern: 'Start new event' multiline.negate: true multiline.match: after diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 0a3d03d98be..1a17cd9a514 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -531,6 +531,15 @@ filebeat.inputs: # Default is 5s. #multiline.timeout: 5s + # To aggregate constant number of lines into a single event use the count mode of multiline. 
+ #multiline.type: count + + # The number of lines to aggregate into a single event. + #multiline.count_lines: 3 + + # Do not add new line character when concatenating lines. + #multiline.skip_newline: false + # Setting tail_files to true means filebeat starts reading new files at the end # instead of the beginning. If this is used in combination with log rotation # this can mean that the first entries of a new file are skipped. diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 2275d761bca..4f74323ead1 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -86,6 +86,7 @@ filebeat.{{input_config | default("inputs")}}: {% if multiline %} multiline: + type: {{multiline_type}} pattern: {{pattern}} negate: {{negate}} match: {{match}} diff --git a/filebeat/tests/system/test_multiline.py b/filebeat/tests/system/test_multiline.py index ed7a42a1289..9366028f75a 100644 --- a/filebeat/tests/system/test_multiline.py +++ b/filebeat/tests/system/test_multiline.py @@ -17,6 +17,7 @@ def test_java_elasticsearch_log(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", multiline=True, + multiline_type="pattern", pattern="^\[", negate="true", match="after" @@ -48,6 +49,7 @@ def test_c_style_log(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", multiline=True, + multiline_type="pattern", pattern="\\\\$", match="before" ) @@ -78,6 +80,7 @@ def test_rabbitmq_multiline_log(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", multiline=True, + multiline_type="pattern", pattern="^=[A-Z]+", match="after", negate="true", @@ -122,6 +125,7 @@ def test_max_lines(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", multiline=True, + multiline_type="pattern", pattern="^\[", negate="true", match="after", @@ -160,6 +164,7 @@ def 
test_timeout(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", multiline=True, + multiline_type="pattern", pattern="^\[", negate="true", match="after", @@ -204,6 +209,7 @@ def test_max_bytes(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", multiline=True, + multiline_type="pattern", pattern="^\[", negate="true", match="after", @@ -240,6 +246,7 @@ def test_close_timeout_with_multiline(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", multiline=True, + multiline_type="pattern", pattern="^\[", negate="true", match="after", @@ -295,6 +302,7 @@ def test_consecutive_newline(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", multiline=True, + multiline_type="pattern", pattern="^\[", negate="true", match="after", @@ -340,11 +348,12 @@ def test_invalid_config(self): self.render_config_template( path=os.path.abspath(self.working_dir + "/log/") + "*", multiline=True, + multiline_type="pattern", match="after", ) proc = self.start_beat() - self.wait_until(lambda: self.log_contains("missing required field accessing") == 1) + self.wait_until(lambda: self.log_contains("multiline.pattern cannot be empty") == 1) proc.check_kill_and_wait(exit_code=1) diff --git a/libbeat/reader/multiline/counter.go b/libbeat/reader/multiline/counter.go new file mode 100644 index 00000000000..bd410bc4ef7 --- /dev/null +++ b/libbeat/reader/multiline/counter.go @@ -0,0 +1,133 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package multiline + +import ( + "github.com/elastic/beats/v7/libbeat/reader" +) + +type counterReader struct { + reader reader.Reader + state func(*counterReader) (reader.Message, error) + linesCount int // number of lines to collect + msgBuffer *messageBuffer +} + +func newMultilineCountReader( + r reader.Reader, + separator string, + maxBytes int, + config *Config, +) (reader.Reader, error) { + maxLines := config.LinesCount + if l := config.MaxLines; l != nil && 0 < *l { + maxLines = *l + } + + return &counterReader{ + reader: r, + state: (*counterReader).readFirst, + linesCount: config.LinesCount, + msgBuffer: newMessageBuffer(maxBytes, maxLines, []byte(separator), config.SkipNewLine), + }, nil +} + +// Next returns next multi-line event. 
+func (cr *counterReader) Next() (reader.Message, error) { + return cr.state(cr) +} + +func (cr *counterReader) readFirst() (reader.Message, error) { + for { + message, err := cr.reader.Next() + if err != nil { + return message, err + } + + if message.Bytes == 0 { + continue + } + + cr.msgBuffer.startNewMessage(message) + if cr.msgBuffer.processedLines == cr.linesCount { + msg := cr.msgBuffer.finalize() + return msg, nil + } + + cr.setState((*counterReader).readNext) + return cr.readNext() + } +} + +func (cr *counterReader) readNext() (reader.Message, error) { + for { + message, err := cr.reader.Next() + if err != nil { + // handle error without any bytes returned from reader + if message.Bytes == 0 { + // no lines buffered -> return error + if cr.msgBuffer.isEmpty() { + return reader.Message{}, err + } + + // lines buffered, return multiline and error on next read + msg := cr.msgBuffer.finalize() + cr.msgBuffer.setErr(err) + cr.setState((*counterReader).readFailed) + return msg, nil + } + + // handle error with some content being returned by reader and + // line matching multiline criteria or no multiline started yet + if cr.msgBuffer.isEmptyMessage() { + cr.msgBuffer.addLine(message) + + // return multiline and error on next read + msg := cr.msgBuffer.finalize() + cr.msgBuffer.setErr(err) + cr.setState((*counterReader).readFailed) + return msg, nil + } + } + + // add line to current multiline event + cr.msgBuffer.addLine(message) + if cr.msgBuffer.processedLines == cr.linesCount { + msg := cr.msgBuffer.finalize() + cr.setState((*counterReader).readFirst) + return msg, nil + } + } +} + +func (cr *counterReader) readFailed() (reader.Message, error) { + err := cr.msgBuffer.err + cr.msgBuffer.setErr(nil) + cr.resetState() + return reader.Message{}, err +} + +// resetState sets state of the reader to readFirst +func (cr *counterReader) resetState() { + cr.setState((*counterReader).readFirst) +} + +// setState sets state to the given function +func (cr *counterReader) 
setState(next func(cr *counterReader) (reader.Message, error)) { + cr.state = next +} diff --git a/libbeat/reader/multiline/message_buffer.go b/libbeat/reader/multiline/message_buffer.go new file mode 100644 index 00000000000..506efc7599d --- /dev/null +++ b/libbeat/reader/multiline/message_buffer.go @@ -0,0 +1,147 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package multiline + +import ( + "github.com/elastic/beats/v7/libbeat/reader" +) + +type messageBuffer struct { + maxBytes int // bytes stored in content + maxLines int + separator []byte + skipNewline bool + last []byte + numLines int + processedLines int + truncated int + err error // last seen error + message reader.Message +} + +func newMessageBuffer(maxBytes, maxLines int, separator []byte, skipNewline bool) *messageBuffer { + return &messageBuffer{ + maxBytes: maxBytes, + maxLines: maxLines, + separator: separator, + skipNewline: skipNewline, + message: reader.Message{}, + err: nil, + } +} + +func (b *messageBuffer) startNewMessage(msg reader.Message) { + b.clear() + b.load(msg) +} + +// load loads the reader with the given message. It is recommended to either +// run clear or finalize before. 
+func (b *messageBuffer) load(m reader.Message) { + b.addLine(m) + // Timestamp of first message is taken as overall timestamp + b.message.Ts = m.Ts + b.message.AddFields(m.Fields) +} + +// clear resets the buffer variables +func (b *messageBuffer) clear() { + b.message = reader.Message{} + b.last = nil + b.numLines = 0 + b.processedLines = 0 + b.truncated = 0 + b.err = nil +} + +// addLine adds the read content to the message +// The content is only added if maxBytes and maxLines are not exceeded. In case one of the +// two is exceeded, addLine keeps processing but does not add it to the content. +func (b *messageBuffer) addLine(m reader.Message) { + if m.Bytes <= 0 { + return + } + + sz := len(b.message.Content) + addSeparator := len(b.message.Content) > 0 && len(b.separator) > 0 && !b.skipNewline + if addSeparator { + sz += len(b.separator) + } + + space := b.maxBytes - sz + + maxBytesReached := (b.maxBytes <= 0 || space > 0) + maxLinesReached := (b.maxLines <= 0 || b.numLines < b.maxLines) + + if maxBytesReached && maxLinesReached { + if space < 0 || space > len(m.Content) { + space = len(m.Content) + } + + tmp := b.message.Content + if addSeparator { + tmp = append(tmp, b.separator...) + } + b.message.Content = append(tmp, m.Content[:space]...) + b.numLines++ + + // keep track of the number of truncated bytes + diff := len(m.Content) - space + if diff > 0 { + b.truncated += diff + } + } else { + // increase the number of skipped bytes if the line cannot be added + b.truncated += len(m.Content) + + } + b.processedLines++ + + b.last = m.Content + b.message.Bytes += m.Bytes + b.message.AddFields(m.Fields) +} + +// finalize writes the existing content into the returned message and resets all reader variables. 
+func (b *messageBuffer) finalize() reader.Message { + if b.truncated > 0 { + b.message.AddFlagsWithKey("log.flags", "truncated") + } + + if b.numLines > 1 { + b.message.AddFlagsWithKey("log.flags", "multiline") + } + + // Copy message from existing content + msg := b.message + + b.clear() + return msg +} + +func (b *messageBuffer) setErr(err error) { + b.err = err +} + +func (b *messageBuffer) isEmpty() bool { + return b.numLines == 0 +} + +func (b *messageBuffer) isEmptyMessage() bool { + return b.message.Bytes == 0 +} diff --git a/libbeat/reader/multiline/message_buffer_test.go b/libbeat/reader/multiline/message_buffer_test.go new file mode 100644 index 00000000000..4816ef01a37 --- /dev/null +++ b/libbeat/reader/multiline/message_buffer_test.go @@ -0,0 +1,147 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package multiline + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/reader" +) + +func TestMessageBufferAddLine(t *testing.T) { + testcases := map[string]struct { + skipNewline bool + lines [][]byte + expected reader.Message + }{ + "concatenating two events with newlines": { + skipNewline: false, + lines: [][]byte{ + []byte("line1"), + []byte("line2"), + }, + expected: reader.Message{ + Content: []byte("line1\nline2"), + }, + }, + "concatenating two events without newlines": { + skipNewline: true, + lines: [][]byte{ + []byte("{\"key1\": \"value\","), + []byte("\"key2\": \"value\"}"), + }, + expected: reader.Message{ + Content: []byte("{\"key1\": \"value\",\"key2\": \"value\"}"), + }, + }, + } + + for name, test := range testcases { + test := test + + t.Run(name, func(t *testing.T) { + buf := getTestMessageBuffer(1024, test.skipNewline, nil) + for _, l := range test.lines { + m := reader.Message{Content: l, Bytes: len(l)} + buf.addLine(m) + } + assert.Equal(t, test.expected.Content, buf.message.Content) + }) + } +} + +func TestFinalizeMessage(t *testing.T) { + testcases := map[string]struct { + maxBytes int + lines [][]byte + expected reader.Message + }{ + "one liner with no flags": { + maxBytes: 1024, + lines: [][]byte{ + []byte("one line"), + }, + expected: reader.Message{ + Content: []byte("one line"), + }, + }, + "truncated one liner message": { + maxBytes: 20, + lines: [][]byte{ + []byte("tooooooooooooooooooo looooooong line"), + }, + expected: reader.Message{ + Content: []byte("tooooooooooooooooooo"), + Fields: common.MapStr{"log": common.MapStr{"flags": []string{"truncated"}}}, + }, + }, + "untruncated multiline message": { + maxBytes: 1024, + lines: [][]byte{ + []byte("line1"), + []byte("line2"), + }, + expected: reader.Message{ + Content: []byte("line1\nline2"), + Fields: common.MapStr{"log": common.MapStr{"flags": []string{"multiline"}}}, + }, + }, + 
"truncated multiline message": { + maxBytes: 8, + lines: [][]byte{ + []byte("line1"), + []byte("line2"), + }, + expected: reader.Message{ + Content: []byte("line1\nli"), + Fields: common.MapStr{"log": common.MapStr{"flags": []string{"truncated", "multiline"}}}, + }, + }, + } + + for name, test := range testcases { + test := test + + t.Run(name, func(t *testing.T) { + var messages []reader.Message + for _, l := range test.lines { + messages = append(messages, reader.Message{Content: l, Bytes: len(l)}) + } + buf := getTestMessageBuffer(test.maxBytes, false, messages) + actualMsg := buf.finalize() + + assert.Equal(t, test.expected.Content, actualMsg.Content) + assert.Equal(t, test.expected.Fields, actualMsg.Fields) + }) + } + +} + +func getTestMessageBuffer(maxBytes int, skipNewline bool, messages []reader.Message) *messageBuffer { + buf := newMessageBuffer(maxBytes, 5, []byte("\n"), skipNewline) + buf.clear() + + for _, m := range messages { + buf.addLine(m) + } + + return buf +} diff --git a/libbeat/reader/multiline/multiline.go b/libbeat/reader/multiline/multiline.go index 36bd8f14eaf..04f5941c11d 100644 --- a/libbeat/reader/multiline/multiline.go +++ b/libbeat/reader/multiline/multiline.go @@ -18,57 +18,9 @@ package multiline import ( - "errors" "fmt" - "time" - "github.com/elastic/beats/v7/libbeat/common/match" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/reader" - "github.com/elastic/beats/v7/libbeat/reader/readfile" -) - -// MultiLine reader combining multiple line events into one multi-line event. -// -// Lines to be combined are matched by some configurable predicate using -// regular expression. -// -// The maximum number of bytes and lines to be returned is fully configurable. -// Even if limits are reached subsequent lines are matched, until event is -// fully finished. 
-// -// Errors will force the multiline reader to return the currently active -// multiline event first and finally return the actual error on next call to Next. -type Reader struct { - reader reader.Reader - pred matcher - flushMatcher *match.Matcher - maxBytes int // bytes stored in content - maxLines int - separator []byte - last []byte - numLines int - truncated int - err error // last seen error - state func(*Reader) (reader.Message, error) - message reader.Message - logger *logp.Logger -} - -const ( - // Default maximum number of lines to return in one multi-line event - defaultMaxLines = 500 - - // Default timeout to finish a multi-line event. - defaultMultilineTimeout = 5 * time.Second -) - -// Matcher represents the predicate comparing any two lines -// to find start and end of multiline events in stream of line events. -type matcher func(last, current []byte) bool - -var ( - sigMultilineTimeout = errors.New("multiline timeout") ) // New creates a new multi-line reader combining stream of @@ -78,298 +30,11 @@ func New( separator string, maxBytes int, config *Config, -) (*Reader, error) { - types := map[string]func(match.Matcher) (matcher, error){ - "before": beforeMatcher, - "after": afterMatcher, - } - - matcherType, ok := types[config.Match] - if !ok { - return nil, fmt.Errorf("unknown matcher type: %s", config.Match) - } - - matcher, err := matcherType(*config.Pattern) - if err != nil { - return nil, err - } - - flushMatcher := config.FlushPattern - - if config.Negate { - matcher = negatedMatcher(matcher) - } - - maxLines := defaultMaxLines - if config.MaxLines != nil { - maxLines = *config.MaxLines - } - - tout := defaultMultilineTimeout - if config.Timeout != nil { - tout = *config.Timeout - if tout < 0 { - return nil, fmt.Errorf("timeout %v must not be negative", config.Timeout) - } - } - - if tout > 0 { - r = readfile.NewTimeoutReader(r, sigMultilineTimeout, tout) - } - - mlr := &Reader{ - reader: r, - pred: matcher, - flushMatcher: flushMatcher, - 
state: (*Reader).readFirst, - maxBytes: maxBytes, - maxLines: maxLines, - separator: []byte(separator), - message: reader.Message{}, - logger: logp.NewLogger("reader_multiline"), - } - return mlr, nil -} - -// Next returns next multi-line event. -func (mlr *Reader) Next() (reader.Message, error) { - return mlr.state(mlr) -} - -func (mlr *Reader) readFirst() (reader.Message, error) { - for { - message, err := mlr.reader.Next() - if err != nil { - // no lines buffered -> ignore timeout - if err == sigMultilineTimeout { - continue - } - - mlr.logger.Debug("Multiline event flushed because timeout reached.") - - // pass error to caller (next layer) for handling - return message, err - } - - if message.Bytes == 0 { - continue - } - - // Start new multiline event - mlr.clear() - mlr.load(message) - mlr.setState((*Reader).readNext) - return mlr.readNext() - } -} - -func (mlr *Reader) readNext() (reader.Message, error) { - for { - message, err := mlr.reader.Next() - if err != nil { - // handle multiline timeout signal - if err == sigMultilineTimeout { - // no lines buffered -> ignore timeout - if mlr.numLines == 0 { - continue - } - - mlr.logger.Debug("Multiline event flushed because timeout reached.") - - // return collected multiline event and - // empty buffer for new multiline event - msg := mlr.finalize() - mlr.resetState() - return msg, nil - } - - // handle error without any bytes returned from reader - if message.Bytes == 0 { - // no lines buffered -> return error - if mlr.numLines == 0 { - return reader.Message{}, err - } - - // lines buffered, return multiline and error on next read - msg := mlr.finalize() - mlr.err = err - mlr.setState((*Reader).readFailed) - return msg, nil - } - - // handle error with some content being returned by reader and - // line matching multiline criteria or no multiline started yet - if mlr.message.Bytes == 0 || mlr.pred(mlr.last, message.Content) { - mlr.addLine(message) - - // return multiline and error on next read - msg := 
mlr.finalize() - mlr.err = err - mlr.setState((*Reader).readFailed) - return msg, nil - } - - // no match, return current multiline and retry with current line on next - // call to readNext awaiting the error being reproduced (or resolved) - // in next call to Next - msg := mlr.finalize() - mlr.load(message) - return msg, nil - } - - // handle case when endPattern is reached - if mlr.flushMatcher != nil { - endPatternReached := (mlr.flushMatcher.Match(message.Content)) - - if endPatternReached == true { - // return collected multiline event and - // empty buffer for new multiline event - mlr.addLine(message) - msg := mlr.finalize() - mlr.resetState() - return msg, nil - } - } - - // if predicate does not match current multiline -> return multiline event - if mlr.message.Bytes > 0 && !mlr.pred(mlr.last, message.Content) { - msg := mlr.finalize() - mlr.load(message) - return msg, nil - } - - // add line to current multiline event - mlr.addLine(message) - } -} - -// readFailed returns empty message and error and resets line reader -func (mlr *Reader) readFailed() (reader.Message, error) { - err := mlr.err - mlr.err = nil - mlr.resetState() - return reader.Message{}, err -} - -// load loads the reader with the given message. It is recommend to either -// run clear or finalize before. -func (mlr *Reader) load(m reader.Message) { - mlr.addLine(m) - // Timestamp of first message is taken as overall timestamp - mlr.message.Ts = m.Ts - mlr.message.AddFields(m.Fields) -} - -// clearBuffer resets the reader buffer variables -func (mlr *Reader) clear() { - mlr.message = reader.Message{} - mlr.last = nil - mlr.numLines = 0 - mlr.truncated = 0 - mlr.err = nil -} - -// finalize writes the existing content into the returned message and resets all reader variables. 
-func (mlr *Reader) finalize() reader.Message { - if mlr.truncated > 0 { - mlr.message.AddFlagsWithKey("log.flags", "truncated") - } - - if mlr.numLines > 1 { - mlr.message.AddFlagsWithKey("log.flags", "multiline") - } - - // Copy message from existing content - msg := mlr.message - - mlr.clear() - return msg -} - -// addLine adds the read content to the message -// The content is only added if maxBytes and maxLines is not exceed. In case one of the -// two is exceeded, addLine keeps processing but does not add it to the content. -func (mlr *Reader) addLine(m reader.Message) { - if m.Bytes <= 0 { - return - } - - sz := len(mlr.message.Content) - addSeparator := len(mlr.message.Content) > 0 && len(mlr.separator) > 0 - if addSeparator { - sz += len(mlr.separator) - } - - space := mlr.maxBytes - sz - - maxBytesReached := (mlr.maxBytes <= 0 || space > 0) - maxLinesReached := (mlr.maxLines <= 0 || mlr.numLines < mlr.maxLines) - - if maxBytesReached && maxLinesReached { - if space < 0 || space > len(m.Content) { - space = len(m.Content) - } - - tmp := mlr.message.Content - if addSeparator { - tmp = append(tmp, mlr.separator...) - } - mlr.message.Content = append(tmp, m.Content[:space]...) 
- mlr.numLines++ - - // add number of truncated bytes to fields - diff := len(m.Content) - space - if diff > 0 { - mlr.truncated += diff - } - } else { - // increase the number of skipped bytes, if cannot add - mlr.truncated += len(m.Content) - - } - - mlr.last = m.Content - mlr.message.Bytes += m.Bytes - mlr.message.AddFields(m.Fields) -} - -// resetState sets state of the reader to readFirst -func (mlr *Reader) resetState() { - mlr.setState((*Reader).readFirst) -} - -// setState sets state to the given function -func (mlr *Reader) setState(next func(mlr *Reader) (reader.Message, error)) { - mlr.state = next -} - -// matchers - -func afterMatcher(pat match.Matcher) (matcher, error) { - return genPatternMatcher(pat, func(last, current []byte) []byte { - return current - }) -} - -func beforeMatcher(pat match.Matcher) (matcher, error) { - return genPatternMatcher(pat, func(last, current []byte) []byte { - return last - }) -} - -func negatedMatcher(m matcher) matcher { - return func(last, current []byte) bool { - return !m(last, current) - } -} - -func genPatternMatcher( - pat match.Matcher, - sel func(last, current []byte) []byte, -) (matcher, error) { - matcher := func(last, current []byte) bool { - line := sel(last, current) - return pat.Match(line) +) (reader.Reader, error) { + if config.Type == patternMode { + return newMultilinePatternReader(r, separator, maxBytes, config) + } else if config.Type == countMode { + return newMultilineCountReader(r, separator, maxBytes, config) } - return matcher, nil + return nil, fmt.Errorf("unknown multiline type %d", config.Type) } diff --git a/libbeat/reader/multiline/multiline_config.go b/libbeat/reader/multiline/multiline_config.go index 93c155c3907..586816c55e3 100644 --- a/libbeat/reader/multiline/multiline_config.go +++ b/libbeat/reader/multiline/multiline_config.go @@ -24,20 +24,67 @@ import ( "github.com/elastic/beats/v7/libbeat/common/match" ) +type multilineType uint8 + +const ( + patternMode multilineType = iota + 
countMode + + patternStr = "pattern" + countStr = "count" +) + +var ( + multilineTypes = map[string]multilineType{ + patternStr: patternMode, + countStr: countMode, + } +) + // Config holds the options of multiline readers. type Config struct { + Type multilineType `config:"type"` + Negate bool `config:"negate"` - Match string `config:"match" validate:"required"` + Match string `config:"match"` MaxLines *int `config:"max_lines"` - Pattern *match.Matcher `config:"pattern" validate:"required"` + Pattern *match.Matcher `config:"pattern"` Timeout *time.Duration `config:"timeout" validate:"positive"` FlushPattern *match.Matcher `config:"flush_pattern"` + + LinesCount int `config:"count_lines" validate:"positive"` + SkipNewLine bool `config:"skip_newline"` } // Validate validates the Config option for multiline reader. func (c *Config) Validate() error { - if c.Match != "after" && c.Match != "before" { - return fmt.Errorf("unknown matcher type: %s", c.Match) + if c.Type == patternMode { + if c.Match != "after" && c.Match != "before" { + return fmt.Errorf("unknown matcher type: %s", c.Match) + } + if c.Pattern == nil { + return fmt.Errorf("multiline.pattern cannot be empty when pattern based matching is selected") + } + } else if c.Type == countMode { + if c.LinesCount == 0 { + return fmt.Errorf("multiline.count_lines cannot be zero when count based is selected") + } + } + return nil +} + +// Unpack selects the appropriate aggregation method for creating multiline events. +// If it is not configured pattern matching is chosen. 
+func (m *multilineType) Unpack(value string) error { + if value == "" { + *m = patternMode + return nil + } + + s, ok := multilineTypes[value] + if !ok { + return fmt.Errorf("unknown multiline type: %s", value) } + *m = s return nil } diff --git a/libbeat/reader/multiline/multiline_test.go b/libbeat/reader/multiline/multiline_test.go index a17c79c53d6..2297fbc98b5 100644 --- a/libbeat/reader/multiline/multiline_test.go +++ b/libbeat/reader/multiline/multiline_test.go @@ -47,6 +47,7 @@ func TestMultilineAfterOK(t *testing.T) { pattern := match.MustCompile(`^[ \t] +`) // next line is indented by spaces testMultilineOK(t, Config{ + Type: patternMode, Pattern: &pattern, Match: "after", }, @@ -61,6 +62,7 @@ func TestMultilineBeforeOK(t *testing.T) { testMultilineOK(t, Config{ + Type: patternMode, Pattern: &pattern, Match: "before", }, @@ -75,6 +77,7 @@ func TestMultilineAfterNegateOK(t *testing.T) { testMultilineOK(t, Config{ + Type: patternMode, Pattern: &pattern, Negate: true, Match: "after", @@ -90,6 +93,7 @@ func TestMultilineBeforeNegateOK(t *testing.T) { testMultilineOK(t, Config{ + Type: patternMode, Pattern: &pattern, Negate: true, Match: "before", @@ -106,6 +110,7 @@ func TestMultilineAfterNegateOKFlushPattern(t *testing.T) { testMultilineOK(t, Config{ + Type: patternMode, Pattern: &pattern, Negate: true, Match: "after", @@ -124,6 +129,7 @@ func TestMultilineAfterNegateOKFlushPatternWhereTheFirstLinesDosentMatchTheStart testMultilineOK(t, Config{ + Type: patternMode, Pattern: &pattern, Negate: true, Match: "after", @@ -140,6 +146,7 @@ func TestMultilineBeforeNegateOKWithEmptyLine(t *testing.T) { pattern := match.MustCompile(`;$`) // last line ends with ';' testMultilineOK(t, Config{ + Type: patternMode, Pattern: &pattern, Negate: true, Match: "before", @@ -155,6 +162,7 @@ func TestMultilineAfterTruncated(t *testing.T) { maxLines := 2 testMultilineTruncated(t, Config{ + Type: patternMode, Pattern: &pattern, Match: "after", MaxLines: &maxLines, @@ -170,6 +178,7 
@@ func TestMultilineAfterTruncated(t *testing.T) { ) testMultilineTruncated(t, Config{ + Type: patternMode, Pattern: &pattern, Match: "after", MaxLines: &maxLines, @@ -185,6 +194,53 @@ func TestMultilineAfterTruncated(t *testing.T) { ) } 
+// TestMultilineCount exercises the count mode of the multiline reader: first
+// with a line limit equal to the configured group size, then with a line
+// limit smaller than the group size.
+func TestMultilineCount(t *testing.T) {
+	// Cases in which MaxLines carries the same value as LinesCount.
+	sameLimit := []struct {
+		maxLines int
+		cfg      Config
+		events   int
+		expected []string
+	}{
+		{
+			maxLines: 2,
+			cfg:      Config{Type: countMode, LinesCount: 2},
+			events:   2,
+			expected: []string{
+				"line1\n line1.1\n",
+				"line2\n line2.1\n",
+			},
+		},
+		{
+			maxLines: 4,
+			cfg:      Config{Type: countMode, LinesCount: 4},
+			events:   2,
+			expected: []string{
+				"line1\n line1.1\nline2\n line2.1\n",
+				"line3\n line3.1\nline4\n line4.1\n",
+			},
+		},
+		{
+			maxLines: 1,
+			cfg:      Config{Type: countMode, LinesCount: 1},
+			events:   8,
+			expected: []string{
+				"line1\n", "line1.1\n", "line2\n", "line2.1\n", "line3\n", "line3.1\n", "line4\n", "line4.1\n",
+			},
+		},
+	}
+	for _, tc := range sameLimit {
+		limit := tc.maxLines
+		tc.cfg.MaxLines = &limit
+		testMultilineOK(t, tc.cfg, tc.events, tc.expected...)
+	}
+
+	// Line limit (2) below the group size (3): compare the full input groups
+	// against the expected truncated events.
+	limit := 2
+	input := []string{"line1\n line1.1\n line1.2\n", "line2\n line2.1\n line2.2\n", "line3\n line3.1\n line3.2\n", "line4\n line4.1\n line4.3\n"}
+	truncated := []string{"line1\n", "line2\n", "line3\n", "line4\n"}
+	testMultilineTruncated(t,
+		Config{
+			Type:       countMode,
+			MaxLines:   &limit,
+			LinesCount: 3,
+		},
+		4,
+		true,
+		input,
+		truncated,
+	)
+}
+
 func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) { _, buf := createLineBuffer(expected...) r := createMultilineTestReader(t, buf, cfg) diff --git a/libbeat/reader/multiline/pattern.go b/libbeat/reader/multiline/pattern.go new file mode 100644 index 00000000000..116af80135a --- /dev/null +++ b/libbeat/reader/multiline/pattern.go @@ -0,0 +1,285 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package multiline
+
+import (
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/common/match"
+	"github.com/elastic/beats/v7/libbeat/logp"
+	"github.com/elastic/beats/v7/libbeat/reader"
+	"github.com/elastic/beats/v7/libbeat/reader/readfile"
+)
+
+// patternReader is a multiline reader combining multiple line events into
+// one multi-line event.
+//
+// Lines to be combined are matched by some configurable predicate using
+// regular expression.
+//
+// The maximum number of bytes and lines to be returned is fully configurable.
+// Even if limits are reached subsequent lines are matched, until event is
+// fully finished.
+//
+// Errors will force the multiline reader to return the currently active
+// multiline event first and finally return the actual error on next call to Next.
+type patternReader struct {
+	// reader is the underlying reader the individual lines are pulled from.
+	reader reader.Reader
+	// pred decides, from the last buffered line and the current line,
+	// whether the current line belongs to the event being collected.
+	pred matcher
+	// flushMatcher is optional (may be nil); a line matching it is appended
+	// to the buffered event, which is then returned right away.
+	flushMatcher *match.Matcher
+	// state is the state function that Next dispatches to.
+	state func(*patternReader) (reader.Message, error)
+	// logger receives the debug messages emitted by the reader.
+	logger *logp.Logger
+	// msgBuffer collects the lines of the event currently being assembled.
+	msgBuffer *messageBuffer
+}
+
+const (
+	// Default maximum number of lines to return in one multi-line event
+	defaultMaxLines = 500
+
+	// Default timeout to finish a multi-line event.
+	defaultMultilineTimeout = 5 * time.Second
+)
+
+// matcher represents the predicate comparing any two lines
+// to find start and end of multiline events in stream of line events.
+type matcher func(last, current []byte) bool
+
+var (
+	// sigMultilineTimeout is the sentinel error handed to the timeout reader;
+	// the state functions compare reader errors against it to tell a
+	// timeout signal apart from a real read error.
+	sigMultilineTimeout = errors.New("multiline timeout")
+)
+
+// newMultilinePatternReader creates a pattern based multiline reader on top
+// of r.
+//
+// The line predicate is built from config (see setupPatternMatcher); an
+// error from that step is returned as is. config.MaxLines and config.Timeout
+// override defaultMaxLines and defaultMultilineTimeout when they are set.
+// With a timeout greater than zero r is wrapped in a timeout reader that
+// reports sigMultilineTimeout. separator, maxBytes, the line limit and
+// config.SkipNewLine are passed on to the message buffer.
+func newMultilinePatternReader(
+	r reader.Reader,
+	separator string,
+	maxBytes int,
+	config *Config,
+) (reader.Reader, error) {
+
+	pred, err := setupPatternMatcher(config)
+	if err != nil {
+		return nil, err
+	}
+
+	maxLines := defaultMaxLines
+	if config.MaxLines != nil {
+		maxLines = *config.MaxLines
+	}
+
+	tout := defaultMultilineTimeout
+	if config.Timeout != nil {
+		tout = *config.Timeout
+	}
+
+	// A timeout of zero (or less) disables the timeout reader altogether.
+	if tout > 0 {
+		r = readfile.NewTimeoutReader(r, sigMultilineTimeout, tout)
+	}
+
+	pr := &patternReader{
+		reader:       r,
+		pred:         pred,
+		flushMatcher: config.FlushPattern,
+		state:        (*patternReader).readFirst,
+		msgBuffer:    newMessageBuffer(maxBytes, maxLines, []byte(separator), config.SkipNewLine),
+		logger:       logp.NewLogger("reader_multiline"),
+	}
+	return pr, nil
+}
+
+// setupPatternMatcher builds the line predicate described by config.
+//
+// config.Match selects whether the pattern is applied to the current line
+// ("after") or to the previous line ("before"); any other value yields an
+// error. A missing pattern yields an error instead of a nil pointer
+// dereference. With config.Negate set the resulting predicate is inverted.
+func setupPatternMatcher(config *Config) (matcher, error) {
+	var build func(match.Matcher) (matcher, error)
+	switch config.Match {
+	case "before":
+		build = beforeMatcher
+	case "after":
+		build = afterMatcher
+	default:
+		return nil, fmt.Errorf("unknown matcher type: %s", config.Match)
+	}
+
+	// The pattern is dereferenced below; reject a nil pointer explicitly so
+	// that a Config built without a pattern fails with an error, not a panic.
+	if config.Pattern == nil {
+		return nil, errors.New("multiline pattern is not configured")
+	}
+
+	m, err := build(*config.Pattern)
+	if err != nil {
+		return nil, err
+	}
+
+	if config.Negate {
+		m = negatedMatcher(m)
+	}
+
+	return m, nil
+}
+
+// Next returns next multi-line event.
+func (pr *patternReader) Next() (reader.Message, error) {
+	// Dispatch to whichever state function is currently active.
+	return pr.state(pr)
+}
+
+// readFirst waits for the first line of a new multiline event.
+//
+// Timeout signals are ignored because nothing is buffered yet, and messages
+// reporting zero bytes are skipped. Any other error is handed to the caller
+// together with the message returned by the underlying reader. The first
+// non-empty message starts a new event; the reader then switches to readNext
+// and continues there immediately.
+func (pr *patternReader) readFirst() (reader.Message, error) {
+	for {
+		message, err := pr.reader.Next()
+		if err != nil {
+			// no lines buffered -> ignore timeout
+			if err == sigMultilineTimeout {
+				continue
+			}
+
+			// This is not a timeout and nothing is flushed here, so the log
+			// line must not claim either.
+			pr.logger.Debug("Multiline reader failed before any line was buffered.")
+
+			// pass error to caller (next layer) for handling
+			return message, err
+		}
+
+		if message.Bytes == 0 {
+			continue
+		}
+
+		// Start new multiline event
+		pr.msgBuffer.startNewMessage(message)
+		pr.setState((*patternReader).readNext)
+		return pr.readNext()
+	}
+}
+
+// readNext keeps adding lines to the event started by readFirst until the
+// event is complete, then returns it.
+//
+// An event is returned when the timeout signal arrives while lines are
+// buffered, when a line matches the flush pattern, when the predicate
+// rejects the current line (that line is loaded as the start of the next
+// event), or when the underlying reader fails. In the failure case the
+// buffered event is returned first and the error is reported by the
+// following call to Next; with nothing buffered the error is returned
+// directly.
+func (pr *patternReader) readNext() (reader.Message, error) {
+	for {
+		message, err := pr.reader.Next()
+		if err != nil {
+			// handle multiline timeout signal
+			if err == sigMultilineTimeout {
+				// no lines buffered -> ignore timeout
+				if pr.msgBuffer.isEmpty() {
+					continue
+				}
+
+				pr.logger.Debug("Multiline event flushed because timeout reached.")
+
+				// return collected multiline event and
+				// empty buffer for new multiline event
+				msg := pr.msgBuffer.finalize()
+				pr.resetState()
+				return msg, nil
+			}
+
+			// handle error without any bytes returned from reader
+			if message.Bytes == 0 {
+				// no lines buffered -> return error
+				if pr.msgBuffer.isEmpty() {
+					return reader.Message{}, err
+				}
+
+				// lines buffered, return multiline and error on next read
+				return pr.collectMessageAfterError(err)
+			}
+
+			// handle error with some content being returned by reader and
+			// line matching multiline criteria or no multiline started yet
+			if pr.msgBuffer.isEmptyMessage() || pr.pred(pr.msgBuffer.last, message.Content) {
+				pr.msgBuffer.addLine(message)
+
+				// return multiline and error on next read
+				return pr.collectMessageAfterError(err)
+			}
+
+			// no match, return current multiline and retry with current line on next
+			// call to readNext awaiting the error being reproduced (or resolved)
+			// in next call to Next
+			msg := pr.msgBuffer.finalize()
+			pr.msgBuffer.load(message)
+			return msg, nil
+		}
+
+		// handle case when endPattern is reached
+		if pr.flushMatcher != nil && pr.flushMatcher.Match(message.Content) {
+			// return collected multiline event and
+			// empty buffer for new multiline event
+			pr.msgBuffer.addLine(message)
+			msg := pr.msgBuffer.finalize()
+			pr.resetState()
+			return msg, nil
+		}
+
+		// if predicate does not match current multiline -> return multiline event
+		if !pr.msgBuffer.isEmptyMessage() && !pr.pred(pr.msgBuffer.last, message.Content) {
+			msg := pr.msgBuffer.finalize()
+			pr.msgBuffer.load(message)
+			return msg, nil
+		}
+
+		// add line to current multiline event
+		pr.msgBuffer.addLine(message)
+	}
+}
+
+// collectMessageAfterError returns the buffered event without an error,
+// stores err in the message buffer and switches to readFailed so that the
+// error is reported by the next call to Next.
+func (pr *patternReader) collectMessageAfterError(err error) (reader.Message, error) {
+	msg := pr.msgBuffer.finalize()
+	pr.msgBuffer.setErr(err)
+	pr.setState((*patternReader).readFailed)
+	return msg, nil
+}
+
+// readFailed returns empty message and error and resets line reader
+func (pr *patternReader) readFailed() (reader.Message, error) {
+	err := pr.msgBuffer.err
+	pr.msgBuffer.setErr(nil)
+	pr.resetState()
+	return reader.Message{}, err
+}
+
+// resetState sets state of the reader to readFirst
+func (pr *patternReader) resetState() {
+	pr.setState((*patternReader).readFirst)
+}
+
+// setState sets state to the given function
+func (pr *patternReader) setState(next func(pr *patternReader) (reader.Message, error)) {
+	pr.state = next
+}
+
+// matchers
+
+// afterMatcher returns a predicate applying pat to the current line.
+func afterMatcher(pat match.Matcher) (matcher, error) {
+	return genPatternMatcher(pat, func(last, current []byte) []byte {
+		return current
+	})
+}
+
+// beforeMatcher returns a predicate applying pat to the previous line.
+func beforeMatcher(pat match.Matcher) (matcher, error) {
+	return genPatternMatcher(pat, func(last, current []byte) []byte {
+		return last
+	})
+}
+
+// negatedMatcher returns a predicate reporting the opposite of m.
+func negatedMatcher(m matcher) matcher {
+	return func(last, current []byte) bool {
+		return !m(last, current)
+	}
+}
+
+// genPatternMatcher returns a predicate that matches pat against the line
+// picked by sel from the (last, current) pair. The returned error is always
+// nil.
+func genPatternMatcher(
+	pat match.Matcher,
+	sel func(last, current []byte) []byte,
+) (matcher, error) {
+	pred := func(last, current []byte) bool {
+		line := sel(last, current)
+		return pat.Match(line)
+	}
+	return pred, nil
+}
diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 02722e6ae70..261a10b2f69 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -1196,6 +1196,15 @@ filebeat.inputs: # Default is 5s. #multiline.timeout: 5s + # To aggregate constant number of lines into a single event use the count mode of multiline. + #multiline.type: count + + # The number of lines to aggregate into a single event. + #multiline.count_lines: 3 + + # Do not add new line character when concatenating lines. + #multiline.skip_newline: false + # Setting tail_files to true means filebeat starts reading new files at the end # instead of the beginning. If this is used in combination with log rotation # this can mean that the first entries of a new file are skipped.