diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 8d9da05617f..10f3de4397e 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -147,6 +147,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Added `.python-version` file {pull}32323[32323] - Add support for multiple regions in GCP {pull}32964[32964] - Use `T.TempDir` to create temporary test directory {pull}33082[33082] +- Add an option to disable event normalization when creating a `beat.Client`. {pull}33657[33657] ==== Deprecated diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 24a276a5dac..c6702257833 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -69,10 +69,10 @@ type ClientConfig struct { // operations on ACKer are normally executed in different go routines. ACKers // are required to be multi-threading safe. type ACKer interface { - // AddEvent informs the ACKer that a new event has been send to the client. + // AddEvent informs the ACKer that a new event has been sent to the client. // AddEvent is called after the processors have handled the event. If the // event has been dropped by the processor `published` will be set to true. - // This allows the ACKer to do some bookeeping for dropped events. + // This allows the ACKer to do some bookkeeping for dropped events. AddEvent(event Event, published bool) // ACK Events from the output and pipeline queue are forwarded to ACKEvents. @@ -83,7 +83,7 @@ type ACKer interface { // Close informs the ACKer that the Client used to publish to the pipeline has been closed. // No new events should be published anymore. The ACKEvents method still will be actively called // as long as there are pending events for the client in the pipeline. The Close signal can be used - // to supress any ACK event propagation if required. + // to suppress any ACK event propagation if required. // Close might be called from another go-routine than AddEvent and ACKEvents. Close() } @@ -121,6 +121,10 @@ type ProcessingConfig struct { // Disables the addition of host.name if it was enabled for the publisher. DisableHost bool + // EventNormalization controls whether the event normalization processor + // is applied to events. If nil the Beat's default behavior prevails. + EventNormalization *bool + // Private contains additional information to be passed to the processing // pipeline builder. Private interface{} @@ -169,7 +173,7 @@ const ( // to update state keeping track of the sending status. GuaranteedSend - // DropIfFull drops an event to be send if the pipeline is currently full. + // DropIfFull drops an event to be sent if the pipeline is currently full. // This ensures a beats internals can continue processing if the pipeline has // filled up. Useful if an event stream must be processed to keep internal // state up-to-date. diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index ea8b727b6ec..8edb8647b43 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -57,7 +57,6 @@ type builder struct { // global pipeline processors processors *group - drop bool // disabled is set if outputs have been disabled via CLI alwaysCopy bool } @@ -109,7 +108,7 @@ func MakeDefaultSupport( processors, err := processors.New(cfg.Processors) if err != nil { - return nil, fmt.Errorf("error initializing processors: %v", err) + return nil, fmt.Errorf("error initializing processors: %w", err) } return newBuilder(info, log, processors, cfg.EventMetadata, modifiers, !normalize, cfg.TimeSeries) @@ -169,7 +168,7 @@ func WithObserverMeta() modifier { "version": info.Version, } if info.Name != info.Hostname { - metadata.Put("name", info.Name) + metadata["name"] = info.Name } return mapstr.M{"observer": metadata} }) @@ -212,9 +211,11 @@ func newBuilder( b.builtinMeta = builtin } - if fields := eventMeta.Fields; len(fields) > 0 { + if len(eventMeta.Fields) > 0 { b.fields = mapstr.M{} - mapstr.MergeFields(b.fields, fields.Clone(), eventMeta.FieldsUnderRoot) + if err := mapstr.MergeFields(b.fields, eventMeta.Fields.Clone(), eventMeta.FieldsUnderRoot); err != nil { + return nil, fmt.Errorf("failed merging event metadata into fields: %w", err) + } } if timeSeries { @@ -268,7 +269,7 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, builtin := b.builtinMeta if cfg.DisableHost { tmp := builtin.Clone() - tmp.Delete("host") + delete(tmp, "host") builtin = tmp } @@ -288,8 +289,12 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, builtin = tmp } - if !b.skipNormalize { - // setup 1: generalize/normalize output (P) + // setup 1: generalize/normalize output (P) + if cfg.EventNormalization != nil { + if *cfg.EventNormalization { + processors.add(newGeneralizeProcessor(cfg.KeepNull)) + } + } else if !b.skipNormalize { processors.add(newGeneralizeProcessor(cfg.KeepNull)) } @@ -310,7 +315,9 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, fields := cfg.Fields.Clone() fields.DeepUpdate(b.fields.Clone()) if em := cfg.EventMetadata; len(em.Fields) > 0 { - mapstr.MergeFieldsDeep(fields, em.Fields.Clone(), em.FieldsUnderRoot) + if err := mapstr.MergeFieldsDeep(fields, em.Fields.Clone(), em.FieldsUnderRoot); err != nil { + return nil, fmt.Errorf("failed merging client event metadata into fields: %w", err) + } } if len(fields) > 0 { diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 587ebc8390f..7907063a796 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -280,6 +280,43 @@ func TestProcessorsConfigs(t *testing.T) { } } +// TestEventNormalizationOverride verifies that the EventNormalization option +// in beat.ProcessingConfig overrides the "skipNormalize" setting that is +// specified in the builder (this is the default value set by the Beat). +func TestEventNormalizationOverride(t *testing.T) { + boolPtr := func(b bool) *bool { return &b } + + testCases := []struct { + skipNormalize bool + normalizeOverride *bool + hasGeneralizeProcessor bool + }{ + {skipNormalize: false, normalizeOverride: nil, hasGeneralizeProcessor: true}, + {skipNormalize: false, normalizeOverride: boolPtr(false), hasGeneralizeProcessor: false}, + {skipNormalize: false, normalizeOverride: boolPtr(true), hasGeneralizeProcessor: true}, + {skipNormalize: true, normalizeOverride: nil, hasGeneralizeProcessor: false}, + {skipNormalize: true, normalizeOverride: boolPtr(false), hasGeneralizeProcessor: false}, + {skipNormalize: true, normalizeOverride: boolPtr(true), hasGeneralizeProcessor: true}, + } + + for _, tc := range testCases { + builder, err := newBuilder(beat.Info{}, logp.NewLogger(""), nil, mapstr.EventMetadata{}, nil, tc.skipNormalize, false) + require.NoError(t, err) + + processor, err := builder.Create(beat.ProcessingConfig{EventNormalization: tc.normalizeOverride}, false) + require.NoError(t, err) + group := processor.(*group) + + if tc.hasGeneralizeProcessor { + if assert.NotEmpty(t, group.list) { + assert.Equal(t, "generalizeEvent", group.list[0].String()) + } + } else { + assert.Empty(t, group.list) + } + } +} + func TestNormalization(t *testing.T) { cases := map[string]struct { normalize bool