Skip to content

Commit

Permalink
Introducing preprocessors on the module input override which are run …
Browse files Browse the repository at this point in the history
…after the input but before the processors in the module so some pre transformation can occur.
  • Loading branch information
mjmbischoff committed Jul 30, 2021
1 parent 777daee commit a3761d3
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 0 deletions.
35 changes: 35 additions & 0 deletions filebeat/docs/filebeat-modules-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,39 @@ You can also enable `close_eof` for all inputs created by any of the modules:
-M "*.*.input.close_eof=true"
----------------------------------------------------------------------

==== Processors

As described in <<defining-processors.html#where-valid,Where are processors valid?>>, you can define `processors`
under the input section of the module definition. These processors run _after_ any processors defined in the
module.

[source,yaml]
----------------------------------------------------------------------
- module: nginx
  access:
    input:
      processors:
        - drop_event:
            when:
              contains:
                source: "test"
----------------------------------------------------------------------

Alternatively, you can define `preprocessors`, which run after the input but _before_ the processors defined
in the module. Preprocessors are useful when the data is not read directly from the log source and an
intermediate channel adds extra 'wrapping', so the event can be transformed before the module's processors see it.

[source,yaml]
----------------------------------------------------------------------
- module: nginx
  access:
    input:
      preprocessors:
        - drop_event:
            when:
              contains:
                source: "test"
----------------------------------------------------------------------


:modulename!:
14 changes: 14 additions & 0 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,21 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) {
if err != nil {
return nil, fmt.Errorf("Error creating config from input overrides: %v", err)
}

cfg, err = common.MergeConfigsWithOptions([]*common.Config{cfg, overrides}, ucfg.FieldReplaceValues("**.paths"), ucfg.FieldAppendValues("**.processors"))
if overrides.HasField("preprocessors") {
preprocessors, err := overrides.Child("preprocessors", -1)
if err != nil {
return nil, fmt.Errorf("Error creating config from input overrides: %v", err)
}
overrides, err = common.NewConfigFrom(map[string]interface{}{
"processors": preprocessors,
})
if err != nil {
return nil, fmt.Errorf("Error creating config from input overrides: %v", err)
}
cfg, err = common.MergeConfigsWithOptions([]*common.Config{cfg, overrides}, ucfg.FieldReplaceValues("**.paths"), ucfg.FieldPrependValues("**.processors"))
}
if err != nil {
return nil, fmt.Errorf("Error applying config overrides: %v", err)
}
Expand Down
94 changes: 94 additions & 0 deletions filebeat/fileset/fileset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,100 @@ func TestGetInputConfigNginxOverrides(t *testing.T) {
require.Equal(t, "foobar", v)
},
},
"processors": {
map[string]interface{}{
"processors": []map[string]interface{}{
map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"contains": map[string]interface{}{
"source": "test",
},
},
},
},
},
},
func(t require.TestingT, cfg interface{}, rest ...interface{}) {
c, ok := cfg.(*common.Config)
if !ok {
t.FailNow()
}

require.True(t, c.HasField("processors"))
count, _ := c.CountField("processors")
require.Equal(t, 3, count)
processors, err := c.Child("processors", -1)
require.NoError(t, err)
dropProcessor, err := processors.Child("", 2)
require.NoError(t, err)
require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' last")
},
},
"preprocessors": {
map[string]interface{}{
"close_eof": true,
"preprocessors": []map[string]interface{}{
map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"contains": map[string]interface{}{
"source": "test",
},
},
},
},
},
},
func(t require.TestingT, cfg interface{}, rest ...interface{}) {
c, ok := cfg.(*common.Config)
if !ok {
t.FailNow()
}

require.True(t, c.HasField("processors"))
count, _ := c.CountField("processors")
require.Equal(t, 3, count, "Expecting processors to be merged 2 from module 1 from preprocessor")
processors, err := c.Child("processors", -1)
require.NoError(t, err)
addFieldsProcessor, err := processors.Child("", 2)
require.NoError(t, err)
require.True(t, addFieldsProcessor.HasField("add_fields"), "should contain 'add_fields' last")
},
},
"combinedProcessors": {
map[string]interface{}{
"close_eof": true,
"processors": []map[string]interface{}{
map[string]interface{}{
"drop_event": map[string]interface{}{},
},
},
"preprocessors": []map[string]interface{}{
map[string]interface{}{
"drop_event": map[string]interface{}{},
},
},
},
func(t require.TestingT, cfg interface{}, rest ...interface{}) {
c, ok := cfg.(*common.Config)
if !ok {
t.FailNow()
}

require.True(t, c.HasField("processors"))
count, _ := c.CountField("processors")
require.Equal(t, 4, count, "Expecting processors to be merged: 2 from module, 1 from preprocessor, 1 from processor")
processors, err := c.Child("processors", -1)
require.NoError(t, err)
dropProcessor, err := processors.Child("", 0)
require.NoError(t, err)
require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' first")
dropProcessor, err = processors.Child("", 3)
require.NoError(t, err)
require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' last")
},
},
}

for name, test := range tests {
Expand Down

0 comments on commit a3761d3

Please sign in to comment.