Cherry-pick #18352 to 7.x: Add new mode to multiline reader to aggregate constant number of lines (#19243)

* Add new mode to multiline reader to aggregate constant number of lines (#18352)

## What does this PR do?

This PR adds a new mode for the multiline reader of Libbeat (exposed in Filebeat). The new mode lets users aggregate a configured number of lines into a single event.

Example configuration to aggregate 5 lines:
```yaml
multiline.type: count
multiline.count_lines: 5
```

This PR also adds a new configuration option `skip_newline`. If set, Filebeat does not add a newline when two events are concatenated.

Closes #18038
(cherry picked from commit e3f51ab)
kvch authored Jun 17, 2020
1 parent 999fb5c commit 77bc7a5
Showing 14 changed files with 876 additions and 346 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -472,6 +472,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Add geoip AS lookup & improve ECS categorization in aws cloudtrail fileset. {issue}18644[18644] {pull}18958[18958]
- Improved performance of PANW sample dashboards. {issue}19031[19031] {pull}19032[19032]
- Add support for v1 consumer API in Cloud Foundry input, use it by default. {pull}19125[19125]
- Add new mode to multiline reader to aggregate constant number of lines {pull}18352[18352]

*Heartbeat*

9 changes: 9 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
@@ -144,6 +144,15 @@ filebeat.inputs:
# Default is 5s.
#multiline.timeout: 5s

# To aggregate constant number of lines into a single event use the count mode of multiline.
#multiline.type: count

# The number of lines to aggregate into a single event.
#multiline.count_lines: 3

# Do not add new line character when concatenating lines.
#multiline.skip_newline: false

# Setting tail_files to true means filebeat starts reading new files at the end
# instead of the beginning. If this is used in combination with log rotation
# this can mean that the first entries of a new file are skipped.
12 changes: 12 additions & 0 deletions filebeat/docs/multiline.asciidoc
@@ -29,6 +29,7 @@ The following example shows how to configure {beatname_uc} to handle a multiline

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: '^\['
multiline.negate: true
multiline.match: after
@@ -46,6 +47,8 @@ multiline.match: after
at org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction.checkBlock(TransportDeleteIndexAction.java:75)
-------------------------------------------------------------------------------------

*`multiline.type`*:: Defines which aggregation method to use. The default is `pattern`. The other option
is `count` which lets you aggregate constant number of lines.

*`multiline.pattern`*:: Specifies the regular expression pattern to match. Note that the regexp patterns supported by {beatname_uc}
differ somewhat from the patterns supported by Logstash. See <<regexp-support>> for a list of supported regexp patterns.
@@ -76,6 +79,10 @@ lines are discarded. The default is 500.

*`multiline.timeout`*:: After the specified timeout, {beatname_uc} sends the multiline event even if no new pattern is found to start a new event. The default is 5s.

*`multiline.count_lines`*:: The number of lines to aggregate into a single event.

*`multiline.skip_newline`*:: When set, multiline events are concatenated without a line separator.


==== Examples of multiline configuration

@@ -103,6 +110,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: '^[[:space:]]'
multiline.negate: false
multiline.match: after
@@ -127,6 +135,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
multiline.negate: false
multiline.match: after
@@ -153,6 +162,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: '\\$'
multiline.negate: false
multiline.match: before
@@ -176,6 +186,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
@@ -200,6 +211,7 @@ To consolidate this as a single event in {beatname_uc}, use the following multil

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: 'Start new event'
multiline.negate: true
multiline.match: after
9 changes: 9 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -531,6 +531,15 @@ filebeat.inputs:
# Default is 5s.
#multiline.timeout: 5s

# To aggregate constant number of lines into a single event use the count mode of multiline.
#multiline.type: count

# The number of lines to aggregate into a single event.
#multiline.count_lines: 3

# Do not add new line character when concatenating lines.
#multiline.skip_newline: false

# Setting tail_files to true means filebeat starts reading new files at the end
# instead of the beginning. If this is used in combination with log rotation
# this can mean that the first entries of a new file are skipped.
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
@@ -86,6 +86,7 @@ filebeat.{{input_config | default("inputs")}}:

{% if multiline %}
multiline:
type: {{multiline_type}}
pattern: {{pattern}}
negate: {{negate}}
match: {{match}}
11 changes: 10 additions & 1 deletion filebeat/tests/system/test_multiline.py
@@ -17,6 +17,7 @@ def test_java_elasticsearch_log(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after"
@@ -48,6 +49,7 @@ def test_c_style_log(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="\\\\$",
match="before"
)
@@ -78,6 +80,7 @@ def test_rabbitmq_multiline_log(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^=[A-Z]+",
match="after",
negate="true",
@@ -122,6 +125,7 @@ def test_max_lines(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after",
@@ -160,6 +164,7 @@ def test_timeout(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after",
@@ -204,6 +209,7 @@ def test_max_bytes(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after",
@@ -240,6 +246,7 @@ def test_close_timeout_with_multiline(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after",
@@ -295,6 +302,7 @@ def test_consecutive_newline(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after",
@@ -340,11 +348,12 @@ def test_invalid_config(self):
self.render_config_template(
path=os.path.abspath(self.working_dir + "/log/") + "*",
multiline=True,
multiline_type="pattern",
match="after",
)

proc = self.start_beat()

self.wait_until(lambda: self.log_contains("missing required field accessing") == 1)
self.wait_until(lambda: self.log_contains("multiline.pattern cannot be empty") == 1)

proc.check_kill_and_wait(exit_code=1)
133 changes: 133 additions & 0 deletions libbeat/reader/multiline/counter.go
@@ -0,0 +1,133 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package multiline

import (
	"github.com/elastic/beats/v7/libbeat/reader"
)

type counterReader struct {
	reader     reader.Reader
	state      func(*counterReader) (reader.Message, error)
	linesCount int // number of lines to collect
	msgBuffer  *messageBuffer
}

func newMultilineCountReader(
	r reader.Reader,
	separator string,
	maxBytes int,
	config *Config,
) (reader.Reader, error) {
	maxLines := config.LinesCount
	if l := config.MaxLines; l != nil && 0 < *l {
		maxLines = *l
	}

	return &counterReader{
		reader:     r,
		state:      (*counterReader).readFirst,
		linesCount: config.LinesCount,
		msgBuffer:  newMessageBuffer(maxBytes, maxLines, []byte(separator), config.SkipNewLine),
	}, nil
}

// Next returns next multi-line event.
func (cr *counterReader) Next() (reader.Message, error) {
	return cr.state(cr)
}

func (cr *counterReader) readFirst() (reader.Message, error) {
	for {
		message, err := cr.reader.Next()
		if err != nil {
			return message, err
		}

		if message.Bytes == 0 {
			continue
		}

		cr.msgBuffer.startNewMessage(message)
		if cr.msgBuffer.processedLines == cr.linesCount {
			msg := cr.msgBuffer.finalize()
			return msg, nil
		}

		cr.setState((*counterReader).readNext)
		return cr.readNext()
	}
}

func (cr *counterReader) readNext() (reader.Message, error) {
	for {
		message, err := cr.reader.Next()
		if err != nil {
			// handle error without any bytes returned from reader
			if message.Bytes == 0 {
				// no lines buffered -> return error
				if cr.msgBuffer.isEmpty() {
					return reader.Message{}, err
				}

				// lines buffered, return multiline and error on next read
				msg := cr.msgBuffer.finalize()
				cr.msgBuffer.setErr(err)
				cr.setState((*counterReader).readFailed)
				return msg, nil
			}

			// handle error with some content being returned by reader and
			// line matching multiline criteria or no multiline started yet
			if cr.msgBuffer.isEmptyMessage() {
				cr.msgBuffer.addLine(message)

				// return multiline and error on next read
				msg := cr.msgBuffer.finalize()
				cr.msgBuffer.setErr(err)
				cr.setState((*counterReader).readFailed)
				return msg, nil
			}
		}

		// add line to current multiline event
		cr.msgBuffer.addLine(message)
		if cr.msgBuffer.processedLines == cr.linesCount {
			msg := cr.msgBuffer.finalize()
			cr.setState((*counterReader).readFirst)
			return msg, nil
		}
	}
}

func (cr *counterReader) readFailed() (reader.Message, error) {
	err := cr.msgBuffer.err
	cr.msgBuffer.setErr(nil)
	cr.resetState()
	return reader.Message{}, err
}

// resetState sets state of the reader to readFirst
func (cr *counterReader) resetState() {
	cr.setState((*counterReader).readFirst)
}

// setState sets state to the given function
func (cr *counterReader) setState(next func(cr *counterReader) (reader.Message, error)) {
	cr.state = next
}