From 118bc11040645bf7bc1a56fd0d276c6b5a8f70e8 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Sun, 13 Nov 2022 10:59:46 -0500 Subject: [PATCH] libbeat - allow per beat.Client control of event normalization Control over the addition of the "generalizeEvent" processor into the publishing pipeline was only available at the Beat level. This adds a new option that can be set by input's when they create their beat.Client. This allows inputs to override the Beat's default behavior. My expected use case it to disable event normalization for inputs that are known to only produce beat.Events containing the standard data types expected by the processors and outputs (i.e. map[string]interface{} containing primitives, slices, or other map[string]interface{}). Inputs would want to disable the event normalization processor if they can because it adds unnecessary processing (recurses over the fields and often allocates). --- CHANGELOG-developer.next.asciidoc | 1 + libbeat/beat/pipeline.go | 4 +++ libbeat/publisher/processing/default.go | 8 +++-- libbeat/publisher/processing/default_test.go | 37 ++++++++++++++++++++ 4 files changed, 48 insertions(+), 2 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 8d9da05617f..10f3de4397e 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -147,6 +147,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Added `.python-version` file {pull}32323[32323] - Add support for multiple regions in GCP {pull}32964[32964] - Use `T.TempDir` to create temporary test directory {pull}33082[33082] +- Add an option to disable event normalization when creating a `beat.Client`. {pull}33657[33657] ==== Deprecated diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 24a276a5dac..aee5d0ce5ad 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -121,6 +121,10 @@ type ProcessingConfig struct { // Disables the addition of host.name if it was enabled for the publisher. DisableHost bool + // EventNormalization controls whether the event normalization processor + // is applied to events. If nil the Beat's default behavior prevails. + EventNormalization *bool + // Private contains additional information to be passed to the processing // pipeline builder. Private interface{} diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index ea8b727b6ec..69cebfbae6d 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -288,8 +288,12 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, builtin = tmp } - if !b.skipNormalize { - // setup 1: generalize/normalize output (P) + // setup 1: generalize/normalize output (P) + if cfg.EventNormalization != nil { + if *cfg.EventNormalization { + processors.add(newGeneralizeProcessor(cfg.KeepNull)) + } + } else if !b.skipNormalize { processors.add(newGeneralizeProcessor(cfg.KeepNull)) } diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 587ebc8390f..97b59e38dd7 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -280,6 +280,43 @@ func TestProcessorsConfigs(t *testing.T) { } } +// TestEventNormalizationOverride verifies that the EventNormalization option +// in beat.ProcessingConfig overrides the "skipNormalize" setting that is +// specified in the builder (this is the default value set by the Beat). +func TestEventNormalizationOverride(t *testing.T) { + boolPtr := func(b bool) *bool { return &b } + + testCases := []struct { + skipNormalize bool + normalizeOverride *bool + hasGeneralizeProcessor bool + }{ + {false, nil, true}, + {false, boolPtr(false), false}, + {false, boolPtr(true), true}, + {true, nil, false}, + {true, boolPtr(false), false}, + {true, boolPtr(true), true}, + } + + for _, tc := range testCases { + builder, err := newBuilder(beat.Info{}, logp.NewLogger(""), nil, mapstr.EventMetadata{}, nil, tc.skipNormalize, false) + require.NoError(t, err) + + processor, err := builder.Create(beat.ProcessingConfig{EventNormalization: tc.normalizeOverride}, false) + require.NoError(t, err) + group := processor.(*group) + + if tc.hasGeneralizeProcessor { + if assert.NotEmpty(t, group.list) { + assert.Equal(t, "generalizeEvent", group.list[0].String()) + } + } else { + assert.Empty(t, group.list) + } + } +} + func TestNormalization(t *testing.T) { cases := map[string]struct { normalize bool