From 118bc11040645bf7bc1a56fd0d276c6b5a8f70e8 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Sun, 13 Nov 2022 10:59:46 -0500 Subject: [PATCH 1/6] libbeat - allow per beat.Client control of event normalization Control over the addition of the "generalizeEvent" processor into the publishing pipeline was only available at the Beat level. This adds a new option that can be set by inputs when they create their beat.Client. This allows inputs to override the Beat's default behavior. My expected use case is to disable event normalization for inputs that are known to only produce beat.Events containing the standard data types expected by the processors and outputs (i.e. map[string]interface{} containing primitives, slices, or other map[string]interface{}). Inputs would want to disable the event normalization processor if they can because it adds unnecessary processing (recurses over the fields and often allocates). --- CHANGELOG-developer.next.asciidoc | 1 + libbeat/beat/pipeline.go | 4 +++ libbeat/publisher/processing/default.go | 8 +++-- libbeat/publisher/processing/default_test.go | 37 ++++++++++++++++++++ 4 files changed, 48 insertions(+), 2 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 8d9da05617f..10f3de4397e 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -147,6 +147,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Added `.python-version` file {pull}32323[32323] - Add support for multiple regions in GCP {pull}32964[32964] - Use `T.TempDir` to create temporary test directory {pull}33082[33082] +- Add an option to disable event normalization when creating a `beat.Client`. 
{pull}33657[33657] ==== Deprecated diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 24a276a5dac..aee5d0ce5ad 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -121,6 +121,10 @@ type ProcessingConfig struct { // Disables the addition of host.name if it was enabled for the publisher. DisableHost bool + // EventNormalization controls whether the event normalization processor + // is applied to events. If nil the Beat's default behavior prevails. + EventNormalization *bool + // Private contains additional information to be passed to the processing // pipeline builder. Private interface{} diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index ea8b727b6ec..69cebfbae6d 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -288,8 +288,12 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, builtin = tmp } - if !b.skipNormalize { - // setup 1: generalize/normalize output (P) + // setup 1: generalize/normalize output (P) + if cfg.EventNormalization != nil { + if *cfg.EventNormalization { + processors.add(newGeneralizeProcessor(cfg.KeepNull)) + } + } else if !b.skipNormalize { processors.add(newGeneralizeProcessor(cfg.KeepNull)) } diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 587ebc8390f..97b59e38dd7 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -280,6 +280,43 @@ func TestProcessorsConfigs(t *testing.T) { } } +// TestEventNormalizationOverride verifies that the EventNormalization option +// in beat.ProcessingConfig overrides the "skipNormalize" setting that is +// specified in the builder (this is the default value set by the Beat). 
+func TestEventNormalizationOverride(t *testing.T) { + boolPtr := func(b bool) *bool { return &b } + + testCases := []struct { + skipNormalize bool + normalizeOverride *bool + hasGeneralizeProcessor bool + }{ + {false, nil, true}, + {false, boolPtr(false), false}, + {false, boolPtr(true), true}, + {true, nil, false}, + {true, boolPtr(false), false}, + {true, boolPtr(true), true}, + } + + for _, tc := range testCases { + builder, err := newBuilder(beat.Info{}, logp.NewLogger(""), nil, mapstr.EventMetadata{}, nil, tc.skipNormalize, false) + require.NoError(t, err) + + processor, err := builder.Create(beat.ProcessingConfig{EventNormalization: tc.normalizeOverride}, false) + require.NoError(t, err) + group := processor.(*group) + + if tc.hasGeneralizeProcessor { + if assert.NotEmpty(t, group.list) { + assert.Equal(t, "generalizeEvent", group.list[0].String()) + } + } else { + assert.Empty(t, group.list) + } + } +} + func TestNormalization(t *testing.T) { cases := map[string]struct { normalize bool From 78a4711cbbbb42639da35e9ce526b959506bee20 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Sun, 13 Nov 2022 11:25:34 -0500 Subject: [PATCH 2/6] lint / misspell - fix spelling --- libbeat/beat/pipeline.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index aee5d0ce5ad..c6702257833 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -69,10 +69,10 @@ type ClientConfig struct { // operations on ACKer are normally executed in different go routines. ACKers // are required to be multi-threading safe. type ACKer interface { - // AddEvent informs the ACKer that a new event has been send to the client. + // AddEvent informs the ACKer that a new event has been sent to the client. // AddEvent is called after the processors have handled the event. If the // event has been dropped by the processor `published` will be set to true. 
- // This allows the ACKer to do some bookeeping for dropped events. + // This allows the ACKer to do some bookkeeping for dropped events. AddEvent(event Event, published bool) // ACK Events from the output and pipeline queue are forwarded to ACKEvents. @@ -83,7 +83,7 @@ type ACKer interface { // Close informs the ACKer that the Client used to publish to the pipeline has been closed. // No new events should be published anymore. The ACKEvents method still will be actively called // as long as there are pending events for the client in the pipeline. The Close signal can be used - // to supress any ACK event propagation if required. + // to suppress any ACK event propagation if required. // Close might be called from another go-routine than AddEvent and ACKEvents. Close() } @@ -173,7 +173,7 @@ const ( // to update state keeping track of the sending status. GuaranteedSend - // DropIfFull drops an event to be send if the pipeline is currently full. + // DropIfFull drops an event to be sent if the pipeline is currently full. // This ensures a beats internals can continue processing if the pipeline has // filled up. Useful if an event stream must be processed to keep internal // state up-to-date. 
From 9ce202a0c391081ea2f82f4b092aeb16942bba5c Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Sun, 13 Nov 2022 11:27:07 -0500 Subject: [PATCH 3/6] lint / unused - remove drop field --- libbeat/publisher/processing/default.go | 1 - 1 file changed, 1 deletion(-) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 69cebfbae6d..a0c69e268ad 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -57,7 +57,6 @@ type builder struct { // global pipeline processors processors *group - drop bool // disabled is set if outputs have been disabled via CLI alwaysCopy bool } From f7ae227a18a568e38704d62e91dcdbef1b19e9e9 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Sun, 13 Nov 2022 11:28:16 -0500 Subject: [PATCH 4/6] lint / errorlint - wrap error in fmt.Errorf --- libbeat/publisher/processing/default.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index a0c69e268ad..512af426fc8 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -108,7 +108,7 @@ func MakeDefaultSupport( processors, err := processors.New(cfg.Processors) if err != nil { - return nil, fmt.Errorf("error initializing processors: %v", err) + return nil, fmt.Errorf("error initializing processors: %w", err) } return newBuilder(info, log, processors, cfg.EventMetadata, modifiers, !normalize, cfg.TimeSeries) From 6769a8b93898a43cc05065f3b4150cd701332459 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Sun, 13 Nov 2022 11:49:45 -0500 Subject: [PATCH 5/6] lint / errcheck - add missing checks --- libbeat/publisher/processing/default.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 512af426fc8..8edb8647b43 100644 --- a/libbeat/publisher/processing/default.go 
+++ b/libbeat/publisher/processing/default.go @@ -168,7 +168,7 @@ func WithObserverMeta() modifier { "version": info.Version, } if info.Name != info.Hostname { - metadata.Put("name", info.Name) + metadata["name"] = info.Name } return mapstr.M{"observer": metadata} }) @@ -211,9 +211,11 @@ func newBuilder( b.builtinMeta = builtin } - if fields := eventMeta.Fields; len(fields) > 0 { + if len(eventMeta.Fields) > 0 { b.fields = mapstr.M{} - mapstr.MergeFields(b.fields, fields.Clone(), eventMeta.FieldsUnderRoot) + if err := mapstr.MergeFields(b.fields, eventMeta.Fields.Clone(), eventMeta.FieldsUnderRoot); err != nil { + return nil, fmt.Errorf("failed merging event metadata into fields: %w", err) + } } if timeSeries { @@ -267,7 +269,7 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, builtin := b.builtinMeta if cfg.DisableHost { tmp := builtin.Clone() - tmp.Delete("host") + delete(tmp, "host") builtin = tmp } @@ -313,7 +315,9 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, fields := cfg.Fields.Clone() fields.DeepUpdate(b.fields.Clone()) if em := cfg.EventMetadata; len(em.Fields) > 0 { - mapstr.MergeFieldsDeep(fields, em.Fields.Clone(), em.FieldsUnderRoot) + if err := mapstr.MergeFieldsDeep(fields, em.Fields.Clone(), em.FieldsUnderRoot); err != nil { + return nil, fmt.Errorf("failed merging client event metadata into fields: %w", err) + } } if len(fields) > 0 { From 136a8d18841830859dce8b9df84e00eb0d91c999 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Mon, 14 Nov 2022 14:52:35 -0500 Subject: [PATCH 6/6] Add key names --- libbeat/publisher/processing/default_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 97b59e38dd7..7907063a796 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -291,12 +291,12 @@ func 
TestEventNormalizationOverride(t *testing.T) { normalizeOverride *bool hasGeneralizeProcessor bool }{ - {false, nil, true}, - {false, boolPtr(false), false}, - {false, boolPtr(true), true}, - {true, nil, false}, - {true, boolPtr(false), false}, - {true, boolPtr(true), true}, + {skipNormalize: false, normalizeOverride: nil, hasGeneralizeProcessor: true}, + {skipNormalize: false, normalizeOverride: boolPtr(false), hasGeneralizeProcessor: false}, + {skipNormalize: false, normalizeOverride: boolPtr(true), hasGeneralizeProcessor: true}, + {skipNormalize: true, normalizeOverride: nil, hasGeneralizeProcessor: false}, + {skipNormalize: true, normalizeOverride: boolPtr(false), hasGeneralizeProcessor: false}, + {skipNormalize: true, normalizeOverride: boolPtr(true), hasGeneralizeProcessor: true}, } for _, tc := range testCases {