From a3761d36b925abe2727c9f2413d574c9ee5bdc4a Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Fri, 30 Jul 2021 15:00:01 +0200 Subject: [PATCH 1/2] Introducing preprocessors on the module input override which are run after the input but before the processors in the module so some pre transformation can occur. --- .../docs/filebeat-modules-options.asciidoc | 35 +++++++ filebeat/fileset/fileset.go | 14 +++ filebeat/fileset/fileset_test.go | 94 +++++++++++++++++++ 3 files changed, 143 insertions(+) diff --git a/filebeat/docs/filebeat-modules-options.asciidoc b/filebeat/docs/filebeat-modules-options.asciidoc index bba71a69163..69cfa27756d 100644 --- a/filebeat/docs/filebeat-modules-options.asciidoc +++ b/filebeat/docs/filebeat-modules-options.asciidoc @@ -121,4 +121,39 @@ You can also enable `close_eof` for all inputs created by any of the modules: -M "*.*.input.close_eof=true" ---------------------------------------------------------------------- +==== Processors + +As described in <> you can define `processors` +under the input section of the module definition. These processors are run _after_ any processors defined in the +module. + +[source,yaml] +---------------------------------------------------------------------- +- module: nginx + access: + input: + processors: + - drop_event: + when: + contains: + source: "test" +---------------------------------------------------------------------- + +Alternatively, you can define `preprocessors`, which are run after the input but _before_ the processors defined +in the module. Preprocessors are useful when the data is not read directly from the log source and an +intermediate channel introduces additional 'wrapping' that must be removed before the module's processors run.
+ +[source,yaml] +---------------------------------------------------------------------- +- module: nginx + access: + input: + preprocessors: + - drop_event: + when: + contains: + source: "test" +---------------------------------------------------------------------- + + :modulename!: diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index 09a2738bb17..a325bbfccdc 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -373,7 +373,21 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) { if err != nil { return nil, fmt.Errorf("Error creating config from input overrides: %v", err) } + cfg, err = common.MergeConfigsWithOptions([]*common.Config{cfg, overrides}, ucfg.FieldReplaceValues("**.paths"), ucfg.FieldAppendValues("**.processors")) + if overrides.HasField("preprocessors") { + preprocessors, err := overrides.Child("preprocessors", -1) + if err != nil { + return nil, fmt.Errorf("Error creating config from input overrides: %v", err) + } + overrides, err = common.NewConfigFrom(map[string]interface{}{ + "processors": preprocessors, + }) + if err != nil { + return nil, fmt.Errorf("Error creating config from input overrides: %v", err) + } + cfg, err = common.MergeConfigsWithOptions([]*common.Config{cfg, overrides}, ucfg.FieldReplaceValues("**.paths"), ucfg.FieldPrependValues("**.processors")) + } if err != nil { return nil, fmt.Errorf("Error applying config overrides: %v", err) } diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 430d71e0db7..71cca23ea05 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -234,6 +234,100 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { require.Equal(t, "foobar", v) }, }, + "processors": { + map[string]interface{}{ + "processors": []map[string]interface{}{ + map[string]interface{}{ + "drop_event": map[string]interface{}{ + "when": map[string]interface{}{ + "contains": map[string]interface{}{ + "source": 
"test", + }, + }, + }, + }, + }, + }, + func(t require.TestingT, cfg interface{}, rest ...interface{}) { + c, ok := cfg.(*common.Config) + if !ok { + t.FailNow() + } + + require.True(t, c.HasField("processors")) + count, _ := c.CountField("processors") + require.Equal(t, 3, count) + processors, err := c.Child("processors", -1) + require.NoError(t, err) + dropProcessor, err := processors.Child("", 2) + require.NoError(t, err) + require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' last") + }, + }, + "preprocessors": { + map[string]interface{}{ + "close_eof": true, + "preprocessors": []map[string]interface{}{ + map[string]interface{}{ + "drop_event": map[string]interface{}{ + "when": map[string]interface{}{ + "contains": map[string]interface{}{ + "source": "test", + }, + }, + }, + }, + }, + }, + func(t require.TestingT, cfg interface{}, rest ...interface{}) { + c, ok := cfg.(*common.Config) + if !ok { + t.FailNow() + } + + require.True(t, c.HasField("processors")) + count, _ := c.CountField("processors") + require.Equal(t, 3, count, "Expecting processors to be merged 2 from module 1 from preprocessor") + processors, err := c.Child("processors", -1) + require.NoError(t, err) + addFieldsProcessor, err := processors.Child("", 2) + require.NoError(t, err) + require.True(t, addFieldsProcessor.HasField("add_fields"), "should contain 'add_fields' last") + }, + }, + "combinedProcessors": { + map[string]interface{}{ + "close_eof": true, + "processors": []map[string]interface{}{ + map[string]interface{}{ + "drop_event": map[string]interface{}{}, + }, + }, + "preprocessors": []map[string]interface{}{ + map[string]interface{}{ + "drop_event": map[string]interface{}{}, + }, + }, + }, + func(t require.TestingT, cfg interface{}, rest ...interface{}) { + c, ok := cfg.(*common.Config) + if !ok { + t.FailNow() + } + + require.True(t, c.HasField("processors")) + count, _ := c.CountField("processors") + require.Equal(t, 4, count, "Expecting processors to be 
merged: 2 from module, 1 from preprocessor, 1 from processor") + processors, err := c.Child("processors", -1) + require.NoError(t, err) + dropProcessor, err := processors.Child("", 0) + require.NoError(t, err) + require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' first") + dropProcessor, err = processors.Child("", 3) + require.NoError(t, err) + require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' last") + }, + }, } for name, test := range tests { From dc5772b7c18e29df2c82fbcfabff3084546106b4 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Fri, 30 Jul 2021 16:50:24 +0200 Subject: [PATCH 2/2] fixing doc reference --- filebeat/docs/filebeat-modules-options.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/docs/filebeat-modules-options.asciidoc b/filebeat/docs/filebeat-modules-options.asciidoc index 69cfa27756d..d66028a1762 100644 --- a/filebeat/docs/filebeat-modules-options.asciidoc +++ b/filebeat/docs/filebeat-modules-options.asciidoc @@ -123,7 +123,7 @@ You can also enable `close_eof` for all inputs created by any of the modules: ==== Processors -As described in <> you can define `processors` +As described in <> you can define `processors` under the input section of the module definition. These processors are run _after_ any processors defined in the module.