Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix BeatV2Manager to configure inputs and set log level #34066

Merged
merged 14 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9867,11 +9867,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-a

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-client/v7
Version: v7.0.2-0.20221129150247-15881a8e64ef
Version: v7.0.3
Licence type (autodetected): Elastic
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/[email protected].2-0.20221129150247-15881a8e64ef/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/[email protected].3/LICENSE.txt:

ELASTIC LICENSE AGREEMENT

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/eapache/go-resiliency v1.2.0
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/elastic/elastic-agent-client/v7 v7.0.2-0.20221129150247-15881a8e64ef
github.com/elastic/elastic-agent-client/v7 v7.0.3
github.com/elastic/go-concert v0.2.0
github.com/elastic/go-libaudit/v2 v2.3.2
github.com/elastic/go-licenser v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -617,8 +617,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY=
github.com/elastic/elastic-agent-autodiscover v0.5.0 h1:fiDJQKB148RsRISqqbGINtJ/ZPupeVjwn0fbPz9ZDOc=
github.com/elastic/elastic-agent-autodiscover v0.5.0/go.mod h1:p3MSf9813JEnolCTD0GyVAr3+Eptg2zQ9aZVFjl4tJ4=
github.com/elastic/elastic-agent-client/v7 v7.0.2-0.20221129150247-15881a8e64ef h1:+3AWaimDL826eoU06qOFBtA3xmyuTr9YUMVWvnim4mU=
github.com/elastic/elastic-agent-client/v7 v7.0.2-0.20221129150247-15881a8e64ef/go.mod h1:cHviLpA5fAwMbfBIHBVNl16qp90bO7pKHMAQaG+9raU=
github.com/elastic/elastic-agent-client/v7 v7.0.3 h1:YqZPnO7Z9rlj25sFZEUaxGGK3mZR4v0uSOcfO8GRv7s=
github.com/elastic/elastic-agent-client/v7 v7.0.3/go.mod h1:cHviLpA5fAwMbfBIHBVNl16qp90bO7pKHMAQaG+9raU=
github.com/elastic/elastic-agent-libs v0.2.11/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE=
github.com/elastic/elastic-agent-libs v0.2.15 h1:hdAbrZZ2mCPcQLRCE3E8xw3mHKl8HFMt36w7jan/XGo=
github.com/elastic/elastic-agent-libs v0.2.15/go.mod h1:0J9lzJh+BjttIiVjYDLncKYCEWUUHiiqnuI64y6C6ss=
Expand Down
2 changes: 2 additions & 0 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const (
Failed
// Stopping is status describing application is stopping.
Stopping
// Stopped is status describing application is stopped.
Stopped
)

// Namespace is the feature namespace for queue definition.
Expand Down
6 changes: 4 additions & 2 deletions x-pack/libbeat/management/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (

// Config for central management
type Config struct {
Enabled bool `config:"enabled" yaml:"enabled"`
Blacklist ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"`
Enabled bool `config:"enabled" yaml:"enabled"`
Blacklist ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"`
RestartOnOutputChange bool `config:"restart_on_output_change" yaml:"restart_on_output_change"`
}

// ConfigBlock stores a piece of config from central management
Expand All @@ -29,6 +30,7 @@ type ConfigBlocksWithType struct {
// ConfigBlocks holds a list of type + configs objects
type ConfigBlocks []ConfigBlocksWithType

// DefaultConfig returns the default config for the V2 manager
func DefaultConfig() *Config {
return &Config{
Blacklist: ConfigBlacklistSettings{
Expand Down
46 changes: 34 additions & 12 deletions x-pack/libbeat/management/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,33 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, inputType string, ag
streamSource := raw.GetStreams()[iter].GetSource().AsMap()

streamSource = injectIndexStream(inputType, raw, stream, streamSource)
streamSource, err := injectStreamProcessors(raw, inputType, stream, streamSource)

// the order of building the processors is important
// prepend is used to ensure that the processors defined directly on the stream
// come last in the order as they take priority over the others as they are the
// most specific to this one stream

// 1. global processors
streamSource = injectGlobalProcesssors(raw, streamSource)

// 2. agentInfo
streamSource, err := injectAgentInfoRule(streamSource, agentInfo)
if err != nil {
return nil, fmt.Errorf("Error injecting stream processors: %w", err)
return nil, fmt.Errorf("Error injecting agent processors: %w", err)
}
streamSource, err = injectAgentInfoRule(streamSource, agentInfo)

// 3. stream processors
streamSource, err = injectStreamProcessors(raw, inputType, stream, streamSource)
if err != nil {
return nil, fmt.Errorf("Error injecting agent processors: %w", err)
return nil, fmt.Errorf("Error injecting stream processors: %w", err)
}
streamSource = injectGlobalProcesssors(raw, streamSource)

// now the order of the processors on this input is as follows
// 1. stream processors
// 2. agentInfo processors
// 3. global processors
// 4. stream specific processors

cmacknz marked this conversation as resolved.
Show resolved Hide resolved
inputs[iter] = streamSource
}

Expand Down Expand Up @@ -128,7 +146,7 @@ func injectAgentInfoRule(inputs map[string]interface{}, agentInfo *client.AgentI
mapstr.M{"id": agentInfo.ID},
"agent"))

inputs["processors"] = appendProcessors(inputs, processors)
inputs["processors"] = prependProcessors(inputs, processors)

return inputs, nil
}
Expand All @@ -144,7 +162,10 @@ func injectGlobalProcesssors(expected *proto.UnitExpectedConfig, stream map[stri
if !ok {
return stream
}
newProcs := appendProcessors(stream, globalList)
// copy global processors to ensure that each stream gets its own copy
// if the stream doesn't have any processors it will take the slice as the new value
// without copying its possible that the processors appended to the streams will be shared
newProcs := prependProcessors(stream, append([]interface{}{}, globalList...))
stream["processors"] = newProcs
return stream
}
Expand Down Expand Up @@ -191,8 +212,9 @@ func injectStreamProcessors(expected *proto.UnitExpectedConfig, dataStreamType s
processors = append(processors, sourceStream)
}

// append with existing processors
stream["processors"] = appendProcessors(stream, processors)
// prepend with existing processors
// streams processors should be first as other processors might adjust values in those fields
stream["processors"] = prependProcessors(stream, processors)

return stream, nil
}
Expand Down Expand Up @@ -227,9 +249,9 @@ func generateAddFieldsProcessor(fields mapstr.M, target string) mapstr.M {
}
}

// appendProcessors takes an existing intput or stream-level config, extracts any existing processors in the config,
// prependProcessors takes an existing input or stream-level config, extracts any existing processors in the config,
// and appends them to a new list of configs. Mostly a helper to deal with all the typecasting
func appendProcessors(existingConfig map[string]interface{}, newProcs []interface{}) []interface{} {
func prependProcessors(existingConfig map[string]interface{}, newProcs []interface{}) []interface{} {
currentProcs, ok := existingConfig["processors"]
if !ok {
return newProcs
Expand All @@ -238,7 +260,7 @@ func appendProcessors(existingConfig map[string]interface{}, newProcs []interfac
if !ok {
return newProcs
}
return append(currentList, newProcs...)
return append(newProcs, currentList...)
}

// metadataFromDatastreamValues takes the various data_stream values from across the expected config and returns a set of "good" that can be used to add fields
Expand Down
Loading