diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index 8875b834e66..f74e4a72782 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -70,6 +70,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
 
 ==== Bugfixes
 
+- Handle the starting of namespace and node watchers for metadata enrichment according to `add_resource_metadata` configuration.{pull}38762[38762]
 - Fix multiple metricbeat instances reporting same metrics when using autodiscover with provider kubernetes, and ensure leader elector is always running in autodiscover mode.{pull}38471[38471]
 - Fix how Prometheus histograms are calculated when percentiles are provide.{pull}36537[36537]
 - Stop using `mage:import` in community beats. This was ignoring the vendorized beats directory for some mage targets, using the code available in GOPATH, this causes inconsistencies and compilation problems if the version of the code in the GOPATH is different to the vendored one. Use of `mage:import` will continue to be unsupported in custom beats till beats is migrated to go modules, or mage supports vendored dependencies. {issue}13998[13998] {pull}14162[14162]
diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go
index d849039a66e..569b2d21cd4 100644
--- a/libbeat/autodiscover/providers/kubernetes/pod.go
+++ b/libbeat/autodiscover/providers/kubernetes/pod.go
@@ -59,7 +59,7 @@ type pod struct {
 func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) {
     logger := logp.NewLogger("autodiscover.pod")
 
-    var replicaSetWatcher, jobWatcher kubernetes.Watcher
+    var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher
 
     config := defaultConfig()
     err := cfg.Unpack(&config)
@@ -96,22 +96,27 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu
         return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Pod{}, err)
     }
 
-    options := kubernetes.WatchOptions{
-        SyncTimeout: config.SyncPeriod,
-        Node:        config.Node,
-        Namespace:   config.Namespace,
-    }
-
     metaConf := config.AddResourceMetadata
-    nodeWatcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil)
-    if err != nil {
-        logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
+
+    if metaConf.Node.Enabled() || config.Hints.Enabled() {
+        options := kubernetes.WatchOptions{
+            SyncTimeout: config.SyncPeriod,
+            Node:        config.Node,
+            Namespace:   config.Namespace,
+        }
+        nodeWatcher, err = kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil)
+        if err != nil {
+            logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
+        }
     }
-    namespaceWatcher, err := kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
-        SyncTimeout: config.SyncPeriod,
-    }, nil)
-    if err != nil {
-        logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+
+    if metaConf.Namespace.Enabled() || config.Hints.Enabled() {
+        namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
+            SyncTimeout: config.SyncPeriod,
+        }, nil)
+        if err != nil {
+            logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+        }
     }
 
     // Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to
diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go
index 4cc2d8bb393..3a60342444a 100644
--- a/libbeat/autodiscover/providers/kubernetes/pod_test.go
+++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go
@@ -2108,6 +2108,113 @@ func TestNodePodUpdater(t *testing.T) {
     }
 }
 
+func TestPodEventer_Namespace_Node_Watcher(t *testing.T) {
+    client := k8sfake.NewSimpleClientset()
+    uuid, err := uuid.NewV4()
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    tests := []struct {
+        cfg         mapstr.M
+        expectedNil bool
+        name        string
+        msg         string
+    }{
+        {
+            cfg: mapstr.M{
+                "resource": "pod",
+                "node":     "node-1",
+                "add_resource_metadata": mapstr.M{
+                    "namespace.enabled": false,
+                    "node.enabled":      false,
+                },
+                "hints.enabled": false,
+                "builders": []mapstr.M{
+                    {
+                        "mock": mapstr.M{},
+                    },
+                },
+            },
+            expectedNil: true,
+            name:        "add_resource_metadata.namespace and add_resource_metadata.node disabled and hints disabled.",
+            msg:         "Watcher should be nil.",
+        },
+        {
+            cfg: mapstr.M{
+                "resource": "pod",
+                "node":     "node-1",
+                "add_resource_metadata": mapstr.M{
+                    "namespace.enabled": false,
+                    "node.enabled":      false,
+                },
+                "hints.enabled": true,
+            },
+            expectedNil: false,
+            name:        "add_resource_metadata.namespace and add_resource_metadata.node disabled and hints enabled.",
+            msg:         "Watcher should not be nil.",
+        },
+        {
+            cfg: mapstr.M{
+                "resource": "pod",
+                "node":     "node-1",
+                "add_resource_metadata": mapstr.M{
+                    "namespace.enabled": true,
+                    "node.enabled":      true,
+                },
+                "hints.enabled": false,
+                "builders": []mapstr.M{
+                    {
+                        "mock": mapstr.M{},
+                    },
+                },
+            },
+            expectedNil: false,
+            name:        "add_resource_metadata.namespace and add_resource_metadata.node enabled and hints disabled.",
+            msg:         "Watcher should not be nil.",
+        },
+        {
+            cfg: mapstr.M{
+                "resource": "pod",
+                "node":     "node-1",
+                "builders": []mapstr.M{
+                    {
+                        "mock": mapstr.M{},
+                    },
+                },
+            },
+            expectedNil: false,
+            name:        "add_resource_metadata default and hints default.",
+            msg:         "Watcher should not be nil.",
+        },
+    }
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            config := conf.MustNewConfigFrom(&test.cfg)
+            c := defaultConfig()
+            err = config.Unpack(&c)
+            assert.NoError(t, err)
+
+            eventer, err := NewPodEventer(uuid, config, client, nil)
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            namespaceWatcher := eventer.(*pod).namespaceWatcher
+            nodeWatcher := eventer.(*pod).nodeWatcher
+
+            if test.expectedNil {
+                assert.Equalf(t, nil, namespaceWatcher, "Namespace "+test.msg)
+                assert.Equalf(t, nil, nodeWatcher, "Node "+test.msg)
+            } else {
+                assert.NotEqualf(t, nil, namespaceWatcher, "Namespace "+test.msg)
+                assert.NotEqualf(t, nil, nodeWatcher, "Node "+test.msg)
+            }
+        })
+    }
+}
+
 type mockUpdaterHandler struct {
     objects []interface{}
 }
diff --git a/libbeat/autodiscover/providers/kubernetes/service.go b/libbeat/autodiscover/providers/kubernetes/service.go
index 5a0c6b3cc3f..de6287f7466 100644
--- a/libbeat/autodiscover/providers/kubernetes/service.go
+++ b/libbeat/autodiscover/providers/kubernetes/service.go
@@ -70,17 +70,19 @@ func NewServiceEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publis
     var namespaceMeta metadata.MetaGen
     var namespaceWatcher kubernetes.Watcher
 
-    metaConf := metadata.GetDefaultResourceMetadataConfig()
-    namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
-        SyncTimeout: config.SyncPeriod,
-        Namespace:   config.Namespace,
-    }, nil)
-    if err != nil {
-        return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Namespace{}, err)
+    metaConf := config.AddResourceMetadata
+
+    if metaConf.Namespace.Enabled() || config.Hints.Enabled() {
+        namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
+            SyncTimeout: config.SyncPeriod,
+            Namespace:   config.Namespace,
+        }, nil)
+        if err != nil {
+            return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Namespace{}, err)
+        }
+        namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client)
     }
 
-    namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client)
-
     p := &service{
         config: config,
         uuid:   uuid,
diff --git a/libbeat/autodiscover/providers/kubernetes/service_test.go b/libbeat/autodiscover/providers/kubernetes/service_test.go
index 510ac6ebd0d..90ff678e11c 100644
--- a/libbeat/autodiscover/providers/kubernetes/service_test.go
+++ b/libbeat/autodiscover/providers/kubernetes/service_test.go
@@ -432,6 +432,104 @@ func TestEmitEvent_Service(t *testing.T) {
     }
 }
 
+func TestServiceEventer_NamespaceWatcher(t *testing.T) {
+    client := k8sfake.NewSimpleClientset()
+    uuid, err := uuid.NewV4()
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    tests := []struct {
+        cfg         mapstr.M
+        expectedNil bool
+        name        string
+        msg         string
+    }{
+        {
+            cfg: mapstr.M{
+                "resource": "service",
+                "node":     "node-1",
+                "add_resource_metadata": mapstr.M{
+                    "namespace.enabled": false,
+                },
+                "hints.enabled": false,
+                "builders": []mapstr.M{
+                    {
+                        "mock": mapstr.M{},
+                    },
+                },
+            },
+            expectedNil: true,
+            name:        "add_resource_metadata.namespace disabled and hints disabled.",
+            msg:         "Namespace watcher should be nil.",
+        },
+        {
+            cfg: mapstr.M{
+                "resource": "service",
+                "node":     "node-1",
+                "add_resource_metadata": mapstr.M{
+                    "namespace.enabled": false,
+                },
+                "hints.enabled": true,
+            },
+            expectedNil: false,
+            name:        "add_resource_metadata.namespace disabled and hints enabled.",
+            msg:         "Namespace watcher should not be nil.",
+        },
+        {
+            cfg: mapstr.M{
+                "resource": "service",
+                "node":     "node-1",
+                "add_resource_metadata": mapstr.M{
+                    "namespace.enabled": true,
+                },
+                "hints.enabled": false,
+                "builders": []mapstr.M{
+                    {
+                        "mock": mapstr.M{},
+                    },
+                },
+            },
+            expectedNil: false,
+            name:        "add_resource_metadata.namespace enabled and hints disabled.",
+            msg:         "Namespace watcher should not be nil.",
+        },
+        {
+            cfg: mapstr.M{
+                "resource": "pod",
+                "node":     "node-1",
+                "builders": []mapstr.M{
+                    {
+                        "mock": mapstr.M{},
+                    },
+                },
+            },
+            expectedNil: false,
+            name:        "add_resource_metadata default and hints default.",
+            msg:         "Watcher should not be nil.",
+        },
+    }
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            config := conf.MustNewConfigFrom(&test.cfg)
+
+            eventer, err := NewServiceEventer(uuid, config, client, nil)
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            namespaceWatcher := eventer.(*service).namespaceWatcher
+
+            if test.expectedNil {
+                assert.Equalf(t, nil, namespaceWatcher, test.msg)
+            } else {
+                assert.NotEqualf(t, nil, namespaceWatcher, test.msg)
+            }
+        })
+    }
+}
+
 func NewMockServiceEventerManager(svc *service) EventManager {
     em := &eventerManager{}
     em.eventer = svc
diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go
index 954a59ab3f1..f9143cdf289 100644
--- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go
+++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go
@@ -27,13 +27,14 @@ import (
 
     k8sclient "k8s.io/client-go/kubernetes"
 
-    "github.com/elastic/beats/v7/libbeat/beat"
-    "github.com/elastic/beats/v7/libbeat/processors"
     "github.com/elastic/elastic-agent-autodiscover/kubernetes"
     "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
     "github.com/elastic/elastic-agent-libs/config"
    "github.com/elastic/elastic-agent-libs/logp"
     "github.com/elastic/elastic-agent-libs/mapstr"
+
+    "github.com/elastic/beats/v7/libbeat/beat"
+    "github.com/elastic/beats/v7/libbeat/processors"
 )
 
 const (
@@ -144,7 +145,7 @@ func newProcessorConfig(cfg *config.C, register *Register) (kubeAnnotatorConfig,
 
 func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
     k.initOnce.Do(func() {
-        var replicaSetWatcher, jobWatcher kubernetes.Watcher
+        var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher
 
         client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
         if err != nil {
@@ -203,15 +204,20 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
             Namespace:   config.Namespace,
         }
 
-        nodeWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, options, nil)
-        if err != nil {
-            k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
+        if metaConf.Node.Enabled() {
+            nodeWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, options, nil)
+            if err != nil {
+                k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
+            }
         }
-        namespaceWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
-            SyncTimeout: config.SyncPeriod,
-        }, nil)
-        if err != nil {
-            k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+
+        if metaConf.Namespace.Enabled() {
+            namespaceWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
+                SyncTimeout: config.SyncPeriod,
+            }, nil)
+            if err != nil {
+                k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+            }
         }
 
         // Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to
diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go
index d89ca006f0e..a0c409ca14e 100644
--- a/metricbeat/module/kubernetes/util/kubernetes.go
+++ b/metricbeat/module/kubernetes/util/kubernetes.go
@@ -166,7 +166,14 @@ func getResource(resourceName string) kubernetes.Resource {
 func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddResourceMetadataConfig) []string {
     switch resourceName {
     case PodResource:
-        extra := []string{NamespaceResource, NodeResource}
+        extra := []string{}
+        if addResourceMetadata.Node.Enabled() {
+            extra = append(extra, NodeResource)
+        }
+        if addResourceMetadata.Namespace.Enabled() {
+            extra = append(extra, NamespaceResource)
+        }
+
         // We need to create watchers for ReplicaSets and Jobs that it might belong to,
         // in order to be able to retrieve 2nd layer Owner metadata like in case of:
         // Deployment -> Replicaset -> Pod
@@ -179,23 +186,55 @@ func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddReso
         }
         return extra
     case ServiceResource:
-        return []string{NamespaceResource}
+        extra := []string{}
+        if addResourceMetadata.Namespace.Enabled() {
+            extra = append(extra, NamespaceResource)
+        }
+        return extra
     case DeploymentResource:
-        return []string{NamespaceResource}
+        extra := []string{}
+        if addResourceMetadata.Namespace.Enabled() {
+            extra = append(extra, NamespaceResource)
+        }
+        return extra
     case ReplicaSetResource:
-        return []string{NamespaceResource}
+        extra := []string{}
+        if addResourceMetadata.Namespace.Enabled() {
+            extra = append(extra, NamespaceResource)
+        }
+        return extra
     case StatefulSetResource:
-        return []string{NamespaceResource}
+        extra := []string{}
+        if addResourceMetadata.Namespace.Enabled() {
+            extra = append(extra, NamespaceResource)
+        }
+        return extra
     case DaemonSetResource:
-        return []string{NamespaceResource}
+        extra := []string{}
+        if addResourceMetadata.Namespace.Enabled() {
+            extra = append(extra, NamespaceResource)
+        }
+        return extra
     case JobResource:
-        return []string{NamespaceResource}
+        extra := []string{}
+        if addResourceMetadata.Namespace.Enabled() {
+            extra = append(extra, NamespaceResource)
+        }
+        return extra
    case CronJobResource:
-        return []string{NamespaceResource}
+        extra := []string{}
+        if addResourceMetadata.Namespace.Enabled() {
+            extra = append(extra, NamespaceResource)
+        }
+        return extra
     case PersistentVolumeResource:
         return []string{}
     case PersistentVolumeClaimResource:
-        return []string{NamespaceResource}
+        extra := []string{}
+        if addResourceMetadata.Namespace.Enabled() {
+            extra = append(extra, NamespaceResource)
+        }
+        return extra
     case StorageClassResource:
         return []string{}
     case NodeResource:
diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go
index b4e528100a6..8235e73a277 100644
--- a/metricbeat/module/kubernetes/util/kubernetes_test.go
+++ b/metricbeat/module/kubernetes/util/kubernetes_test.go
@@ -265,6 +265,12 @@ func TestCreateMetaGenSpecific(t *testing.T) {
     require.NoError(t, err)
 
     log := logp.NewLogger("test")
+
+    namespaceConfig, err := conf.NewConfigFrom(map[string]interface{}{
+        "enabled": true,
+    })
+    require.NoError(t, err)
+
     config := &kubernetesConfig{
         Namespace:  "test-ns",
         SyncPeriod: time.Minute,
@@ -272,6 +278,7 @@ func TestCreateMetaGenSpecific(t *testing.T) {
         AddResourceMetadata: &metadata.AddResourceMetadataConfig{
             CronJob:    false,
             Deployment: true,
+            Namespace:  namespaceConfig,
         },
     }
     client := k8sfake.NewSimpleClientset()
@@ -326,6 +333,10 @@ func TestBuildMetadataEnricher_Start_Stop(t *testing.T) {
     resourceWatchers.lock.Unlock()
 
     funcs := mockFuncs{}
+    namespaceConfig, err := conf.NewConfigFrom(map[string]interface{}{
+        "enabled": true,
+    })
+    require.NoError(t, err)
     config := &kubernetesConfig{
         Namespace:  "test-ns",
         SyncPeriod: time.Minute,
@@ -333,6 +344,7 @@ func TestBuildMetadataEnricher_Start_Stop(t *testing.T) {
         AddResourceMetadata: &metadata.AddResourceMetadataConfig{
             CronJob:    false,
             Deployment: false,
+            Namespace:  namespaceConfig,
         },
     }