Skip to content

Commit

Permalink
Use indexers and matchers in config when defaults are enabled (elasti…
Browse files Browse the repository at this point in the history
…c#18818) (elastic#18859)

Before 7.7.0, indexers and matchers defined in `add_kubernetes_metadata`
configuration were used even when defaults were not disabled. Revert to
this behaviour and add tests to avoid changing it unexpectedly in the
future.

(cherry picked from commit 5c65f6d)

Co-authored-by: Chris Mark <[email protected]>
  • Loading branch information
jsoriano and ChrsMark authored Jun 2, 2020
1 parent 91775b4 commit 168cbc2
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Change `decode_json_fields` processor, to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958]
- Gives monitoring reporter hosts, if configured, total precedence over corresponding output hosts. {issue}17937[17937] {pull}17991[17991]
- Fix `keystore add` hanging under Windows. {issue}18649[18649] {pull}18654[18654]
- Fix regression in `add_kubernetes_metadata`, so configured `indexers` and `matchers` are used if defaults are not disabled. {issue}18481[18481] {pull}18818[18818]

*Auditbeat*

Expand Down
39 changes: 24 additions & 15 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,12 @@ func isKubernetesAvailableWithRetry(client k8sclient.Interface) bool {

// New constructs a new add_kubernetes_metadata processor.
func New(cfg *common.Config) (processors.Processor, error) {
config := defaultKubernetesAnnotatorConfig()
log := logp.NewLogger(selector).With("libbeat.processor", "add_kubernetes_metadata")

err := cfg.Unpack(&config)
config, err := newProcessorConfig(cfg, Indexing)
if err != nil {
return nil, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err)
}

//Load default indexer configs
if config.DefaultIndexers.Enabled == true {
config.Indexers = Indexing.GetDefaultIndexerConfigs()
}

//Load default matcher configs
if config.DefaultMatchers.Enabled == true {
config.Matchers = Indexing.GetDefaultMatcherConfigs()
return nil, err
}

log := logp.NewLogger(selector).With("libbeat.processor", "add_kubernetes_metadata")
processor := &kubernetesAnnotator{
log: log,
cache: newCache(config.CleanupTimeout),
Expand All @@ -121,6 +109,27 @@ func New(cfg *common.Config) (processors.Processor, error) {
return processor, nil
}

func newProcessorConfig(cfg *common.Config, register *Register) (kubeAnnotatorConfig, error) {
config := defaultKubernetesAnnotatorConfig()

err := cfg.Unpack(&config)
if err != nil {
return config, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err)
}

//Load and append default indexer configs
if config.DefaultIndexers.Enabled {
config.Indexers = append(config.Indexers, register.GetDefaultIndexerConfigs()...)
}

//Load and append default matcher configs
if config.DefaultMatchers.Enabled {
config.Matchers = append(config.Matchers, register.GetDefaultMatcherConfigs()...)
}

return config, nil
}

func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Config) {
k.initOnce.Do(func() {
client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
Expand Down
101 changes: 101 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -128,3 +129,103 @@ func TestAnnotatorWithNoKubernetesAvailable(t *testing.T) {

assert.Equal(t, intialEventMap, event.Fields)
}

// TestNewProcessorConfigDefaultIndexers validates the behaviour of default indexers and
// matchers settings
func TestNewProcessorConfigDefaultIndexers(t *testing.T) {
emptyRegister := NewRegister()
registerWithDefaults := NewRegister()
registerWithDefaults.AddDefaultIndexerConfig("ip_port", *common.NewConfig())
registerWithDefaults.AddDefaultMatcherConfig("field_format", *common.MustNewConfigFrom(map[string]interface{}{
"format": "%{[destination.ip]}:%{[destination.port]}",
}))

configWithIndexersAndMatchers := common.MustNewConfigFrom(map[string]interface{}{
"indexers": []map[string]interface{}{
{
"container": map[string]interface{}{},
},
},
"matchers": []map[string]interface{}{
{
"fields": map[string]interface{}{
"lookup_fields": []string{"container.id"},
},
},
},
})
configOverrideDefaults := common.MustNewConfigFrom(map[string]interface{}{
"default_indexers.enabled": "false",
"default_matchers.enabled": "false",
})
require.NoError(t, configOverrideDefaults.Merge(configWithIndexersAndMatchers))

cases := map[string]struct {
register *Register
config *common.Config
expectedMatchers []string
expectedIndexers []string
}{
"no matchers": {
register: emptyRegister,
config: common.NewConfig(),
},
"one configured indexer and matcher": {
register: emptyRegister,
config: configWithIndexersAndMatchers,
expectedIndexers: []string{"container"},
expectedMatchers: []string{"fields"},
},
"default indexers and matchers": {
register: registerWithDefaults,
config: common.NewConfig(),
expectedIndexers: []string{"ip_port"},
expectedMatchers: []string{"field_format"},
},
"default indexers and matchers, don't use indexers": {
register: registerWithDefaults,
config: common.MustNewConfigFrom(map[string]interface{}{
"default_indexers.enabled": "false",
}),
expectedMatchers: []string{"field_format"},
},
"default indexers and matchers, don't use matchers": {
register: registerWithDefaults,
config: common.MustNewConfigFrom(map[string]interface{}{
"default_matchers.enabled": "false",
}),
expectedIndexers: []string{"ip_port"},
},
"one configured indexer and matcher and defaults, configured should come first": {
register: registerWithDefaults,
config: configWithIndexersAndMatchers,
expectedIndexers: []string{"container", "ip_port"},
expectedMatchers: []string{"fields", "field_format"},
},
"override defaults": {
register: registerWithDefaults,
config: configOverrideDefaults,
expectedIndexers: []string{"container"},
expectedMatchers: []string{"fields"},
},
}

names := func(plugins PluginConfig) []string {
var ns []string
for _, plugin := range plugins {
for name := range plugin {
ns = append(ns, name)
}
}
return ns
}

for title, c := range cases {
t.Run(title, func(t *testing.T) {
config, err := newProcessorConfig(c.config, c.register)
require.NoError(t, err)
assert.Equal(t, c.expectedMatchers, names(config.Matchers), "expected matchers")
assert.Equal(t, c.expectedIndexers, names(config.Indexers), "expected indexers")
})
}
}

0 comments on commit 168cbc2

Please sign in to comment.