diff --git a/filebeat/docs/filebeat-modules-options.asciidoc b/filebeat/docs/filebeat-modules-options.asciidoc index bba71a69163..69cfa27756d 100644 --- a/filebeat/docs/filebeat-modules-options.asciidoc +++ b/filebeat/docs/filebeat-modules-options.asciidoc @@ -121,4 +121,39 @@ You can also enable `close_eof` for all inputs created by any of the modules: -M "*.*.input.close_eof=true" ---------------------------------------------------------------------- +==== Processors + +As described in <> you can define `processors` +under the input section of the module definition. These processors are run _after_ any processors defined in the +module. + +[source,yaml] +---------------------------------------------------------------------- +- module: nginx + access: + input: + processors: + - drop_event: + when: + contains: + source: "test" +---------------------------------------------------------------------- + +Alternatively you can define `preprocessors` which are run after the input but _before_ the processors defined +in the module. Preprocessors can be useful if we don't retrieve the data directly from the log source and the +intermediate channel introduces additional 'wrapping'. + +[source,yaml] +---------------------------------------------------------------------- +- module: nginx + access: + input: + preprocessors: + - drop_event: + when: + contains: + source: "test" +---------------------------------------------------------------------- + + :modulename!: diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index 09a2738bb17..a325bbfccdc 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -373,7 +373,21 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) { if err != nil { return nil, fmt.Errorf("Error creating config from input overrides: %v", err) } + cfg, err = common.MergeConfigsWithOptions([]*common.Config{cfg, overrides}, ucfg.FieldReplaceValues("**.paths"), ucfg.FieldAppendValues("**.processors")) + if overrides.HasField("preprocessors") { + preprocessors, err := overrides.Child("preprocessors", -1) + if err != nil { + return nil, fmt.Errorf("Error creating config from input overrides: %v", err) + } + overrides, err = common.NewConfigFrom(map[string]interface{}{ + "processors": preprocessors, + }) + if err != nil { + return nil, fmt.Errorf("Error creating config from input overrides: %v", err) + } + cfg, err = common.MergeConfigsWithOptions([]*common.Config{cfg, overrides}, ucfg.FieldReplaceValues("**.paths"), ucfg.FieldPrependValues("**.processors")) + } if err != nil { return nil, fmt.Errorf("Error applying config overrides: %v", err) } diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 430d71e0db7..71cca23ea05 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -234,6 +234,100 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { require.Equal(t, "foobar", v) }, }, + "processors": { + map[string]interface{}{ + "processors": []map[string]interface{}{ + map[string]interface{}{ + "drop_event": map[string]interface{}{ + "when": map[string]interface{}{ + "contains": map[string]interface{}{ + "source": "test", + }, + }, + }, + }, + }, + }, + func(t require.TestingT, cfg interface{}, rest ...interface{}) { + c, ok := cfg.(*common.Config) + if !ok { + t.FailNow() + } + + require.True(t, c.HasField("processors")) + count, _ := c.CountField("processors") + require.Equal(t, 3, count) + processors, err := c.Child("processors", -1) + require.NoError(t, err) + dropProcessor, err := processors.Child("", 2) + require.NoError(t, err) + require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' last") + }, + }, + "preprocessors": { + map[string]interface{}{ + "close_eof": true, + "preprocessors": []map[string]interface{}{ + map[string]interface{}{ + "drop_event": map[string]interface{}{ + "when": map[string]interface{}{ + "contains": map[string]interface{}{ + "source": "test", + }, + }, + }, + }, + }, + }, + func(t require.TestingT, cfg interface{}, rest ...interface{}) { + c, ok := cfg.(*common.Config) + if !ok { + t.FailNow() + } + + require.True(t, c.HasField("processors")) + count, _ := c.CountField("processors") + require.Equal(t, 3, count, "Expecting processors to be merged 2 from module 1 from preprocessor") + processors, err := c.Child("processors", -1) + require.NoError(t, err) + addFieldsProcessor, err := processors.Child("", 2) + require.NoError(t, err) + require.True(t, addFieldsProcessor.HasField("add_fields"), "should contain 'add_fields' last") + }, + }, + "combinedProcessors": { + map[string]interface{}{ + "close_eof": true, + "processors": []map[string]interface{}{ + map[string]interface{}{ + "drop_event": map[string]interface{}{}, + }, + }, + "preprocessors": []map[string]interface{}{ + map[string]interface{}{ + "drop_event": map[string]interface{}{}, + }, + }, + }, + func(t require.TestingT, cfg interface{}, rest ...interface{}) { + c, ok := cfg.(*common.Config) + if !ok { + t.FailNow() + } + + require.True(t, c.HasField("processors")) + count, _ := c.CountField("processors") + require.Equal(t, 4, count, "Expecting processors to be merged: 2 from module, 1 from preprocessor, 1 from processor") + processors, err := c.Child("processors", -1) + require.NoError(t, err) + dropProcessor, err := processors.Child("", 0) + require.NoError(t, err) + require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' first") + dropProcessor, err = processors.Child("", 3) + require.NoError(t, err) + require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' last") + }, + }, } for name, test := range tests {