diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 28fb87e303a..f4e7905e951 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -503,6 +503,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Kafka is now supported up to version 2.8.0. {pull}27720[27720] - Add Huawei Cloud provider to add_cloud_metadata. {pull}27607[27607] - Add default seccomp policy for linux arm64. {pull}27955[27955] +- Add cluster level add_kubernetes_metadata support for centralized enrichment {pull}24621[24621] *Auditbeat* diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index 11285f2c36b..24c68d55f1d 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -18,6 +18,7 @@ package add_kubernetes_metadata import ( + "fmt" "time" "github.com/elastic/beats/v7/libbeat/common" @@ -27,6 +28,7 @@ import ( type kubeAnnotatorConfig struct { KubeConfig string `config:"kube_config"` Host string `config:"host"` + Scope string `config:"scope"` Namespace string `config:"namespace"` SyncPeriod time.Duration `config:"sync_period"` // Annotations are kept after pod is removed, until they haven't been accessed @@ -52,5 +54,18 @@ func defaultKubernetesAnnotatorConfig() kubeAnnotatorConfig { CleanupTimeout: 60 * time.Second, DefaultMatchers: Enabled{true}, DefaultIndexers: Enabled{true}, + Scope: "node", } } + +func (k *kubeAnnotatorConfig) Validate() error { + if k.Scope != "node" && k.Scope != "cluster" { + return fmt.Errorf("invalid scope %s, valid values include `cluster`, `node`", k.Scope) + } + + if k.Scope == "cluster" { + k.Host = "" + } + + return nil +} diff --git a/libbeat/processors/add_kubernetes_metadata/config_test.go b/libbeat/processors/add_kubernetes_metadata/config_test.go new file mode 100644 index 00000000000..3bdcf34a1d7 --- /dev/null +++ b/libbeat/processors/add_kubernetes_metadata/config_test.go @@ -0,0 +1,62 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_kubernetes_metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestConfigValidate(t *testing.T) { + tests := []struct { + cfg map[string]interface{} + error bool + }{ + { + cfg: map[string]interface{}{ + "scope": "foo", + }, + error: true, + }, + { + cfg: map[string]interface{}{ + "scope": "cluster", + }, + error: false, + }, + { + cfg: map[string]interface{}{}, + error: false, + }, + } + + for _, test := range tests { + cfg := common.MustNewConfigFrom(test.cfg) + c := defaultKubernetesAnnotatorConfig() + + err := cfg.Unpack(&c) + if test.error { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + } +} diff --git a/libbeat/processors/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc b/libbeat/processors/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc index 50152e3ccaf..9bb81adf9af 100644 --- a/libbeat/processors/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc +++ b/libbeat/processors/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc @@ -115,6 +115,8 @@ The `add_kubernetes_metadata` processor has the following configuration settings `host`:: (Optional) Specify the node to scope {beatname_lc} to in case it cannot be accurately detected, as when running {beatname_lc} in host network mode. +`scope`:: (Optional) Specify if the processor should have visibility at the node level or at the entire cluster +level. Possible values are `node` and `cluster`. Scope is `node` by default. `namespace`:: (Optional) Select the namespace from which to collect the metadata. If it is not set, the processor collects metadata from all namespaces. It is unset by default. diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index d9302bebd75..5fc0de08171 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -170,12 +170,14 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi IsInCluster: kubernetes.IsInCluster(config.KubeConfig), HostUtils: &kubernetes.DefaultDiscoveryUtils{}, } - config.Host, err = kubernetes.DiscoverKubernetesNode(k.log, nd) - if err != nil { - k.log.Errorf("Couldn't discover Kubernetes node: %w", err) - return + if config.Scope == "node" { + config.Host, err = kubernetes.DiscoverKubernetesNode(k.log, nd) + if err != nil { + k.log.Errorf("Couldn't discover Kubernetes node: %w", err) + return + } + k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Host) } - k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Host) watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod,