Skip to content

Commit

Permalink
Fix BeatV2Manager to configure inputs and set log level (#34066)
Browse files Browse the repository at this point in the history
* Refactor the V2 manager.

* Add debounce to unit changes.

* add stop functionality for output config changes

* Add tests.

* Fix typo.

* Fix code review, add more to the test.

* Re-order the processor injection so proper order is maintained.

* Fix unit tests.

* Copy global processors per stream to ensure that multiple streams don't get the same slice.

Co-authored-by: Alex Kristiansen <[email protected]>
  • Loading branch information
2 people authored and chrisberkhout committed Jun 1, 2023
1 parent cc00fc0 commit 81bb01e
Show file tree
Hide file tree
Showing 10 changed files with 706 additions and 193 deletions.
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9867,11 +9867,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-a

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-client/v7
Version: v7.0.2-0.20221129150247-15881a8e64ef
Version: v7.0.3
Licence type (autodetected): Elastic
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/[email protected].2-0.20221129150247-15881a8e64ef/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/[email protected].3/LICENSE.txt:

ELASTIC LICENSE AGREEMENT

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/eapache/go-resiliency v1.2.0
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/elastic/elastic-agent-client/v7 v7.0.2-0.20221129150247-15881a8e64ef
github.com/elastic/elastic-agent-client/v7 v7.0.3
github.com/elastic/go-concert v0.2.0
github.com/elastic/go-libaudit/v2 v2.3.2
github.com/elastic/go-licenser v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -617,8 +617,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY=
github.com/elastic/elastic-agent-autodiscover v0.5.0 h1:fiDJQKB148RsRISqqbGINtJ/ZPupeVjwn0fbPz9ZDOc=
github.com/elastic/elastic-agent-autodiscover v0.5.0/go.mod h1:p3MSf9813JEnolCTD0GyVAr3+Eptg2zQ9aZVFjl4tJ4=
github.com/elastic/elastic-agent-client/v7 v7.0.2-0.20221129150247-15881a8e64ef h1:+3AWaimDL826eoU06qOFBtA3xmyuTr9YUMVWvnim4mU=
github.com/elastic/elastic-agent-client/v7 v7.0.2-0.20221129150247-15881a8e64ef/go.mod h1:cHviLpA5fAwMbfBIHBVNl16qp90bO7pKHMAQaG+9raU=
github.com/elastic/elastic-agent-client/v7 v7.0.3 h1:YqZPnO7Z9rlj25sFZEUaxGGK3mZR4v0uSOcfO8GRv7s=
github.com/elastic/elastic-agent-client/v7 v7.0.3/go.mod h1:cHviLpA5fAwMbfBIHBVNl16qp90bO7pKHMAQaG+9raU=
github.com/elastic/elastic-agent-libs v0.2.11/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE=
github.com/elastic/elastic-agent-libs v0.2.15 h1:hdAbrZZ2mCPcQLRCE3E8xw3mHKl8HFMt36w7jan/XGo=
github.com/elastic/elastic-agent-libs v0.2.15/go.mod h1:0J9lzJh+BjttIiVjYDLncKYCEWUUHiiqnuI64y6C6ss=
Expand Down
2 changes: 2 additions & 0 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const (
Failed
// Stopping is status describing application is stopping.
Stopping
// Stopped is status describing application is stopped.
Stopped
)

// Namespace is the feature namespace for queue definition.
Expand Down
6 changes: 4 additions & 2 deletions x-pack/libbeat/management/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (

// Config for central management
type Config struct {
Enabled bool `config:"enabled" yaml:"enabled"`
Blacklist ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"`
Enabled bool `config:"enabled" yaml:"enabled"`
Blacklist ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"`
RestartOnOutputChange bool `config:"restart_on_output_change" yaml:"restart_on_output_change"`
}

// ConfigBlock stores a piece of config from central management
Expand All @@ -29,6 +30,7 @@ type ConfigBlocksWithType struct {
// ConfigBlocks holds a list of type + configs objects
type ConfigBlocks []ConfigBlocksWithType

// DefaultConfig returns the default config for the V2 manager
func DefaultConfig() *Config {
return &Config{
Blacklist: ConfigBlacklistSettings{
Expand Down
46 changes: 34 additions & 12 deletions x-pack/libbeat/management/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,33 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, inputType string, ag
streamSource := raw.GetStreams()[iter].GetSource().AsMap()

streamSource = injectIndexStream(inputType, raw, stream, streamSource)
streamSource, err := injectStreamProcessors(raw, inputType, stream, streamSource)

// the order of building the processors is important
// prepend is used to ensure that the processors defined directly on the stream
// come last in the order as they take priority over the others as they are the
// most specific to this one stream

// 1. global processors
streamSource = injectGlobalProcesssors(raw, streamSource)

// 2. agentInfo
streamSource, err := injectAgentInfoRule(streamSource, agentInfo)
if err != nil {
return nil, fmt.Errorf("Error injecting stream processors: %w", err)
return nil, fmt.Errorf("Error injecting agent processors: %w", err)
}
streamSource, err = injectAgentInfoRule(streamSource, agentInfo)

// 3. stream processors
streamSource, err = injectStreamProcessors(raw, inputType, stream, streamSource)
if err != nil {
return nil, fmt.Errorf("Error injecting agent processors: %w", err)
return nil, fmt.Errorf("Error injecting stream processors: %w", err)
}
streamSource = injectGlobalProcesssors(raw, streamSource)

// now the order of the processors on this input is as follows
// 1. stream processors
// 2. agentInfo processors
// 3. global processors
// 4. stream specific processors

inputs[iter] = streamSource
}

Expand Down Expand Up @@ -128,7 +146,7 @@ func injectAgentInfoRule(inputs map[string]interface{}, agentInfo *client.AgentI
mapstr.M{"id": agentInfo.ID},
"agent"))

inputs["processors"] = appendProcessors(inputs, processors)
inputs["processors"] = prependProcessors(inputs, processors)

return inputs, nil
}
Expand All @@ -144,7 +162,10 @@ func injectGlobalProcesssors(expected *proto.UnitExpectedConfig, stream map[stri
if !ok {
return stream
}
newProcs := appendProcessors(stream, globalList)
// copy global processors to ensure that each stream gets its own copy
// if the stream doesn't have any processors it will take the slice as the new value
// without copying its possible that the processors appended to the streams will be shared
newProcs := prependProcessors(stream, append([]interface{}{}, globalList...))
stream["processors"] = newProcs
return stream
}
Expand Down Expand Up @@ -191,8 +212,9 @@ func injectStreamProcessors(expected *proto.UnitExpectedConfig, dataStreamType s
processors = append(processors, sourceStream)
}

// append with existing processors
stream["processors"] = appendProcessors(stream, processors)
// prepend with existing processors
// streams processors should be first as other processors might adjust values in those fields
stream["processors"] = prependProcessors(stream, processors)

return stream, nil
}
Expand Down Expand Up @@ -227,9 +249,9 @@ func generateAddFieldsProcessor(fields mapstr.M, target string) mapstr.M {
}
}

// appendProcessors takes an existing intput or stream-level config, extracts any existing processors in the config,
// prependProcessors takes an existing input or stream-level config, extracts any existing processors in the config,
// and appends them to a new list of configs. Mostly a helper to deal with all the typecasting
func appendProcessors(existingConfig map[string]interface{}, newProcs []interface{}) []interface{} {
func prependProcessors(existingConfig map[string]interface{}, newProcs []interface{}) []interface{} {
currentProcs, ok := existingConfig["processors"]
if !ok {
return newProcs
Expand All @@ -238,7 +260,7 @@ func appendProcessors(existingConfig map[string]interface{}, newProcs []interfac
if !ok {
return newProcs
}
return append(currentList, newProcs...)
return append(newProcs, currentList...)
}

// metadataFromDatastreamValues takes the various data_stream values from across the expected config and returns a set of "good" that can be used to add fields
Expand Down
Loading

0 comments on commit 81bb01e

Please sign in to comment.