Skip to content

Commit

Permalink
Add support for parsers in filestream input (elastic#24763)
Browse files Browse the repository at this point in the history
This PR adds support for pasers in the `filestream` input.

Example configuration that aggregates fives lines into a single event and parses the JSON contents:
```yaml
- type: filestream
  enabled: true
  paths:
  - test.log
  parsers:
  - multiline:
      type: count
      count_lines: 5
      skip_newline: true
  - ndjson:
      fields_under_root: true
```
  • Loading branch information
kvch authored Apr 22, 2021
1 parent 7e1d9e4 commit 30331bc
Show file tree
Hide file tree
Showing 10 changed files with 1,200 additions and 26 deletions.
28 changes: 6 additions & 22 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type readerConfig struct {
MaxBytes int `config:"message_max_bytes" validate:"min=0,nonzero"`
Tail bool `config:"seek_to_tail"`

Parsers []*common.ConfigNamespace `config:"parsers"` // TODO multiline, json, syslog?
Parsers []common.ConfigNamespace `config:"parsers"`
}

type backoffConfig struct {
Expand Down Expand Up @@ -115,34 +115,18 @@ func defaultReaderConfig() readerConfig {
LineTerminator: readfile.AutoLineTerminator,
MaxBytes: 10 * humanize.MiByte,
Tail: false,
Parsers: nil,
Parsers: make([]common.ConfigNamespace, 0),
}
}

func (c *config) Validate() error {
if len(c.Paths) == 0 {
return fmt.Errorf("no path is configured")
}
// TODO
//if c.CleanInactive != 0 && c.IgnoreOlder == 0 {
// return fmt.Errorf("ignore_older must be enabled when clean_inactive is used")
//}

// TODO
//if c.CleanInactive != 0 && c.CleanInactive <= c.IgnoreOlder+c.ScanFrequency {
// return fmt.Errorf("clean_inactive must be > ignore_older + scan_frequency to make sure only files which are not monitored anymore are removed")
//}

// TODO
//if c.JSON != nil && len(c.JSON.MessageKey) == 0 &&
// c.Multiline != nil {
// return fmt.Errorf("When using the JSON decoder and multiline together, you need to specify a message_key value")
//}

//if c.JSON != nil && len(c.JSON.MessageKey) == 0 &&
// (len(c.IncludeLines) > 0 || len(c.ExcludeLines) > 0) {
// return fmt.Errorf("When using the JSON decoder and line filtering together, you need to specify a message_key value")
//}

if err := validateParserConfig(parserConfig{maxBytes: c.Reader.MaxBytes, lineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil {
return fmt.Errorf("cannot parse parser configuration: %+v", err)
}

return nil
}
33 changes: 33 additions & 0 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,39 @@ func (e *inputTestingEnvironment) getOutputMessages() []string {
return messages
}

func (e *inputTestingEnvironment) requireEventContents(nr int, key, value string) {
events := make([]beat.Event, 0)
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {
events = append(events, evt)
}
}

selectedEvent := events[nr]
v, err := selectedEvent.Fields.GetValue(key)
if err != nil {
e.t.Fatalf("cannot find key %s in event %+v", key, selectedEvent)
}

val, ok := v.(string)
if !ok {
e.t.Fatalf("value is not string %+v", v)
}
require.Equal(e.t, value, val)
}

func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) {
events := make([]beat.Event, 0)
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {
events = append(events, evt)
}
}

selectedEvent := events[nr]
require.Equal(e.t, ts, selectedEvent.Timestamp.String())
}

type testInputStore struct {
registry *statestore.Registry
}
Expand Down
89 changes: 89 additions & 0 deletions filebeat/input/filestream/example_inputs_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build integration

package filestream

const (
elasticsearchMultilineLogs = `[2015-12-06 01:44:16,735][INFO ][node ] [Zach] version[2.0.0], pid[48553], build[de54438/2015-10-22T08:09:48Z]
[2015-12-06 01:44:16,736][INFO ][node ] [Zach] initializing ...
[2015-12-06 01:44:16,804][INFO ][plugins ] [Zach] loaded [], sites []
[2015-12-06 01:44:16,941][INFO ][env ] [Zach] using [1] data paths, mounts [[/ (/dev/disk1)]], net usable_space [66.3gb], net total_space [232.6gb], spins? [unknown], types [hfs]
[2015-12-06 01:44:19,177][INFO ][node ] [Zach] initialized
[2015-12-06 01:44:19,177][INFO ][node ] [Zach] starting ...
[2015-12-06 01:44:19,356][INFO ][transport ] [Zach] publish_address {127.0.0.1:9300}, bound_addresses {127.0.0.1:9300}, {[fe80::1]:9300}, {[::1]:9300}
[2015-12-06 01:44:19,367][INFO ][discovery ] [Zach] elasticsearch/qfPw9z0HQe6grbJQruTCJQ
[2015-12-06 01:44:22,405][INFO ][cluster.service ] [Zach] new_master {Zach}{qfPw9z0HQe6grbJQruTCJQ}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-join(elected_as_master, [0] joins received)
[2015-12-06 01:44:22,432][INFO ][http ] [Zach] publish_address {127.0.0.1:9200}, bound_addresses {127.0.0.1:9200}, {[fe80::1]:9200}, {[::1]:9200}
[2015-12-06 01:44:22,432][INFO ][node ] [Zach] started
[2015-12-06 01:44:22,446][INFO ][gateway ] [Zach] recovered [0] indices into cluster_state
[2015-12-06 01:44:52,882][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] creating index, cause [auto(bulk api)], templates [], shards [5]/[1], mappings [process, system]
[2015-12-06 01:44:53,256][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] update_mapping [process]
[2015-12-06 01:44:53,269][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process]
MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}]
at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2015-12-06 01:44:53,274][DEBUG][action.bulk ] [Zach] [filebeat-2015.12.06][0] failed to execute bulk item (index) index {[filebeat-2015.12.06][process][AVF0v5vcVA0hoJdODMTz], source[{"@timestamp":"2015-12-06T00:44:52.448Z","beat":{"hostname":"ruflin","name":"ruflin"},"count":1,"proc":{"cpu":{"user":1902,"user_p":0,"system":941,"total":2843,"start_time":"Dec03"},"mem":{"size":3616309248,"rss":156405760,"rss_p":0.01,"share":0},"name":"Google Chrome H","pid":40572,"ppid":392,"state":"running"},"type":"process"}]}
MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}]
at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2015-12-06 01:44:53,279][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process]
MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double], mapper [proc.cpu.user_p] of different type, current_type [long], merged_type [double]]}]
at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2015-12-06 01:44:53,280][DEBUG][action.bulk ] [Zach] [filebeat-2015.12.06][1] failed to execute bulk item (index) index {[filebeat-2015.12.06][process][AVF0v5vbVA0hoJdODMTj], source[{"@timestamp":"2015-12-06T00:44:52.416Z","beat":{"hostname":"ruflin","name":"ruflin"},"count":1,"proc":{"cpu":{"user":6643,"user_p":0.01,"system":693,"total":7336,"start_time":"01:44"},"mem":{"size":5182656512,"rss":248872960,"rss_p":0.01,"share":0},"name":"java","pid":48553,"ppid":48547,"state":"running"},"type":"process"}]}
MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double], mapper [proc.cpu.user_p] of different type, current_type [long], merged_type [double]]}]
at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2015-12-06 01:44:53,334][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] update_mapping [system]
[2015-12-06 01:44:53,646][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] create_mapping [filesystem]
`

elasticsearchMultilineLongLogs = `[2015-12-06 01:44:16,735][INFO ][node ] [Zach] version[2.0.0], pid[48553], build[de54438/2015-10-22T08:09:48Z]
[2015-12-06 01:44:53,269][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process]
MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}]
at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2015-12-06 01:44:53,646][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] create_mapping [filesystem]
`
)
26 changes: 22 additions & 4 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type filestream struct {
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
closerConfig closerConfig
parserConfig []common.ConfigNamespace
msgPostProc []postProcesser
}

// Plugin creates a new filestream input plugin for creating a stateful input.
Expand Down Expand Up @@ -216,8 +218,16 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
}

r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)

r, err = newParsers(r, parserConfig{maxBytes: inp.readerConfig.MaxBytes, lineTerminator: inp.readerConfig.LineTerminator}, inp.readerConfig.Parsers)
if err != nil {
return nil, err
}

r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

inp.msgPostProc = newPostProcessors(inp.readerConfig.Parsers)

return r, nil
}

Expand Down Expand Up @@ -369,16 +379,24 @@ func (inp *filestream) eventFromMessage(m reader.Message, path string) beat.Even
},
}
fields.DeepUpdate(m.Fields)
m.Fields = fields

for _, proc := range inp.msgPostProc {
proc.PostProcess(&m)
}

if len(m.Content) > 0 {
if fields == nil {
fields = common.MapStr{}
if m.Fields == nil {
m.Fields = common.MapStr{}
}
if _, ok := m.Fields["message"]; !ok {
m.Fields["message"] = string(m.Content)
}
fields["message"] = string(m.Content)
}

return beat.Event{
Timestamp: m.Ts,
Fields: fields,
Meta: m.Meta,
Fields: m.Fields,
}
}
140 changes: 140 additions & 0 deletions filebeat/input/filestream/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
"errors"
"fmt"
"io"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/multiline"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readjson"
)

var (
ErrNoSuchParser = errors.New("no such parser")
)

// parser transforms or translates the Content attribute of a Message.
// They are able to aggregate two or more Messages into a single one.
type parser interface {
io.Closer
Next() (reader.Message, error)
}

type parserConfig struct {
maxBytes int
lineTerminator readfile.LineTerminator
}

type postProcesser interface {
PostProcess(*reader.Message)
Name() string
}

func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) (parser, error) {
p := in

parserCheck := make(map[string]int)
for _, ns := range c {
name := ns.Name()
switch name {
case "multiline":
parserCheck["multiline"]++
var config multiline.Config
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("error while parsing multiline parser config: %+v", err)
}
p, err = multiline.New(p, "\n", pCfg.maxBytes, &config)
if err != nil {
return nil, fmt.Errorf("error while creating multiline parser: %+v", err)
}
case "ndjson":
parserCheck["ndjson"]++
var config readjson.Config
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err)
}
p = readjson.NewJSONReader(p, &config)
default:
return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name)
}
}

// This is a temporary check. In the long run configuring multiple parsers with the same
// type is going to be supported.
if count, ok := parserCheck["multiline"]; ok && count > 1 {
return nil, fmt.Errorf("only one parser is allowed for multiline, got %d", count)
}
if count, ok := parserCheck["ndjson"]; ok && count > 1 {
return nil, fmt.Errorf("only one parser is allowed for ndjson, got %d", count)
}

return p, nil
}

func newPostProcessors(c []common.ConfigNamespace) []postProcesser {
var pp []postProcesser
for _, ns := range c {
name := ns.Name()
switch name {
case "ndjson":
var config readjson.Config
cfg := ns.Config()
cfg.Unpack(&config)
pp = append(pp, readjson.NewJSONPostProcessor(&config))
default:
continue
}
}

return pp
}

func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error {
for _, ns := range c {
name := ns.Name()
switch name {
case "multiline":
var config multiline.Config
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return fmt.Errorf("error while parsing multiline parser config: %+v", err)
}
case "ndjson":
var config readjson.Config
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return fmt.Errorf("error while parsing ndjson parser config: %+v", err)
}
default:
return fmt.Errorf("%s: %s", ErrNoSuchParser, name)
}
}

return nil
}
Loading

0 comments on commit 30331bc

Please sign in to comment.