Skip to content

Commit

Permalink
libbeat - allow per beat.Client control of event normalization
Browse files Browse the repository at this point in the history
Control over the addition of the "generalizeEvent" processor into the publishing pipeline was
only available at the Beat level. This adds a new option that can be set by input's when they
create their beat.Client.

This allows inputs to override the Beat's default behavior. My expected use case it to disable
event normalization for inputs that are known to only produce beat.Events containing the
standard data types expected by the processors and outputs (i.e. map[string]interface{}
containing primitives, slices, or other map[string]interface{}).

Inputs would want to disable the event normalization processor if they can because it adds
unnecessary processing (recurses over the fields and often allocates).
  • Loading branch information
andrewkroh committed Nov 13, 2022
1 parent 3e928b7 commit 118bc11
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Added `.python-version` file {pull}32323[32323]
- Add support for multiple regions in GCP {pull}32964[32964]
- Use `T.TempDir` to create temporary test directory {pull}33082[33082]
- Add an option to disable event normalization when creating a `beat.Client`. {pull}33657[33657]

==== Deprecated

Expand Down
4 changes: 4 additions & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ type ProcessingConfig struct {
// Disables the addition of host.name if it was enabled for the publisher.
DisableHost bool

// EventNormalization controls whether the event normalization processor
// is applied to events. If nil the Beat's default behavior prevails.
EventNormalization *bool

// Private contains additional information to be passed to the processing
// pipeline builder.
Private interface{}
Expand Down
8 changes: 6 additions & 2 deletions libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,12 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor,
builtin = tmp
}

if !b.skipNormalize {
// setup 1: generalize/normalize output (P)
// setup 1: generalize/normalize output (P)
if cfg.EventNormalization != nil {
if *cfg.EventNormalization {
processors.add(newGeneralizeProcessor(cfg.KeepNull))
}
} else if !b.skipNormalize {
processors.add(newGeneralizeProcessor(cfg.KeepNull))
}

Expand Down
37 changes: 37 additions & 0 deletions libbeat/publisher/processing/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,43 @@ func TestProcessorsConfigs(t *testing.T) {
}
}

// TestEventNormalizationOverride verifies that the EventNormalization option
// in beat.ProcessingConfig overrides the "skipNormalize" setting that is
// specified in the builder (this is the default value set by the Beat).
func TestEventNormalizationOverride(t *testing.T) {
boolPtr := func(b bool) *bool { return &b }

testCases := []struct {
skipNormalize bool
normalizeOverride *bool
hasGeneralizeProcessor bool
}{
{false, nil, true},
{false, boolPtr(false), false},
{false, boolPtr(true), true},
{true, nil, false},
{true, boolPtr(false), false},
{true, boolPtr(true), true},
}

for _, tc := range testCases {
builder, err := newBuilder(beat.Info{}, logp.NewLogger(""), nil, mapstr.EventMetadata{}, nil, tc.skipNormalize, false)
require.NoError(t, err)

processor, err := builder.Create(beat.ProcessingConfig{EventNormalization: tc.normalizeOverride}, false)
require.NoError(t, err)
group := processor.(*group)

if tc.hasGeneralizeProcessor {
if assert.NotEmpty(t, group.list) {
assert.Equal(t, "generalizeEvent", group.list[0].String())
}
} else {
assert.Empty(t, group.list)
}
}
}

func TestNormalization(t *testing.T) {
cases := map[string]struct {
normalize bool
Expand Down

0 comments on commit 118bc11

Please sign in to comment.