diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5a8b37bf45c..00dab286e98 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -462,6 +462,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits] - Add support human friendly size for the UDP input. {pull}6886[6886] - Add Syslog input to ingest RFC3164 Events via TCP and UDP {pull}6842[6842] - Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662] +- Add log.original to each log event. {pull}[] *Heartbeat* diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 8788afad00e..b10e81a71a9 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -80,6 +80,11 @@ filebeat.inputs: # This is especially useful for multiline log messages which can get large. #max_bytes: 10485760 + + # By default Filebeat adds log.original to each event as this contains the original log. + # This can be useful to reprocess the log files. + #original_message: true + ### Recursive glob configuration # Expand "**" patterns into regular glob patterns. diff --git a/filebeat/_meta/fields.common.yml b/filebeat/_meta/fields.common.yml index 5781e497879..bed2fb77eb9 100644 --- a/filebeat/_meta/fields.common.yml +++ b/filebeat/_meta/fields.common.yml @@ -108,6 +108,12 @@ description: > Logging level. + - name: log.original + type: keyword + description: > + The unprocessed original log message. This can be used for reprocessing logs. + index: false + - name: event.created type: date description: > diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index 4f5ec70dcba..863e870ee17 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -2682,6 +2682,18 @@ type: keyword Logging level. +-- + +*`log.original`*:: ++ +-- +type: keyword + +The unprocessed original log message. This can be used for reprocessing logs. 
+ + +Field is not indexed. + -- *`event.created`*:: diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index f798e4ed301..6bbb698080f 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -435,6 +435,11 @@ filebeat.inputs: # This is especially useful for multiline log messages which can get large. #max_bytes: 10485760 + + # By default Filebeat adds log.original to each event as this contains the original log. + # This can be useful to reprocess the log files. + #original_message: true + ### Recursive glob configuration # Expand "**" patterns into regular glob patterns. diff --git a/filebeat/include/fields.go b/filebeat/include/fields.go index dabf266ec49..83623cea844 100644 --- a/filebeat/include/fields.go +++ b/filebeat/include/fields.go @@ -31,5 +31,5 @@ func init() { // Asset returns asset data func Asset() string { - return "" + return "" } diff --git a/filebeat/input/log/config.go b/filebeat/input/log/config.go index b88c11d932b..43ec9080bcc 100644 --- a/filebeat/input/log/config.go +++ b/filebeat/input/log/config.go @@ -43,16 +43,17 @@ var ( CleanInactive: 0, // Input - Enabled: true, - IgnoreOlder: 0, - ScanFrequency: 10 * time.Second, - CleanRemoved: true, - HarvesterLimit: 0, - Symlinks: false, - TailFiles: false, - ScanSort: "", - ScanOrder: "asc", - RecursiveGlob: true, + Enabled: true, + IgnoreOlder: 0, + ScanFrequency: 10 * time.Second, + CleanRemoved: true, + HarvesterLimit: 0, + Symlinks: false, + TailFiles: false, + ScanSort: "", + ScanOrder: "asc", + RecursiveGlob: true, + OriginalMessage: true, // Harvester BufferSize: 16 * humanize.KiByte, @@ -96,11 +97,12 @@ type config struct { ScanOrder string `config:"scan.order"` ScanSort string `config:"scan.sort"` - ExcludeLines []match.Matcher `config:"exclude_lines"` - IncludeLines []match.Matcher `config:"include_lines"` - MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` - Multiline *multiline.Config `config:"multiline"` - JSON 
*readjson.Config `config:"json"` + ExcludeLines []match.Matcher `config:"exclude_lines"` + IncludeLines []match.Matcher `config:"include_lines"` + MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` + Multiline *multiline.Config `config:"multiline"` + JSON *readjson.Config `config:"json"` + OriginalMessage bool `config:"original_message"` // Hidden on purpose, used by the docker input: DockerJSON *struct { diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index d5ffb3a3828..38e070808b3 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -306,6 +306,10 @@ func (h *Harvester) Run() error { } fields.DeepUpdate(message.Fields) + if h.config.OriginalMessage { + fields.Put("log.original", string(message.Original)) + } + // Check if json fields exist var jsonFields common.MapStr if f, ok := fields["json"]; ok { diff --git a/filebeat/reader/message.go b/filebeat/reader/message.go index c5e965b10d0..6ac2e1d1169 100644 --- a/filebeat/reader/message.go +++ b/filebeat/reader/message.go @@ -26,10 +26,11 @@ import ( // Message represents a reader event with timestamp, content and actual number // of bytes read from input before decoding. 
type Message struct { - Ts time.Time // timestamp the content was read - Content []byte // actual content read - Bytes int // total number of bytes read to generate the message - Fields common.MapStr // optional fields that can be added by reader + Ts time.Time // timestamp the content was read + Content []byte // actual content read + Original []byte // original content read + Bytes int // total number of bytes read to generate the message + Fields common.MapStr // optional fields that can be added by reader } // IsEmpty returns true in case the message is empty diff --git a/filebeat/reader/multiline/multiline.go b/filebeat/reader/multiline/multiline.go index 57209be94cd..d1223034c78 100644 --- a/filebeat/reader/multiline/multiline.go +++ b/filebeat/reader/multiline/multiline.go @@ -269,6 +269,7 @@ func (mlr *Reader) clear() { func (mlr *Reader) finalize() reader.Message { // Copy message from existing content msg := mlr.message + msg.Original = mlr.message.Content mlr.clear() return msg } diff --git a/filebeat/reader/readfile/strip_newline.go b/filebeat/reader/readfile/strip_newline.go index 3394cb9289e..cf5b506732e 100644 --- a/filebeat/reader/readfile/strip_newline.go +++ b/filebeat/reader/readfile/strip_newline.go @@ -42,6 +42,10 @@ func (p *StripNewline) Next() (reader.Message, error) { L := message.Content message.Content = L[:len(L)-lineEndingChars(L)] + // Also strip new lines from raw message + M := message.Original + message.Original = M[:len(M)-lineEndingChars(M)] + return message, err } diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 52e099d8511..67197132374 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -31,6 +31,7 @@ filebeat.{{input_config | default("inputs")}}: harvester_limit: {{harvester_limit | default(0) }} symlinks: {{symlinks}} pipeline: {{pipeline}} + original_message: {{original_message| default("true")}} {%- if 
input_processors %} processors: {%- for processor in input_processors %} diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py index 0f8ed244c59..68638748c3e 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -818,3 +818,58 @@ def test_decode_error(self): output = self.read_output_json() assert output[2]["message"] == "hello world2" + + def test_original_message_enabled(self): + """ + Test that log.original holds the raw line when original_message is enabled (the default), json use case + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + json=dict( + keys_under_root=True, + ), + ) + os.mkdir(self.working_dir + "/log/") + logfile = self.working_dir + "/log/test.log" + message = '{"hello":"world"}' + with open(logfile, 'a') as file: + file.write(message + "\n") + + proc = self.start_beat() + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + proc.check_kill_and_wait() + + output = self.read_output() + assert len(output) == 1 + + assert output[0]["log.original"] == message + + def test_original_message_disabled(self): + """ + Test that log.original is not added when original_message is disabled, json use case + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + json=dict( + keys_under_root=True, + ), + original_message=False, + ) + os.mkdir(self.working_dir + "/log/") + logfile = self.working_dir + "/log/test.log" + message = '{"hello":"world"}' + with open(logfile, 'a') as file: + file.write(message + "\n") + + proc = self.start_beat() + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + proc.check_kill_and_wait() + + output = self.read_output() + assert len(output) == 1 + + assert "log.original" not in output[0] diff --git a/filebeat/tests/system/test_json.py b/filebeat/tests/system/test_json.py index d98b6025aa6..c89246d239e 100644 --- a/filebeat/tests/system/test_json.py +++ b/filebeat/tests/system/test_json.py @@ -42,7 +42,8 @@ def 
test_docker_logs_filtering(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", json=dict(message_key="log", keys_under_root=True), - exclude_lines=["windows"] + exclude_lines=["windows"], + original_message=False, ) os.mkdir(self.working_dir + "/log/") @@ -74,7 +75,8 @@ def test_docker_logs_multiline(self): multiline=True, pattern="^\[log\]", match="after", - negate="true" + negate="true", + original_message=False, ) os.mkdir(self.working_dir + "/log/")