Skip to content

Commit

Permalink
Add cluster level add_kubernetes_metadata support for centralized enr…
Browse files Browse the repository at this point in the history
…ichment (#24621)

(cherry picked from commit fc964d8)
  • Loading branch information
vjsamuel authored and mergify-bot committed Sep 28, 2021
1 parent 9d52690 commit 73ca847
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Allow non-padded base64 data to be decoded by decode_base64_field {pull}27311[27311], {issue}27021[27021]
- The Kafka support library Sarama has been updated to 1.29.1. {pull}27717[27717]
- Kafka is now supported up to version 2.8.0. {pull}27720[27720]
- Add Huawei Cloud provider to add_cloud_metadata. {pull}27607[27607]
- Add default seccomp policy for linux arm64. {pull}27955[27955]
- Add cluster level add_kubernetes_metadata support for centralized enrichment {pull}24621[24621]

*Auditbeat*

Expand Down
15 changes: 15 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package add_kubernetes_metadata

import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -27,6 +28,7 @@ import (
type kubeAnnotatorConfig struct {
KubeConfig string `config:"kube_config"`
Host string `config:"host"`
Scope string `config:"scope"`
Namespace string `config:"namespace"`
SyncPeriod time.Duration `config:"sync_period"`
// Annotations are kept after pod is removed, until they haven't been accessed
Expand All @@ -52,5 +54,18 @@ func defaultKubernetesAnnotatorConfig() kubeAnnotatorConfig {
CleanupTimeout: 60 * time.Second,
DefaultMatchers: Enabled{true},
DefaultIndexers: Enabled{true},
Scope: "node",
}
}

func (k *kubeAnnotatorConfig) Validate() error {
if k.Scope != "node" && k.Scope != "cluster" {
return fmt.Errorf("invalid scope %s, valid values include `cluster`, `node`", k.Scope)
}

if k.Scope == "cluster" {
k.Host = ""
}

return nil
}
62 changes: 62 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_kubernetes_metadata

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
)

func TestConfigValidate(t *testing.T) {
tests := []struct {
cfg map[string]interface{}
error bool
}{
{
cfg: map[string]interface{}{
"scope": "foo",
},
error: true,
},
{
cfg: map[string]interface{}{
"scope": "cluster",
},
error: false,
},
{
cfg: map[string]interface{}{},
error: false,
},
}

for _, test := range tests {
cfg := common.MustNewConfigFrom(test.cfg)
c := defaultKubernetesAnnotatorConfig()

err := cfg.Unpack(&c)
if test.error {
require.NotNil(t, err)
} else {
require.Nil(t, err)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ The `add_kubernetes_metadata` processor has the following configuration settings
`host`:: (Optional) Specify the node to scope {beatname_lc} to in case it
cannot be accurately detected, as when running {beatname_lc} in host network
mode.
`scope`:: (Optional) Specify if the processor should have visibility at the node level or at the entire cluster
level. Possible values are `node` and `cluster`. Scope is `node` by default.
`namespace`:: (Optional) Select the namespace from which to collect the
metadata. If it is not set, the processor collects metadata from all namespaces.
It is unset by default.
Expand Down
12 changes: 7 additions & 5 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi
IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
HostUtils: &kubernetes.DefaultDiscoveryUtils{},
}
config.Host, err = kubernetes.DiscoverKubernetesNode(k.log, nd)
if err != nil {
k.log.Errorf("Couldn't discover Kubernetes node: %w", err)
return
if config.Scope == "node" {
config.Host, err = kubernetes.DiscoverKubernetesNode(k.log, nd)
if err != nil {
k.log.Errorf("Couldn't discover Kubernetes node: %w", err)
return
}
k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Host)
}
k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Host)

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Expand Down

0 comments on commit 73ca847

Please sign in to comment.