diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index e460e2627e0..cf41c97d080 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -70,7 +70,7 @@ type readerConfig struct { MaxBytes int `config:"message_max_bytes" validate:"min=0,nonzero"` Tail bool `config:"seek_to_tail"` - Parsers []*common.ConfigNamespace `config:"parsers"` // TODO multiline, json, syslog? + Parsers []common.ConfigNamespace `config:"parsers"` } type backoffConfig struct { @@ -115,7 +115,7 @@ func defaultReaderConfig() readerConfig { LineTerminator: readfile.AutoLineTerminator, MaxBytes: 10 * humanize.MiByte, Tail: false, - Parsers: nil, + Parsers: make([]common.ConfigNamespace, 0), } } @@ -123,26 +123,10 @@ func (c *config) Validate() error { if len(c.Paths) == 0 { return fmt.Errorf("no path is configured") } - // TODO - //if c.CleanInactive != 0 && c.IgnoreOlder == 0 { - // return fmt.Errorf("ignore_older must be enabled when clean_inactive is used") - //} - - // TODO - //if c.CleanInactive != 0 && c.CleanInactive <= c.IgnoreOlder+c.ScanFrequency { - // return fmt.Errorf("clean_inactive must be > ignore_older + scan_frequency to make sure only files which are not monitored anymore are removed") - //} - - // TODO - //if c.JSON != nil && len(c.JSON.MessageKey) == 0 && - // c.Multiline != nil { - // return fmt.Errorf("When using the JSON decoder and multiline together, you need to specify a message_key value") - //} - - //if c.JSON != nil && len(c.JSON.MessageKey) == 0 && - // (len(c.IncludeLines) > 0 || len(c.ExcludeLines) > 0) { - // return fmt.Errorf("When using the JSON decoder and line filtering together, you need to specify a message_key value") - //} + + if err := validateParserConfig(parserConfig{maxBytes: c.Reader.MaxBytes, lineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil { + return fmt.Errorf("cannot parse parser configuration: %+v", err) + } return nil } diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index f337bc06411..0811fa93405 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -345,6 +345,39 @@ func (e *inputTestingEnvironment) getOutputMessages() []string { return messages } +func (e *inputTestingEnvironment) requireEventContents(nr int, key, value string) { + events := make([]beat.Event, 0) + for _, c := range e.pipeline.clients { + for _, evt := range c.GetEvents() { + events = append(events, evt) + } + } + + selectedEvent := events[nr] + v, err := selectedEvent.Fields.GetValue(key) + if err != nil { + e.t.Fatalf("cannot find key %s in event %+v", key, selectedEvent) + } + + val, ok := v.(string) + if !ok { + e.t.Fatalf("value is not string %+v", v) + } + require.Equal(e.t, value, val) +} + +func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) { + events := make([]beat.Event, 0) + for _, c := range e.pipeline.clients { + for _, evt := range c.GetEvents() { + events = append(events, evt) + } + } + + selectedEvent := events[nr] + require.Equal(e.t, ts, selectedEvent.Timestamp.String()) +} + type testInputStore struct { registry *statestore.Registry } diff --git a/filebeat/input/filestream/example_inputs_integration_test.go b/filebeat/input/filestream/example_inputs_integration_test.go new file mode 100644 index 00000000000..1973ef9a6d2 --- /dev/null +++ b/filebeat/input/filestream/example_inputs_integration_test.go @@ -0,0 +1,89 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package filestream + +const ( + elasticsearchMultilineLogs = `[2015-12-06 01:44:16,735][INFO ][node ] [Zach] version[2.0.0], pid[48553], build[de54438/2015-10-22T08:09:48Z] +[2015-12-06 01:44:16,736][INFO ][node ] [Zach] initializing ... +[2015-12-06 01:44:16,804][INFO ][plugins ] [Zach] loaded [], sites [] +[2015-12-06 01:44:16,941][INFO ][env ] [Zach] using [1] data paths, mounts [[/ (/dev/disk1)]], net usable_space [66.3gb], net total_space [232.6gb], spins? [unknown], types [hfs] +[2015-12-06 01:44:19,177][INFO ][node ] [Zach] initialized +[2015-12-06 01:44:19,177][INFO ][node ] [Zach] starting ... +[2015-12-06 01:44:19,356][INFO ][transport ] [Zach] publish_address {127.0.0.1:9300}, bound_addresses {127.0.0.1:9300}, {[fe80::1]:9300}, {[::1]:9300} +[2015-12-06 01:44:19,367][INFO ][discovery ] [Zach] elasticsearch/qfPw9z0HQe6grbJQruTCJQ +[2015-12-06 01:44:22,405][INFO ][cluster.service ] [Zach] new_master {Zach}{qfPw9z0HQe6grbJQruTCJQ}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-join(elected_as_master, [0] joins received) +[2015-12-06 01:44:22,432][INFO ][http ] [Zach] publish_address {127.0.0.1:9200}, bound_addresses {127.0.0.1:9200}, {[fe80::1]:9200}, {[::1]:9200} +[2015-12-06 01:44:22,432][INFO ][node ] [Zach] started +[2015-12-06 01:44:22,446][INFO ][gateway ] [Zach] recovered [0] indices into cluster_state +[2015-12-06 01:44:52,882][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] creating index, cause [auto(bulk api)], templates [], shards [5]/[1], mappings [process, system] +[2015-12-06 01:44:53,256][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] update_mapping [process] +[2015-12-06 01:44:53,269][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process] +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,274][DEBUG][action.bulk ] [Zach] [filebeat-2015.12.06][0] failed to execute bulk item (index) index {[filebeat-2015.12.06][process][AVF0v5vcVA0hoJdODMTz], source[{"@timestamp":"2015-12-06T00:44:52.448Z","beat":{"hostname":"ruflin","name":"ruflin"},"count":1,"proc":{"cpu":{"user":1902,"user_p":0,"system":941,"total":2843,"start_time":"Dec03"},"mem":{"size":3616309248,"rss":156405760,"rss_p":0.01,"share":0},"name":"Google Chrome H","pid":40572,"ppid":392,"state":"running"},"type":"process"}]} +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,279][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process] +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double], mapper [proc.cpu.user_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,280][DEBUG][action.bulk ] [Zach] [filebeat-2015.12.06][1] failed to execute bulk item (index) index {[filebeat-2015.12.06][process][AVF0v5vbVA0hoJdODMTj], source[{"@timestamp":"2015-12-06T00:44:52.416Z","beat":{"hostname":"ruflin","name":"ruflin"},"count":1,"proc":{"cpu":{"user":6643,"user_p":0.01,"system":693,"total":7336,"start_time":"01:44"},"mem":{"size":5182656512,"rss":248872960,"rss_p":0.01,"share":0},"name":"java","pid":48553,"ppid":48547,"state":"running"},"type":"process"}]} +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double], mapper [proc.cpu.user_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,334][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] update_mapping [system] +[2015-12-06 01:44:53,646][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] create_mapping [filesystem] +` + + elasticsearchMultilineLongLogs = `[2015-12-06 01:44:16,735][INFO ][node ] [Zach] version[2.0.0], pid[48553], build[de54438/2015-10-22T08:09:48Z] +[2015-12-06 01:44:53,269][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process] +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,646][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] create_mapping [filesystem] +` +) diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 1a093602fb8..1e1298ee1a6 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -57,6 +57,8 @@ type filestream struct { encodingFactory encoding.EncodingFactory encoding encoding.Encoding closerConfig closerConfig + parserConfig []common.ConfigNamespace + msgPostProc []postProcesser } // Plugin creates a new filestream input plugin for creating a stateful input. @@ -216,8 +218,16 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri } r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator) + + r, err = newParsers(r, parserConfig{maxBytes: inp.readerConfig.MaxBytes, lineTerminator: inp.readerConfig.LineTerminator}, inp.readerConfig.Parsers) + if err != nil { + return nil, err + } + r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes) + inp.msgPostProc = newPostProcessors(inp.readerConfig.Parsers) + return r, nil } @@ -369,16 +379,24 @@ func (inp *filestream) eventFromMessage(m reader.Message, path string) beat.Even }, } fields.DeepUpdate(m.Fields) + m.Fields = fields + + for _, proc := range inp.msgPostProc { + proc.PostProcess(&m) + } if len(m.Content) > 0 { - if fields == nil { - fields = common.MapStr{} + if m.Fields == nil { + m.Fields = common.MapStr{} + } + if _, ok := m.Fields["message"]; !ok { + m.Fields["message"] = string(m.Content) } - fields["message"] = string(m.Content) } return beat.Event{ Timestamp: m.Ts, - Fields: fields, + Meta: m.Meta, + Fields: m.Fields, } } diff --git a/filebeat/input/filestream/parser.go b/filebeat/input/filestream/parser.go new file mode 100644 index 00000000000..70180b4a452 --- /dev/null +++ b/filebeat/input/filestream/parser.go @@ -0,0 +1,140 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "errors" + "fmt" + "io" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/reader" + "github.com/elastic/beats/v7/libbeat/reader/multiline" + "github.com/elastic/beats/v7/libbeat/reader/readfile" + "github.com/elastic/beats/v7/libbeat/reader/readjson" +) + +var ( + ErrNoSuchParser = errors.New("no such parser") +) + +// parser transforms or translates the Content attribute of a Message. +// They are able to aggregate two or more Messages into a single one. +type parser interface { + io.Closer + Next() (reader.Message, error) +} + +type parserConfig struct { + maxBytes int + lineTerminator readfile.LineTerminator +} + +type postProcesser interface { + PostProcess(*reader.Message) + Name() string +} + +func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) (parser, error) { + p := in + + parserCheck := make(map[string]int) + for _, ns := range c { + name := ns.Name() + switch name { + case "multiline": + parserCheck["multiline"]++ + var config multiline.Config + cfg := ns.Config() + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("error while parsing multiline parser config: %+v", err) + } + p, err = multiline.New(p, "\n", pCfg.maxBytes, &config) + if err != nil { + return nil, fmt.Errorf("error while creating multiline parser: %+v", err) + } + case "ndjson": + parserCheck["ndjson"]++ + var config readjson.Config + cfg := ns.Config() + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err) + } + p = readjson.NewJSONReader(p, &config) + default: + return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name) + } + } + + // This is a temporary check. In the long run configuring multiple parsers with the same + // type is going to be supported. + if count, ok := parserCheck["multiline"]; ok && count > 1 { + return nil, fmt.Errorf("only one parser is allowed for multiline, got %d", count) + } + if count, ok := parserCheck["ndjson"]; ok && count > 1 { + return nil, fmt.Errorf("only one parser is allowed for ndjson, got %d", count) + } + + return p, nil +} + +func newPostProcessors(c []common.ConfigNamespace) []postProcesser { + var pp []postProcesser + for _, ns := range c { + name := ns.Name() + switch name { + case "ndjson": + var config readjson.Config + cfg := ns.Config() + cfg.Unpack(&config) + pp = append(pp, readjson.NewJSONPostProcessor(&config)) + default: + continue + } + } + + return pp +} + +func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error { + for _, ns := range c { + name := ns.Name() + switch name { + case "multiline": + var config multiline.Config + cfg := ns.Config() + err := cfg.Unpack(&config) + if err != nil { + return fmt.Errorf("error while parsing multiline parser config: %+v", err) + } + case "ndjson": + var config readjson.Config + cfg := ns.Config() + err := cfg.Unpack(&config) + if err != nil { + return fmt.Errorf("error while parsing ndjson parser config: %+v", err) + } + default: + return fmt.Errorf("%s: %s", ErrNoSuchParser, name) + } + } + + return nil +} diff --git a/filebeat/input/filestream/parser_test.go b/filebeat/input/filestream/parser_test.go new file mode 100644 index 00000000000..a312012eaf3 --- /dev/null +++ b/filebeat/input/filestream/parser_test.go @@ -0,0 +1,284 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "io/ioutil" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/reader" + "github.com/elastic/beats/v7/libbeat/reader/multiline" + "github.com/elastic/beats/v7/libbeat/reader/readfile" + "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" +) + +func TestParsersConfigAndReading(t *testing.T) { + tests := map[string]struct { + lines string + parsers map[string]interface{} + expectedMessages []string + expectedError string + }{ + "no parser, no error": { + lines: "line 1\nline 2\n", + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + }, + expectedMessages: []string{"line 1\n", "line 2\n"}, + }, + "correct multiline parser": { + lines: "line 1.1\nline 1.2\nline 1.3\nline 2.1\nline 2.2\nline 2.3\n", + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "count", + "count_lines": 3, + }, + }, + }, + }, + expectedMessages: []string{ + "line 1.1\n\nline 1.2\n\nline 1.3\n", + "line 2.1\n\nline 2.2\n\nline 2.3\n", + }, + }, + "multiline docker logs parser": { + lines: `{"log":"[log] The following are log messages\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"[log] This one is\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":" on multiple\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":" lines","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"[log] In total there should be 3 events\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +`, + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "keys_under_root": true, + "message_key": "log", + }, + }, + map[string]interface{}{ + "multiline": map[string]interface{}{ + "match": "after", + "negate": true, + "pattern": "^\\[log\\]", + }, + }, + }, + }, + expectedMessages: []string{ + "[log] The following are log messages\n", + "[log] This one is\n\n on multiple\n\n lines", + "[log] In total there should be 3 events\n", + }, + }, + "non existent parser configuration": { + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "no_such_parser": nil, + }, + }, + }, + expectedError: ErrNoSuchParser.Error(), + }, + "invalid multiline parser configuration is caught before parser creation": { + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "match": "after", + }, + }, + }, + }, + expectedError: multiline.ErrMissingPattern.Error(), + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + cfg := defaultConfig() + parsersConfig := common.MustNewConfigFrom(test.parsers) + err := parsersConfig.Unpack(&cfg) + if test.expectedError == "" { + require.NoError(t, err) + } else { + require.Contains(t, err.Error(), test.expectedError) + return + } + + p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.Reader.Parsers) + + i := 0 + msg, err := p.Next() + for err == nil { + require.Equal(t, test.expectedMessages[i], string(msg.Content)) + i++ + msg, err = p.Next() + } + }) + } +} + +func TestPostProcessor(t *testing.T) { + tests := map[string]struct { + message reader.Message + postProcessors map[string]interface{} + expectedMessage reader.Message + }{ + "no postprocesser, no processing": { + message: reader.Message{ + Content: []byte("line 1"), + }, + postProcessors: map[string]interface{}{ + "paths": []string{"dummy_path"}, + }, + expectedMessage: reader.Message{ + Content: []byte("line 1"), + }, + }, + "JSON post processer with keys_under_root": { + message: reader.Message{ + Fields: common.MapStr{ + "json": common.MapStr{ + "key": "value", + }, + }, + }, + postProcessors: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "keys_under_root": true, + }, + }, + }, + }, + expectedMessage: reader.Message{ + Fields: common.MapStr{ + "key": "value", + }, + }, + }, + "JSON post processer with document ID": { + message: reader.Message{ + Fields: common.MapStr{ + "json": common.MapStr{ + "key": "value", + "my-id-field": "my-id", + }, + }, + }, + postProcessors: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "keys_under_root": true, + "document_id": "my-id-field", + }, + }, + }, + }, + expectedMessage: reader.Message{ + Fields: common.MapStr{ + "key": "value", + }, + Meta: common.MapStr{ + "_id": "my-id", + }, + }, + }, + "JSON post processer with overwrite keys and under root": { + message: reader.Message{ + Fields: common.MapStr{ + "json": common.MapStr{ + "key": "value", + }, + "key": "another-value", + "other-key": "other-value", + }, + }, + postProcessors: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "keys_under_root": true, + "overwrite_keys": true, + }, + }, + }, + }, + expectedMessage: reader.Message{ + Fields: common.MapStr{ + "key": "value", + "other-key": "other-value", + }, + }, + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + cfg := defaultConfig() + common.MustNewConfigFrom(test.postProcessors).Unpack(&cfg) + pp := newPostProcessors(cfg.Reader.Parsers) + + msg := test.message + for _, p := range pp { + p.PostProcess(&msg) + } + require.Equal(t, test.expectedMessage, msg) + }) + } + +} + +func testReader(lines string) reader.Reader { + encF, _ := encoding.FindEncoding("") + reader := strings.NewReader(lines) + enc, err := encF(reader) + if err != nil { + panic(err) + } + r, err := readfile.NewEncodeReader(ioutil.NopCloser(reader), readfile.Config{ + Codec: enc, + BufferSize: 1024, + Terminator: readfile.AutoLineTerminator, + MaxBytes: 1024, + }) + if err != nil { + panic(err) + } + + return r +} diff --git a/filebeat/input/filestream/parsers_integration_test.go b/filebeat/input/filestream/parsers_integration_test.go new file mode 100644 index 00000000000..3b4d27ffedb --- /dev/null +++ b/filebeat/input/filestream/parsers_integration_test.go @@ -0,0 +1,552 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package filestream + +import ( + "context" + "testing" +) + +// test_docker_logs from test_json.py +func TestParsersDockerLogs(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "message_key": "log", + }, + }, + }, + }) + + testline := []byte("{\"log\":\"Fetching main repository github.com/elastic/beats...\\n\",\"stream\":\"stdout\",\"time\":\"2016-03-02T22:58:51.338462311Z\"}\n") + env.mustWriteLinesToFile(testlogName, testline) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + env.requireOffsetInRegistry(testlogName, len(testline)) + + env.requireEventContents(0, "json.log", "Fetching main repository github.com/elastic/beats...\n") + env.requireEventContents(0, "json.time", "2016-03-02T22:58:51.338462311Z") + env.requireEventContents(0, "json.stream", "stdout") + + cancelInput() + env.waitUntilInputStops() +} + +// test_docker_logs_filtering from test_json.py +func TestParsersDockerLogsFiltering(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "message_key": "log", + "keys_under_root": true, + }, + }, + }, + "exclude_lines": []string{"main"}, + }) + + testline := []byte(`{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"} +{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"} +`) + env.mustWriteLinesToFile(testlogName, testline) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(2) + env.requireOffsetInRegistry(testlogName, len(testline)) + + env.requireEventContents(0, "time", "2016-03-02T22:59:04.609292428Z") + env.requireEventContents(0, "stream", "stdout") + + cancelInput() + env.waitUntilInputStops() +} + +// test_simple_json_overwrite from test_json.py +func TestParsersSimpleJSONOverwrite(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "message_key": "message", + "keys_under_root": true, + "overwrite_keys": true, + }, + }, + }, + }) + + testline := []byte("{\"source\": \"hello\", \"message\": \"test source\"}\n") + env.mustWriteLinesToFile(testlogName, testline) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + env.requireOffsetInRegistry(testlogName, len(testline)) + + env.requireEventContents(0, "source", "hello") + env.requireEventContents(0, "message", "test source") + + cancelInput() + env.waitUntilInputStops() +} + +// test_timestamp_in_message from test_json.py +func TestParsersTimestampInJSONMessage(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "keys_under_root": true, + "overwrite_keys": true, + "add_error_key": true, + }, + }, + }, + }) + + testline := []byte(`{"@timestamp":"2016-04-05T18:47:18.444Z"} +{"@timestamp":"invalid"} +{"@timestamp":{"hello": "test"}} +`) + + env.mustWriteLinesToFile(testlogName, testline) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testline)) + + env.requireEventTimestamp(0, "2016-04-05 18:47:18.444 +0000 UTC") + env.requireEventContents(1, "error.message", "@timestamp not overwritten (parse error on invalid)") + env.requireEventContents(2, "error.message", "@timestamp not overwritten (not string)") + + cancelInput() + env.waitUntilInputStops() +} + +// test_java_elasticsearch_log from test_multiline.py +func TestParsersJavaElasticsearchLogs(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "pattern", + "pattern": "^\\[", + "negate": true, + "match": "after", + "timeout": "100ms", // set to lower value to speed up test + }, + }, + }, + }) + + testlines := []byte(elasticsearchMultilineLogs) + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(20) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + cancelInput() + env.waitUntilInputStops() +} + +// test_c_style_log from test_multiline.py +func TestParsersCStyleLog(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "pattern", + "pattern": "\\\\$", + "negate": false, + "match": "before", + "timeout": "100ms", // set to lower value to speed up test + }, + }, + }, + }) + + testlines := []byte(`The following are log messages +This is a C style log\\ +file which is on multiple\\ +lines +In addition it has normal lines +The total should be 4 lines covered +`) + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(4) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + cancelInput() + env.waitUntilInputStops() +} + +// test_rabbitmq_multiline_log from test_multiline.py +func TestParsersRabbitMQMultilineLog(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "pattern", + "pattern": "^=[A-Z]+", + "negate": true, + "match": "after", + "timeout": "100ms", // set to lower value to speed up test + }, + }, + }, + }) + + testlines := []byte(`=ERROR REPORT==== 3-Feb-2016::03:10:32 === +connection <0.23893.109>, channel 3 - soft error: +{amqp_error,not_found, + "no queue 'bucket-1' in vhost '/'", + 'queue.declare'} +=ERROR REPORT==== 3-Feb-2016::03:10:32 === +connection <0.23893.109>, channel 3 - soft error: +{amqp_error,not_found, + "no queue 'bucket-1' in vhost '/'", + 'queue.declare'} +`) + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(2) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + cancelInput() + env.waitUntilInputStops() +} + +// test_max_lines from test_multiline.py +func TestParsersMultilineMaxLines(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "pattern", + "pattern": "^\\[", + "negate": true, + "match": "after", + "max_lines": 3, + "timeout": "100ms", // set to lower value to speed up test + }, + }, + }, + }) + + testlines := []byte(elasticsearchMultilineLongLogs) + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + env.requireEventsReceived([]string{ + "[2015-12-06 01:44:16,735][INFO ][node ] [Zach] version[2.0.0], pid[48553], build[de54438/2015-10-22T08:09:48Z]", + `[2015-12-06 01:44:53,269][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process] +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)`, + "[2015-12-06 01:44:53,646][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] create_mapping [filesystem]", + }) + + cancelInput() + env.waitUntilInputStops() +} + +// test_timeout from test_multiline.py +func TestParsersMultilineTimeout(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "pattern", + "pattern": "^\\[", + "negate": true, + "match": "after", + "max_lines": 3, + "timeout": "100ms", // set to lower value to speed up test + }, + }, + }, + }) + + testlines := []byte(`[2015] hello world + First Line + Second Line +`) + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + moreLines := []byte(` This should not be third + This should not be fourth +[2016] Hello world + First line again +`) + + env.mustAppendLinesToFile(testlogName, moreLines) + + env.requireEventsReceived([]string{ + `[2015] hello world + First Line + Second Line`, + }) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testlines)+len(moreLines)) + env.requireEventsReceived([]string{`[2015] hello world + First Line + Second Line`, + ` This should not be third + This should not be fourth`, + `[2016] Hello world + First line again`, + }) + + cancelInput() + env.waitUntilInputStops() +} + +// test_max_bytes from test_multiline.py +func TestParsersMultilineMaxBytes(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "message_max_bytes": 50, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "pattern", + "pattern": "^\\[", + "negate": true, + "match": "after", + "timeout": "100ms", // set to lower value to speed up test + }, + }, + }, + }) + + testlines := []byte(elasticsearchMultilineLongLogs) + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + env.requireEventsReceived([]string{ + "[2015-12-06 01:44:16,735][INFO ][node ", + "[2015-12-06 01:44:53,269][DEBUG][action.admin.indi", + "[2015-12-06 01:44:53,646][INFO ][cluster.metadata ", + }) + + cancelInput() + env.waitUntilInputStops() +} + +// test_close_timeout_with_multiline from test_multiline.py +func TestParsersCloseTimeoutWithMultiline(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "close.reader.after_interval": "100ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "pattern", + "pattern": "^\\[", + "negate": true, + "match": "after", + }, + }, + }, + }) + + testlines := []byte(`[2015] hello world + First Line + Second Line +`) + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + env.requireOffsetInRegistry(testlogName, len(testlines)) + env.waitUntilHarvesterIsDone() + + moreLines := []byte(` This should not be third + This should not be fourth +[2016] Hello world + First line again +`) + + env.mustAppendLinesToFile(testlogName, moreLines) + + env.requireEventsReceived([]string{ + `[2015] hello world + First Line + Second Line`, + }) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testlines)+len(moreLines)) + env.requireEventsReceived([]string{`[2015] hello world + First Line + Second Line`, + ` This should not be third + This should not be fourth`, + `[2016] Hello world + First line again`, + }) + + cancelInput() + env.waitUntilInputStops() +} + +// test_consecutive_newline from test_multiline.py +func TestParsersConsecutiveNewline(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "pattern", + "pattern": "^\\[", + "negate": true, + "match": "after", + "timeout": "100ms", // set to lower value to speed up test + }, + }, + }, + }) + + line1 := `[2016-09-02 19:54:23 +0000] Started 2016-09-02 19:54:23 +0000 "GET" for /gaq?path=%2FCA%2FFallbrook%2F1845-Acacia-Ln&referer=http%3A%2F%2Fwww.xxxxx.com%2FAcacia%2BLn%2BFallbrook%2BCA%2Baddresses&search_bucket=none&page_controller=v9%2Faddresses&page_action=show at 23.235.47.31 +X-Forwarded-For:72.197.227.93, 23.235.47.31 +Processing by GoogleAnalyticsController#index as JSON + + Parameters: {"path"=>"/CA/Fallbrook/1845-Acacia-Ln", "referer"=>"http://www.xxxx.com/Acacia+Ln+Fallbrook+CA+addresses", "search_bucket"=>"none", "page_controller"=>"v9/addresses", "page_action"=>"show"} +Completed 200 OK in 5ms (Views: 1.9ms) +` + line2 := `[2016-09-02 19:54:23 +0000] Started 2016-09-02 19:54:23 +0000 "GET" for /health_check at xxx.xx.44.181 +X-Forwarded-For: +SetAdCodeMiddleware.default_ad_code referer +SetAdCodeMiddleware.default_ad_code path /health_check +SetAdCodeMiddleware.default_ad_code route +` + testlines := append([]byte(line1), []byte(line2)...) + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(2) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + env.requireEventsReceived([]string{ + line1[:len(line1)-1], + line2[:len(line2)-1], + }) + + cancelInput() + env.waitUntilInputStops() +} diff --git a/libbeat/reader/message.go b/libbeat/reader/message.go index 344eacd54df..5798c3a9869 100644 --- a/libbeat/reader/message.go +++ b/libbeat/reader/message.go @@ -30,6 +30,7 @@ type Message struct { Content []byte // actual content read Bytes int // total number of bytes read to generate the message Fields common.MapStr // optional fields that can be added by reader + Meta common.MapStr } // IsEmpty returns true in case the message is empty diff --git a/libbeat/reader/multiline/multiline_config.go b/libbeat/reader/multiline/multiline_config.go index d8b63b107e3..5b55b60a2df 100644 --- a/libbeat/reader/multiline/multiline_config.go +++ b/libbeat/reader/multiline/multiline_config.go @@ -80,6 +80,8 @@ func (c *Config) Validate() error { if c.Pattern == nil { return ErrMissingPattern } + } else { + return fmt.Errorf("unknown multiline type %d", c.Type) } return nil } diff --git a/libbeat/reader/readjson/json.go b/libbeat/reader/readjson/json.go index bbbbdeb3ade..370f46fdea4 100644 --- a/libbeat/reader/readjson/json.go +++ b/libbeat/reader/readjson/json.go @@ -37,6 +37,10 @@ type JSONReader struct { logger *logp.Logger } +type JSONPostProcessor struct { + cfg *Config +} + // NewJSONReader creates a new reader that can decode JSON. func NewJSONReader(r reader.Reader, cfg *Config) *JSONReader { return &JSONReader{ @@ -46,6 +50,10 @@ func NewJSONReader(r reader.Reader, cfg *Config) *JSONReader { } } +func NewJSONPostProcessor(cfg *Config) *JSONPostProcessor { + return &JSONPostProcessor{cfg} +} + // decodeJSON unmarshals the text parameter into a MapStr and // returns the new text column if one was requested. func (r *JSONReader) decode(text []byte) ([]byte, common.MapStr) { @@ -119,6 +127,69 @@ func createJSONError(message string) common.MapStr { return common.MapStr{"message": message, "type": "json"} } +func (pp *JSONPostProcessor) Name() string { + return "json" +} + +func (pp *JSONPostProcessor) PostProcess(msg *reader.Message) { + jsonFields, ok := msg.Fields[pp.Name()].(common.MapStr) + if !ok { + return + } + + // The message key might have been modified by multiline + if len(pp.cfg.MessageKey) > 0 && len(msg.Content) > 0 { + jsonFields[pp.cfg.MessageKey] = string(msg.Content) + } + + // handle the case in which r.cfg.AddErrorKey is set and len(jsonFields) == 1 + // and only thing it contains is `error` key due to error in json decoding + // which results in loss of message key in the main beat event + if len(jsonFields) == 1 && jsonFields["error"] != nil { + msg.Fields["message"] = string(msg.Content) + } + + var id string + if key := pp.cfg.DocumentID; key != "" { + if tmp, err := jsonFields.GetValue(key); err == nil { + if v, ok := tmp.(string); ok { + id = v + jsonFields.Delete(key) + } + } + } + + if pp.cfg.KeysUnderRoot { + // Delete existing json key + delete(msg.Fields, "json") + + var ts time.Time + if v, ok := jsonFields["@timestamp"]; ok { + switch t := v.(type) { + case time.Time: + ts = t + case common.Time: + ts = time.Time(ts) + } + delete(msg.Fields, "@timestamp") + + } + event := &beat.Event{ + Timestamp: ts, + Fields: msg.Fields, + } + jsontransform.WriteJSONKeys(event, jsonFields, pp.cfg.ExpandKeys, pp.cfg.OverwriteKeys, pp.cfg.AddErrorKey) + msg.Ts = event.Timestamp + } + + if id != "" { + if msg.Meta == nil { + msg.Meta = common.MapStr{} + } + msg.Meta["_id"] = id + } +} + // MergeJSONFields writes the JSON fields in the event map, // respecting the KeysUnderRoot, ExpandKeys, and OverwriteKeys configuration options. // If MessageKey is defined, the Text value from the event always