diff --git a/libbeat/cmd/instance/locker.go b/libbeat/cmd/instance/locker.go index d8a24a02423f..ebf14c43ac4e 100644 --- a/libbeat/cmd/instance/locker.go +++ b/libbeat/cmd/instance/locker.go @@ -44,7 +44,7 @@ func newLocker(b *Beat) *locker { } } -// lock attemps to acquire a lock on the data path for the currently-running +// lock attempts to acquire a lock on the data path for the currently-running // Beat instance. If another Beats instance already has a lock on the same data path // an ErrAlreadyLocked error is returned. func (l *locker) lock() error { diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 1a3b2c39c6f9..465842613424 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -141,4 +141,5 @@ - Add new --enroll-delay option for install and enroll commands. {pull}27118[27118] - Add link to troubleshooting guide on fatal exits. {issue}26367[26367] {pull}27236[27236] - Agent now adapts the beats queue size based on output settings. {issue}26638[26638] {pull}27429[27429] -- Support ephemeral containers in Kubernetes dynamic provider. {issue}#27020[#27020] {pull}27707[27707] +- Support ephemeral containers in Kubernetes dynamic provider. {issue}27020[#27020] {pull}27707[27707] +- Add complete k8s metadata through composable provider. {pull}27691[27691] diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go index d0538f433639..3d34db03a8b8 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go @@ -10,6 +10,7 @@ package kubernetes import ( "time" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -25,6 +26,16 @@ type Config struct { // Needed when resource is a Pod or Node Node string `config:"node"` + + AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"` + IncludeLabels []string `config:"include_labels"` + ExcludeLabels []string `config:"exclude_labels"` + + LabelsDedot bool `config:"labels.dedot"` + AnnotationsDedot bool `config:"annotations.dedot"` + + // Undocumented settings, to be deprecated in favor of `drop_fields` processor: + IncludeCreatorMetadata bool `config:"include_creator_metadata"` } // Resources config section for resources' config blocks @@ -44,6 +55,9 @@ func (c *Config) InitDefaults() { c.CleanupTimeout = 60 * time.Second c.SyncPeriod = 10 * time.Minute c.Scope = "node" + c.IncludeCreatorMetadata = true + c.LabelsDedot = true + c.AnnotationsDedot = true } // Validate ensures correctness of config diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index 2c81c59230b3..c43e5f984300 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -46,25 +46,26 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable if err != nil { return nil, errors.New(err, "failed to unpack configuration") } + return &dynamicProvider{logger, &cfg}, nil } // Run runs the kubernetes context provider. 
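For context, the new Config fields above mirror the Kubernetes metadata settings already available in Beats autodiscover. A minimal sketch (not part of this change, hypothetical values; it assumes libbeat's AddResourceMetadataConfig accepts enabled flags for its namespace and node sub-sections, which is how NewPodWatcher below consumes it) of feeding these settings through the same common.NewConfigFrom/Unpack path the provider uses:

// Sketch only; exampleUnpackConfig is a hypothetical helper in this package,
// relying on the libbeat common package the provider already imports.
func exampleUnpackConfig() (*Config, error) {
	raw, err := common.NewConfigFrom(map[string]interface{}{
		"node":           "worker-0",                 // hypothetical node name
		"include_labels": []string{"app", "release"}, // hypothetical allow-list
		"add_resource_metadata": map[string]interface{}{
			"namespace": map[string]interface{}{"enabled": true},
			"node":      map[string]interface{}{"enabled": true},
		},
	})
	if err != nil {
		return nil, err
	}
	var cfg Config
	cfg.InitDefaults() // labels.dedot, annotations.dedot and include_creator_metadata default to true
	if err := raw.Unpack(&cfg); err != nil {
		return nil, err
	}
	return &cfg, nil
}

With labels.dedot and annotations.dedot enabled (the new defaults), dotted keys such as app.kubernetes.io/name are flattened by the metadata generators (dots replaced with underscores) before being exposed, so they do not create nested objects in the resulting fields.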
func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error { if p.config.Resources.Pod.Enabled { - err := p.watchResource(comm, "pod", p.config) + err := p.watchResource(comm, "pod") if err != nil { return err } } if p.config.Resources.Node.Enabled { - err := p.watchResource(comm, "node", p.config) + err := p.watchResource(comm, "node") if err != nil { return err } } if p.config.Resources.Service.Enabled { - err := p.watchResource(comm, "service", p.config) + err := p.watchResource(comm, "service") if err != nil { return err } @@ -76,9 +77,8 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error { // and starts watching for such resource's events. func (p *dynamicProvider) watchResource( comm composable.DynamicProviderComm, - resourceType string, - config *Config) error { - client, err := kubernetes.GetKubernetesClient(config.KubeConfig) + resourceType string) error { + client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig) if err != nil { // info only; return nil (do nothing) p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err) @@ -93,24 +93,24 @@ func (p *dynamicProvider) watchResource( p.logger.Debugf( "Initializing Kubernetes watcher for resource %s using node: %v", resourceType, - config.Node) + p.config.Node) nd := &kubernetes.DiscoverKubernetesNodeParams{ - ConfigHost: config.Node, + ConfigHost: p.config.Node, Client: client, - IsInCluster: kubernetes.IsInCluster(config.KubeConfig), + IsInCluster: kubernetes.IsInCluster(p.config.KubeConfig), HostUtils: &kubernetes.DefaultDiscoveryUtils{}, } - config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd) + p.config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd) if err != nil { p.logger.Debugf("Kubernetes provider skipped, unable to discover node: %w", err) return nil } } else { - config.Node = "" + p.config.Node = "" } - watcher, err := p.newWatcher(resourceType, comm, client, config) + watcher, err := p.newWatcher(resourceType, comm, client) if err != nil { return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType) } @@ -126,23 +126,22 @@ func (p *dynamicProvider) watchResource( func (p *dynamicProvider) newWatcher( resourceType string, comm composable.DynamicProviderComm, - client k8s.Interface, - config *Config) (kubernetes.Watcher, error) { + client k8s.Interface) (kubernetes.Watcher, error) { switch resourceType { case "pod": - watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope) + watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } return watcher, nil case "node": - watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope) + watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } return watcher, nil case "service": - watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope) + watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go index 455a06107efe..17802735d070 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" 
"github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "github.com/elastic/beats/v7/libbeat/common/safemapstr" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" @@ -25,6 +26,7 @@ type node struct { comm composable.DynamicProviderComm scope string config *Config + metagen metadata.MetaGen } type nodeData struct { @@ -49,13 +51,25 @@ func NewNodeWatcher( if err != nil { return nil, errors.New(err, "couldn't create kubernetes watcher") } - watcher.AddEventHandler(&node{logger, cfg.CleanupTimeout, comm, scope, cfg}) + + rawConfig, err := common.NewConfigFrom(cfg) + if err != nil { + return nil, errors.New(err, "failed to unpack configuration") + } + metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client) + watcher.AddEventHandler(&node{ + logger, + cfg.CleanupTimeout, + comm, + scope, + cfg, + metaGen}) return watcher, nil } func (n *node) emitRunning(node *kubernetes.Node) { - data := generateNodeData(node, n.config) + data := generateNodeData(node, n.config, n.metagen) if data == nil { return } @@ -165,7 +179,7 @@ func isNodeReady(node *kubernetes.Node) bool { return false } -func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData { +func generateNodeData(node *kubernetes.Node, cfg *Config, kubeMetaGen metadata.MetaGen) *nodeData { host := getAddress(node) // If a node doesn't have an IP then dont monitor it @@ -178,7 +192,11 @@ func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData { return nil } - //TODO: add metadata here too ie -> meta := n.metagen.Generate(node) + meta := kubeMetaGen.Generate(node) + kubemetaMap, err := meta.GetValue("kubernetes") + if err != nil { + return &nodeData{} + } // Pass annotations to all events so that it can be used in templating and by annotation builders. 
annotations := common.MapStr{} @@ -186,33 +204,28 @@ func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData { safemapstr.Put(annotations, k, v) } - labels := common.MapStr{} - for k, v := range node.GetObjectMeta().GetLabels() { - // TODO: add dedoting option - safemapstr.Put(labels, k, v) - } + // k8sMapping includes only the metadata that fall under kubernetes.* + // and these are available as dynamic vars through the provider + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) - mapping := map[string]interface{}{ - "node": map[string]interface{}{ - "uid": string(node.GetUID()), - "name": node.GetName(), - "labels": labels, - "annotations": annotations, - "ip": host, - }, - } + // add annotations to be discoverable by templates + k8sMapping["annotations"] = annotations - processors := []map[string]interface{}{ - { + processors := []map[string]interface{}{} + // meta map includes metadata that go under kubernetes.* + // but also other ECS fields like orchestrator.* + for field, metaMap := range meta { + processor := map[string]interface{}{ "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", + "fields": metaMap, + "target": field, }, - }, + } + processors = append(processors, processor) } return &nodeData{ node: node, - mapping: mapping, + mapping: k8sMapping, processors: processors, } } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go index 68c35878490e..f42af49f9881 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go @@ -7,6 +7,8 @@ package kubernetes import ( "testing" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/v7/libbeat/common" "github.com/stretchr/testify/assert" @@ -41,32 +43,96 @@ func TestGenerateNodeData(t *testing.T) { }, } - data := generateNodeData(node, &Config{}) + data := generateNodeData(node, &Config{}, &nodeMeta{}) mapping := map[string]interface{}{ - "node": map[string]interface{}{ + "node": common.MapStr{ "uid": string(node.GetUID()), "name": node.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "baz": "ban", - }, - "ip": "node1", + "ip": "node1", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, + "labels": common.MapStr{ + "foo": "bar", }, } - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, + processors := map[string]interface{}{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090"}, + }, "kubernetes": common.MapStr{ + "labels": common.MapStr{"foo": "bar"}, + "annotations": common.MapStr{"baz": "ban"}, + "node": common.MapStr{ + "ip": "node1", + "name": "testnode", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}, }, } - assert.Equal(t, node, data.node) assert.Equal(t, mapping, data.mapping) - assert.Equal(t, processors, data.processors) + for _, v := range data.processors { + k := v["add_fields"].(map[string]interface{}) + target := k["target"].(string) + fields := k["fields"] + assert.Equal(t, processors[target], fields) + } +} + +type nodeMeta struct{} + +// Generate generates node metadata from a resource object +// Metadata map is in the following form: +// { +// "kubernetes": {}, +// "some.ecs.field": "asdf" +// } 
+// All Kubernetes fields that need to be stored under kubernetes. prefix are populated by +// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method +func (n *nodeMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + ecsFields := n.GenerateECS(obj) + meta := common.MapStr{ + "kubernetes": n.GenerateK8s(obj, opts...), + } + meta.DeepUpdate(ecsFields) + return meta +} + +// GenerateECS generates node ECS metadata from a resource object +func (n *nodeMeta) GenerateECS(obj kubernetes.Resource) common.MapStr { + return common.MapStr{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090", + }, + }, + } +} + +// GenerateK8s generates node metadata from a resource object +func (n *nodeMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + k8sNode := obj.(*kubernetes.Node) + return common.MapStr{ + "node": common.MapStr{ + "uid": string(k8sNode.GetUID()), + "name": k8sNode.GetName(), + "ip": "node1", + }, + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, + } +} + +// GenerateFromName generates node metadata from a node name +func (n *nodeMeta) GenerateFromName(name string, opts ...metadata.FieldOptions) common.MapStr { + return nil } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 7c48f7559763..859f7f29dfde 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -6,6 +6,7 @@ package kubernetes import ( "fmt" + "sync" "time" "github.com/elastic/beats/v7/libbeat/common" @@ -14,17 +15,25 @@ import ( k8s "k8s.io/client-go/kubernetes" "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" ) type pod struct { - logger *logp.Logger - cleanupTimeout time.Duration - comm composable.DynamicProviderComm - scope string - config *Config + logger *logp.Logger + cleanupTimeout time.Duration + comm composable.DynamicProviderComm + scope string + config *Config + metagen metadata.MetaGen + namespaceWatcher kubernetes.Watcher + + // Mutex used by configuration updates not triggered by the main watcher, + // to avoid race conditions between cross updates and deletions. + // Other updaters must use a write lock. + crossUpdate sync.RWMutex } type providerData struct { @@ -33,6 +42,29 @@ type providerData struct { processors []map[string]interface{} } +type containerInPod struct { + id string + runtime string + spec kubernetes.Container + status kubernetes.PodContainerStatus +} + +// podUpdaterHandlerFunc is a function that handles pod updater notifications. +type podUpdaterHandlerFunc func(interface{}) + +// podUpdaterStore is the interface that an object needs to implement to be +// used as a pod updater store. +type podUpdaterStore interface { + List() []interface{} +} + +// namespacePodUpdater notifies updates on pods when their namespaces are updated. 
+type namespacePodUpdater struct { + handler podUpdaterHandlerFunc + store podUpdaterStore + locker sync.Locker +} + // NewPodWatcher creates a watcher that can discover and process pod objects func NewPodWatcher( comm composable.DynamicProviderComm, @@ -49,13 +81,57 @@ func NewPodWatcher( if err != nil { return nil, errors.New(err, "couldn't create kubernetes watcher") } - watcher.AddEventHandler(&pod{logger, cfg.CleanupTimeout, comm, scope, cfg}) + + options := kubernetes.WatchOptions{ + SyncTimeout: cfg.SyncPeriod, + Node: cfg.Node, + } + metaConf := cfg.AddResourceMetadata + if metaConf == nil { + metaConf = metadata.GetDefaultResourceMetadataConfig() + } + nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil) + if err != nil { + logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + } + namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: cfg.SyncPeriod, + }, nil) + if err != nil { + logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } + + rawConfig, err := common.NewConfigFrom(cfg) + if err != nil { + return nil, errors.New(err, "failed to unpack configuration") + } + metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, metaConf) + + p := pod{ + logger: logger, + cleanupTimeout: cfg.CleanupTimeout, + comm: comm, + scope: scope, + config: cfg, + metagen: metaGen, + namespaceWatcher: namespaceWatcher, + } + + watcher.AddEventHandler(&p) + + if namespaceWatcher != nil && metaConf.Namespace.Enabled() { + updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate) + namespaceWatcher.AddEventHandler(updater) + } return watcher, nil } func (p *pod) emitRunning(pod *kubernetes.Pod) { - data := generatePodData(pod, p.config) + + namespaceAnnotations := podNamespaceAnnotations(pod, p.namespaceWatcher) + + data := generatePodData(pod, p.config, p.metagen, namespaceAnnotations) data.mapping["scope"] = p.scope // Emit the pod // We emit Pod + containers to ensure that configs matching Pod only @@ -63,19 +139,12 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) { p.comm.AddOrUpdate(data.uid, PodPriority, data.mapping, data.processors) // Emit all containers in the pod - p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses) - // TODO: deal with init containers stopping after initialization - p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses) - - // Get ephemeral containers and their status - ephContainers, ephContainersStatuses := getEphemeralContainers(pod) - p.emitContainers(pod, ephContainers, ephContainersStatuses) - + p.emitContainers(pod, namespaceAnnotations) } -func (p *pod) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) { - generateContainerData(p.comm, pod, containers, containerstatuses, p.config) +func (p *pod) emitContainers(pod *kubernetes.Pod, namespaceAnnotations common.MapStr) { + generateContainerData(p.comm, pod, p.config, p.metagen, namespaceAnnotations) } func (p *pod) emitStopped(pod *kubernetes.Pod) { @@ -96,6 +165,9 @@ func (p *pod) emitStopped(pod *kubernetes.Pod) { // OnAdd ensures processing of pod objects that are newly added func (p *pod) OnAdd(obj interface{}) { + p.crossUpdate.RLock() + defer p.crossUpdate.RUnlock() + p.logger.Debugf("pod add: %+v", obj) p.emitRunning(obj.(*kubernetes.Pod)) } @@ -104,138 +176,267 
@@ func (p *pod) OnAdd(obj interface{}) { // if it is terminating, a stop event is scheduled, if not, a stop and a start // events are sent sequentially to recreate the resources assotiated to the pod. func (p *pod) OnUpdate(obj interface{}) { - pod := obj.(*kubernetes.Pod) + p.crossUpdate.RLock() + defer p.crossUpdate.RUnlock() - p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) - switch pod.Status.Phase { - case kubernetes.PodSucceeded, kubernetes.PodFailed: - time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) - return - case kubernetes.PodPending: - p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj) - return - } + p.unlockedUpdate(obj) +} - p.logger.Debugf("pod update: %+v", obj) +func (p *pod) unlockedUpdate(obj interface{}) { + p.logger.Debugf("Watcher Pod update: %+v", obj) + pod := obj.(*kubernetes.Pod) p.emitRunning(pod) } // OnDelete stops pod objects that are deleted func (p *pod) OnDelete(obj interface{}) { + p.crossUpdate.RLock() + defer p.crossUpdate.RUnlock() + p.logger.Debugf("pod delete: %+v", obj) pod := obj.(*kubernetes.Pod) - time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) + time.AfterFunc(p.cleanupTimeout, func() { + p.emitStopped(pod) + }) } -func generatePodData(pod *kubernetes.Pod, cfg *Config) providerData { - // TODO: add metadata here too ie -> meta := s.metagen.Generate(pod) +func generatePodData( + pod *kubernetes.Pod, + cfg *Config, + kubeMetaGen metadata.MetaGen, + namespaceAnnotations common.MapStr) providerData { + + meta := kubeMetaGen.Generate(pod) + kubemetaMap, err := meta.GetValue("kubernetes") + if err != nil { + return providerData{} + } + + // k8sMapping includes only the metadata that fall under kubernetes.* + // and these are available as dynamic vars through the provider + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) + + if len(namespaceAnnotations) != 0 { + // TODO: convert it to namespace.annotations for 8.0 + k8sMapping["namespace_annotations"] = namespaceAnnotations + } // Pass annotations to all events so that it can be used in templating and by annotation builders. 
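A short sketch of the locking scheme introduced here (same package as pod.go, which now imports sync): handlers fired by the main pod watcher hold the read lock, while the namespace updater (see newNamespacePodUpdater further below) holds the write lock while it replays pods from the store. That is also why the namespace path is wired to unlockedUpdate rather than OnUpdate, which would try to re-acquire the lock.

// Sketch of the pattern only, not the provider code itself.
func exampleCrossUpdate() {
	var crossUpdate sync.RWMutex

	watcherUpdate := func(pod interface{}) {
		crossUpdate.RLock()
		defer crossUpdate.RUnlock()
		// emit updated configuration for the pod ...
	}

	namespaceReplay := func(pods []interface{}, unlockedUpdate func(interface{})) {
		crossUpdate.Lock() // *sync.RWMutex satisfies sync.Locker with its write lock
		defer crossUpdate.Unlock()
		for _, p := range pods {
			unlockedUpdate(p) // must not take the read lock again while the write lock is held
		}
	}

	_, _ = watcherUpdate, namespaceReplay
}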
annotations := common.MapStr{} for k, v := range pod.GetObjectMeta().GetAnnotations() { safemapstr.Put(annotations, k, v) } - - labels := common.MapStr{} - for k, v := range pod.GetObjectMeta().GetLabels() { - // TODO: add dedoting option - safemapstr.Put(labels, k, v) + k8sMapping["annotations"] = annotations + + processors := []map[string]interface{}{} + // meta map includes metadata that go under kubernetes.* + // but also other ECS fields like orchestrator.* + for field, metaMap := range meta { + processor := map[string]interface{}{ + "add_fields": map[string]interface{}{ + "fields": metaMap, + "target": field, + }, + } + processors = append(processors, processor) } - mapping := map[string]interface{}{ - "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ - "uid": string(pod.GetUID()), - "name": pod.GetName(), - "labels": labels, - "annotations": annotations, - "ip": pod.Status.PodIP, - }, - } return providerData{ - uid: string(pod.GetUID()), - mapping: mapping, - processors: []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, - }, - }, + uid: string(pod.GetUID()), + mapping: k8sMapping, + processors: processors, } } func generateContainerData( comm composable.DynamicProviderComm, pod *kubernetes.Pod, - containers []kubernetes.Container, - containerstatuses []kubernetes.PodContainerStatus, - cfg *Config) { - //TODO: add metadata here too ie -> meta := s.metagen.Generate() + cfg *Config, + kubeMetaGen metadata.MetaGen, + namespaceAnnotations common.MapStr) { - containerIDs := map[string]string{} - runtimes := map[string]string{} - for _, c := range containerstatuses { - cid, runtime := kubernetes.ContainerIDWithRuntime(c) - containerIDs[c.Name] = cid - runtimes[c.Name] = runtime - } + containers := getContainersInPod(pod) - labels := common.MapStr{} - for k, v := range pod.GetObjectMeta().GetLabels() { - safemapstr.Put(labels, k, v) + // Pass annotations to all events so that it can be used in templating and by annotation builders. + annotations := common.MapStr{} + for k, v := range pod.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) } for _, c := range containers { // If it doesn't have an ID, container doesn't exist in // the runtime, emit only an event if we are stopping, so // we are sure of cleaning up configurations. 
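The id and runtime used below are resolved by kubernetes.ContainerIDWithRuntime, now invoked once per container in getContainersInPod. A small sketch, using the fixture value from pod_test.go, of how a status ContainerID is split:

// Sketch only; shows the values that end up in containerInPod.id / containerInPod.runtime.
func exampleContainerID() {
	status := kubernetes.PodContainerStatus{
		Name:        "nginx",
		ContainerID: "crio://asdfghdeadbeef",
	}
	id, runtime := kubernetes.ContainerIDWithRuntime(status)
	_ = id      // "asdfghdeadbeef"
	_ = runtime // "crio"
}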
- cid := containerIDs[c.Name] - if cid == "" { + if c.id == "" { continue } // ID is the combination of pod UID + container name - eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) + eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.spec.Name) - mapping := map[string]interface{}{ - "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ - "uid": string(pod.GetUID()), - "name": pod.GetName(), - "labels": labels, - "ip": pod.Status.PodIP, - }, - "container": map[string]interface{}{ - "id": cid, - "name": c.Name, - "image": c.Image, - "runtime": runtimes[c.Name], + meta := kubeMetaGen.Generate(pod, metadata.WithFields("container.name", c.spec.Name)) + kubemetaMap, err := meta.GetValue("kubernetes") + if err != nil { + continue + } + + // k8sMapping includes only the metadata that fall under kubernetes.* + // and these are available as dynamic vars through the provider + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) + + if len(namespaceAnnotations) != 0 { + // TODO: convert it to namespace.annotations for 8.0 + k8sMapping["namespace_annotations"] = namespaceAnnotations + } + + // add annotations to be discoverable by templates + k8sMapping["annotations"] = annotations + + //container ECS fields + cmeta := common.MapStr{ + "id": c.id, + "runtime": c.runtime, + "image": common.MapStr{ + "name": c.spec.Image, }, } processors := []map[string]interface{}{ { "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", + "fields": cmeta, + "target": "container", }, }, } - comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors) + // meta map includes metadata that go under kubernetes.* + // but also other ECS fields like orchestrator.* + for field, metaMap := range meta { + processor := map[string]interface{}{ + "add_fields": map[string]interface{}{ + "fields": metaMap, + "target": field, + }, + } + processors = append(processors, processor) + } + + // add container metadata under kubernetes.container.* to + // make them available to dynamic var resolution + containerMeta := common.MapStr{ + "id": c.id, + "name": c.spec.Name, + "image": c.spec.Image, + "runtime": c.runtime, + } + if len(c.spec.Ports) > 0 { + for _, port := range c.spec.Ports { + containerMeta.Put("port", fmt.Sprintf("%v", port.ContainerPort)) + containerMeta.Put("port_name", port.Name) + k8sMapping["container"] = containerMeta + comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) + } + } else { + k8sMapping["container"] = containerMeta + comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) + } } } -func getEphemeralContainers(pod *kubernetes.Pod) ([]kubernetes.Container, []kubernetes.PodContainerStatus) { - var ephContainers []kubernetes.Container - var ephContainersStatuses []kubernetes.PodContainerStatus +// podNamespaceAnnotations returns the annotations of the namespace of the pod +func podNamespaceAnnotations(pod *kubernetes.Pod, watcher kubernetes.Watcher) common.MapStr { + if watcher == nil { + return nil + } + + rawNs, ok, err := watcher.Store().GetByKey(pod.Namespace) + if !ok || err != nil { + return nil + } + + namespace, ok := rawNs.(*kubernetes.Namespace) + if !ok { + return nil + } + + annotations := common.MapStr{} + for k, v := range namespace.GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + return annotations +} + +// newNamespacePodUpdater creates a namespacePodUpdater +func newNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker 
sync.Locker) *namespacePodUpdater { + return &namespacePodUpdater{ + handler: handler, + store: store, + locker: locker, + } +} + +// OnUpdate handles update events on namespaces. +func (n *namespacePodUpdater) OnUpdate(obj interface{}) { + ns, ok := obj.(*kubernetes.Namespace) + if !ok { + return + } + + // n.store.List() returns a snapshot at this point. If a delete is received + // from the main watcher, this loop may generate an update event after the + // delete is processed, leaving configurations that would never be deleted. + // Also this loop can miss updates, what could leave outdated configurations. + // Avoid these issues by locking the processing of events from the main watcher. + if n.locker != nil { + n.locker.Lock() + defer n.locker.Unlock() + } + for _, pod := range n.store.List() { + pod, ok := pod.(*kubernetes.Pod) + if ok && pod.Namespace == ns.Name { + n.handler(pod) + } + } +} + +// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this +// namespace they will generate their own add events. +func (*namespacePodUpdater) OnAdd(interface{}) {} + +// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this +// namespace they will generate their own delete events. +func (*namespacePodUpdater) OnDelete(interface{}) {} + +// getContainersInPod returns all the containers defined in a pod and their statuses. +// It includes init and ephemeral containers. +func getContainersInPod(pod *kubernetes.Pod) []*containerInPod { + var containers []*containerInPod + for _, c := range pod.Spec.Containers { + containers = append(containers, &containerInPod{spec: c}) + } + for _, c := range pod.Spec.InitContainers { + containers = append(containers, &containerInPod{spec: c}) + } for _, c := range pod.Spec.EphemeralContainers { c := kubernetes.Container(c.EphemeralContainerCommon) - ephContainers = append(ephContainers, c) + containers = append(containers, &containerInPod{spec: c}) } - for _, s := range pod.Status.EphemeralContainerStatuses { - ephContainersStatuses = append(ephContainersStatuses, s) + + statuses := make(map[string]*kubernetes.PodContainerStatus) + mapStatuses := func(s []kubernetes.PodContainerStatus) { + for i := range s { + statuses[s[i].Name] = &s[i] + } + } + mapStatuses(pod.Status.ContainerStatuses) + mapStatuses(pod.Status.InitContainerStatuses) + mapStatuses(pod.Status.EphemeralContainerStatuses) + for _, c := range containers { + if s, ok := statuses[c.spec.Name]; ok { + c.id, c.runtime = kubernetes.ContainerIDWithRuntime(*s) + c.status = *s + } } - return ephContainers, ephContainersStatuses + + return containers } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go index 00c7ee84766e..62d4a199892e 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go @@ -9,15 +9,15 @@ import ( "fmt" "testing" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" ) func TestGeneratePodData(t *testing.T) { @@ -44,38 +44,77 @@ func TestGeneratePodData(t *testing.T) { Status: 
kubernetes.PodStatus{PodIP: "127.0.0.5"}, } - data := generatePodData(pod, &Config{}) + namespaceAnnotations := common.MapStr{ + "nsa": "nsb", + } + data := generatePodData(pod, &Config{}, &podMeta{}, namespaceAnnotations) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ + "pod": common.MapStr{ "uid": string(pod.GetUID()), "name": pod.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "app": "production", - }, - "ip": pod.Status.PodIP, + "ip": pod.Status.PodIP, }, - } - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, + "namespace_annotations": common.MapStr{ + "nsa": "nsb", + }, + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "app": "production", }, } + processors := map[string]interface{}{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090"}, + }, "kubernetes": common.MapStr{ + "namespace": "testns", + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{"app": "production"}, + "pod": common.MapStr{ + "ip": "127.0.0.5", + "name": "testpod", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, + } assert.Equal(t, string(pod.GetUID()), data.uid) assert.Equal(t, mapping, data.mapping) - assert.Equal(t, processors, data.processors) + for _, v := range data.processors { + k := v["add_fields"].(map[string]interface{}) + target := k["target"].(string) + fields := k["fields"] + assert.Equal(t, processors[target], fields) + } } func TestGenerateContainerPodData(t *testing.T) { uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + containers := []kubernetes.Container{ + { + Name: "nginx", + Image: "nginx:1.120", + Ports: []kubernetes.ContainerPort{ + { + Name: "http", + Protocol: v1.ProtocolTCP, + ContainerPort: 80, + }, + }, + }, + } + containerStatuses := []kubernetes.PodContainerStatus{ + { + Name: "nginx", + Ready: true, + ContainerID: "crio://asdfghdeadbeef", + }, + } pod := &kubernetes.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "testpod", @@ -93,23 +132,94 @@ func TestGenerateContainerPodData(t *testing.T) { APIVersion: "v1", }, Spec: kubernetes.PodSpec{ - NodeName: "testnode", + NodeName: "testnode", + Containers: containers, + }, + Status: kubernetes.PodStatus{ + PodIP: "127.0.0.5", + ContainerStatuses: containerStatuses, }, - Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, } providerDataChan := make(chan providerData, 1) - containers := []kubernetes.Container{ + comm := MockDynamicComm{ + context.TODO(), + providerDataChan, + } + generateContainerData( + &comm, + pod, + &Config{}, + &podMeta{}, + common.MapStr{ + "nsa": "nsb", + }) + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": common.MapStr{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "ip": pod.Status.PodIP, + }, + "container": common.MapStr{ + "id": "asdfghdeadbeef", + "name": "nginx", + "image": "nginx:1.120", + "runtime": "crio", + "port": "80", + "port_name": "http", + }, + "namespace_annotations": common.MapStr{ + "nsa": "nsb", + }, + "annotations": common.MapStr{ + "app": "production", + }, + "labels": common.MapStr{ + "foo": "bar", + }, + } + + processors := map[string]interface{}{ + "container": common.MapStr{ + "id": "asdfghdeadbeef", + "image": common.MapStr{"name": "nginx:1.120"}, + "runtime": "crio", + }, "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + 
"url": "8.8.8.8:9090"}, + }, "kubernetes": common.MapStr{ + "namespace": "testns", + "annotations": common.MapStr{"app": "production"}, + "labels": common.MapStr{"foo": "bar"}, + "pod": common.MapStr{ + "ip": "127.0.0.5", + "name": "testpod", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, + } + cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx") + data := <-providerDataChan + assert.Equal(t, cuid, data.uid) + assert.Equal(t, mapping, data.mapping) + for _, v := range data.processors { + k := v["add_fields"].(map[string]interface{}) + target := k["target"].(string) + fields := k["fields"] + assert.Equal(t, processors[target], fields) + } + +} + +func TestEphemeralContainers(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + containers := []v1.EphemeralContainer{ { - Name: "nginx", - Image: "nginx:1.120", - Ports: []kubernetes.ContainerPort{ - { - Name: "http", - Protocol: v1.ProtocolTCP, - ContainerPort: 80, - }, + EphemeralContainerCommon: v1.EphemeralContainerCommon{ + Image: "nginx:1.120", + Name: "nginx", }, }, } @@ -120,6 +230,34 @@ func TestGenerateContainerPodData(t *testing.T) { ContainerID: "crio://asdfghdeadbeef", }, } + pod := &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + UID: types.UID(uid), + Namespace: "testns", + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{ + "app": "production", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: kubernetes.PodSpec{ + NodeName: "testnode", + EphemeralContainers: containers, + }, + Status: kubernetes.PodStatus{ + PodIP: "127.0.0.5", + EphemeralContainerStatuses: containerStatuses, + }, + } + + providerDataChan := make(chan providerData, 1) + comm := MockDynamicComm{ context.TODO(), providerDataChan, @@ -127,116 +265,65 @@ func TestGenerateContainerPodData(t *testing.T) { generateContainerData( &comm, pod, - containers, - containerStatuses, - &Config{}) + &Config{}, + &podMeta{}, + common.MapStr{ + "nsa": "nsb", + }) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ + "pod": common.MapStr{ "uid": string(pod.GetUID()), "name": pod.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "ip": pod.Status.PodIP, + "ip": pod.Status.PodIP, + }, + "labels": common.MapStr{ + "foo": "bar", }, - "container": map[string]interface{}{ + "container": common.MapStr{ "id": "asdfghdeadbeef", "name": "nginx", "image": "nginx:1.120", "runtime": "crio", }, - } - - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, + "namespace_annotations": common.MapStr{ + "nsa": "nsb", + }, + "annotations": common.MapStr{ + "app": "production", }, } + processors := map[string]interface{}{ + "container": common.MapStr{ + "id": "asdfghdeadbeef", + "image": common.MapStr{"name": "nginx:1.120"}, + "runtime": "crio", + }, "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090"}, + }, "kubernetes": common.MapStr{ + "namespace": "testns", + "labels": common.MapStr{"foo": "bar"}, + "annotations": common.MapStr{"app": "production"}, + "pod": common.MapStr{ + "ip": "127.0.0.5", + "name": "testpod", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, + } cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx") data := <-providerDataChan assert.Equal(t, cuid, data.uid) assert.Equal(t, mapping, data.mapping) - assert.Equal(t, processors, data.processors) - -} - 
-func TestGetEphemeralContainers(t *testing.T) { - name := "filebeat" - namespace := "default" - podIP := "127.0.0.1" - containerID := "docker://foobar" - uid := "005f3b90-4b9d-12f8-acf0-31020a840133" - containerImage := "elastic/filebeat:6.3.0" - node := "node" - - expectedEphemeralContainers := - []kubernetes.Container{ - { - Name: "filebeat", - Image: "elastic/filebeat:6.3.0", - }, - } - expectedephemeralContainersStatuses := - []kubernetes.PodContainerStatus{ - { - Name: "filebeat", - State: v1.ContainerState{ - Running: &v1.ContainerStateRunning{ - StartedAt: metav1.Time{}, - }, - }, - Ready: false, - ContainerID: "docker://foobar", - }, - } + for _, v := range data.processors { + k := v["add_fields"].(map[string]interface{}) + target := k["target"].(string) + fields := k["fields"] + assert.Equal(t, processors[target], fields) + } - pod := - &kubernetes.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: types.UID(uid), - Namespace: namespace, - Labels: map[string]string{}, - Annotations: map[string]string{}, - }, - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - Status: v1.PodStatus{ - PodIP: podIP, - Phase: kubernetes.PodRunning, - EphemeralContainerStatuses: []kubernetes.PodContainerStatus{ - { - Name: name, - ContainerID: containerID, - State: v1.ContainerState{ - Running: &v1.ContainerStateRunning{}, - }, - }, - }, - }, - Spec: v1.PodSpec{ - NodeName: node, - EphemeralContainers: []v1.EphemeralContainer{ - { - EphemeralContainerCommon: v1.EphemeralContainerCommon{ - Image: containerImage, - Name: name, - }, - }, - }, - }, - } - ephContainers, ephContainersStatuses := getEphemeralContainers(pod) - assert.Equal(t, expectedEphemeralContainers, ephContainers) - assert.Equal(t, expectedephemeralContainersStatuses, ephContainersStatuses) } // MockDynamicComm is used in tests. @@ -258,3 +345,58 @@ func (t *MockDynamicComm) AddOrUpdate(id string, priority int, mapping map[strin // Remove func (t *MockDynamicComm) Remove(id string) { } + +type podMeta struct{} + +// Generate generates pod metadata from a resource object +// Metadata map is in the following form: +// { +// "kubernetes": {}, +// "some.ecs.field": "asdf" +// } +// All Kubernetes fields that need to be stored under kubernetes. 
prefix are populated by +// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method +func (p *podMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + ecsFields := p.GenerateECS(obj) + meta := common.MapStr{ + "kubernetes": p.GenerateK8s(obj, opts...), + } + meta.DeepUpdate(ecsFields) + return meta +} + +// GenerateECS generates pod ECS metadata from a resource object +func (p *podMeta) GenerateECS(obj kubernetes.Resource) common.MapStr { + return common.MapStr{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090", + }, + }, + } +} + +// GenerateK8s generates pod metadata from a resource object +func (p *podMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + k8sPod := obj.(*kubernetes.Pod) + return common.MapStr{ + "namespace": k8sPod.GetNamespace(), + "pod": common.MapStr{ + "uid": string(k8sPod.GetUID()), + "name": k8sPod.GetName(), + "ip": k8sPod.Status.PodIP, + }, + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "app": "production", + }, + } +} + +// GenerateFromName generates pod metadata from a node name +func (p *podMeta) GenerateFromName(name string, opts ...metadata.FieldOptions) common.MapStr { + return nil +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go index a0f73b16382e..03686ca70455 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -5,24 +5,29 @@ package kubernetes import ( + "fmt" "time" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/v7/libbeat/common/safemapstr" + k8s "k8s.io/client-go/kubernetes" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" - "github.com/elastic/beats/v7/libbeat/common/safemapstr" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" ) type service struct { - logger *logp.Logger - cleanupTimeout time.Duration - comm composable.DynamicProviderComm - scope string - config *Config + logger *logp.Logger + cleanupTimeout time.Duration + comm composable.DynamicProviderComm + scope string + config *Config + metagen metadata.MetaGen + namespaceWatcher kubernetes.Watcher } type serviceData struct { @@ -46,13 +51,39 @@ func NewServiceWatcher( if err != nil { return nil, errors.New(err, "couldn't create kubernetes watcher") } - watcher.AddEventHandler(&service{logger, cfg.CleanupTimeout, comm, scope, cfg}) + + metaConf := metadata.GetDefaultResourceMetadataConfig() + namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: cfg.SyncPeriod, + Namespace: cfg.Namespace, + }, nil) + if err != nil { + return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } + namespaceMeta := metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client) + + rawConfig, err := common.NewConfigFrom(cfg) + if err != nil { + return nil, errors.New(err, "failed to unpack configuration") + } + + metaGen := metadata.NewServiceMetadataGenerator(rawConfig, 
watcher.Store(), namespaceMeta, client) + watcher.AddEventHandler(&service{ + logger, + cfg.CleanupTimeout, + comm, + scope, + cfg, + metaGen, + namespaceWatcher, + }) return watcher, nil } func (s *service) emitRunning(service *kubernetes.Service) { - data := generateServiceData(service, s.config) + namespaceAnnotations := svcNamespaceAnnotations(service, s.namespaceWatcher) + data := generateServiceData(service, s.config, s.metagen, namespaceAnnotations) if data == nil { return } @@ -62,6 +93,29 @@ func (s *service) emitRunning(service *kubernetes.Service) { s.comm.AddOrUpdate(string(service.GetUID()), ServicePriority, data.mapping, data.processors) } +// svcNamespaceAnnotations returns the annotations of the namespace of the service +func svcNamespaceAnnotations(svc *kubernetes.Service, watcher kubernetes.Watcher) common.MapStr { + if watcher == nil { + return nil + } + + rawNs, ok, err := watcher.Store().GetByKey(svc.Namespace) + if !ok || err != nil { + return nil + } + + namespace, ok := rawNs.(*kubernetes.Namespace) + if !ok { + return nil + } + + annotations := common.MapStr{} + for k, v := range namespace.GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + return annotations +} + func (s *service) emitStopped(service *kubernetes.Service) { s.comm.Remove(string(service.GetUID())) } @@ -92,7 +146,11 @@ func (s *service) OnDelete(obj interface{}) { time.AfterFunc(s.cleanupTimeout, func() { s.emitStopped(service) }) } -func generateServiceData(service *kubernetes.Service, cfg *Config) *serviceData { +func generateServiceData( + service *kubernetes.Service, + cfg *Config, + kubeMetaGen metadata.MetaGen, + namespaceAnnotations common.MapStr) *serviceData { host := service.Spec.ClusterIP // If a service doesn't have an IP then dont monitor it @@ -100,7 +158,20 @@ func generateServiceData(service *kubernetes.Service, cfg *Config) *serviceData return nil } - //TODO: add metadata here too ie -> meta := s.metagen.Generate(service) + meta := kubeMetaGen.Generate(service) + kubemetaMap, err := meta.GetValue("kubernetes") + if err != nil { + return &serviceData{} + } + + // k8sMapping includes only the metadata that fall under kubernetes.* + // and these are available as dynamic vars through the provider + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) + + if len(namespaceAnnotations) != 0 { + // TODO: convert it to namespace.annotations for 8.0 + k8sMapping["namespace_annotations"] = namespaceAnnotations + } // Pass annotations to all events so that it can be used in templating and by annotation builders. 
annotations := common.MapStr{} @@ -108,33 +179,25 @@ func generateServiceData(service *kubernetes.Service, cfg *Config) *serviceData safemapstr.Put(annotations, k, v) } - labels := common.MapStr{} - for k, v := range service.GetObjectMeta().GetLabels() { - // TODO: add dedoting option - safemapstr.Put(labels, k, v) - } - - mapping := map[string]interface{}{ - "service": map[string]interface{}{ - "uid": string(service.GetUID()), - "name": service.GetName(), - "labels": labels, - "annotations": annotations, - "ip": host, - }, - } + // add annotations to be discoverable by templates + k8sMapping["annotations"] = annotations - processors := []map[string]interface{}{ - { + processors := []map[string]interface{}{} + // meta map includes metadata that go under kubernetes.* + // but also other ECS fields like orchestrator.* + for field, metaMap := range meta { + processor := map[string]interface{}{ "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", + "fields": metaMap, + "target": field, }, - }, + } + processors = append(processors, processor) } + return &serviceData{ service: service, - mapping: mapping, + mapping: k8sMapping, processors: processors, } } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go index c52a1069728e..c183541e6a73 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go @@ -7,6 +7,8 @@ package kubernetes import ( "testing" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/v7/libbeat/common" "github.com/stretchr/testify/assert" @@ -45,32 +47,111 @@ func TestGenerateServiceData(t *testing.T) { }, } - data := generateServiceData(service, &Config{}) + data := generateServiceData( + service, + &Config{}, + &svcMeta{}, + common.MapStr{ + "nsa": "nsb", + }) mapping := map[string]interface{}{ - "service": map[string]interface{}{ + "service": common.MapStr{ "uid": string(service.GetUID()), "name": service.GetName(), + "ip": service.Spec.ClusterIP, + }, + "namespace_annotations": common.MapStr{ + "nsa": "nsb", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, + "labels": common.MapStr{ + "foo": "bar", + }, + } + + processors := map[string]interface{}{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090"}, + }, "kubernetes": common.MapStr{ + "service": common.MapStr{ + "uid": string(service.GetUID()), + "name": service.GetName(), + "ip": "1.2.3.4", + }, "labels": common.MapStr{ "foo": "bar", }, "annotations": common.MapStr{ "baz": "ban", }, - "ip": service.Spec.ClusterIP, }, } - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", + assert.Equal(t, service, data.service) + assert.Equal(t, mapping, data.mapping) + for _, v := range data.processors { + k := v["add_fields"].(map[string]interface{}) + target := k["target"].(string) + fields := k["fields"] + assert.Equal(t, processors[target], fields) + } +} + +type svcMeta struct{} + +// Generate generates svc metadata from a resource object +// Metadata map is in the following form: +// { +// "kubernetes": {}, +// "some.ecs.field": "asdf" +// } +// All Kubernetes fields that need to be stored under kubernetes. 
prefix are populated by +// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method +func (s *svcMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + ecsFields := s.GenerateECS(obj) + meta := common.MapStr{ + "kubernetes": s.GenerateK8s(obj, opts...), + } + meta.DeepUpdate(ecsFields) + return meta +} + +// GenerateECS generates svc ECS metadata from a resource object +func (s *svcMeta) GenerateECS(obj kubernetes.Resource) common.MapStr { + return common.MapStr{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090", }, }, } +} - assert.Equal(t, service, data.service) - assert.Equal(t, mapping, data.mapping) - assert.Equal(t, processors, data.processors) +// GenerateK8s generates svc metadata from a resource object +func (s *svcMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + k8sNode := obj.(*kubernetes.Service) + return common.MapStr{ + "service": common.MapStr{ + "uid": string(k8sNode.GetUID()), + "name": k8sNode.GetName(), + "ip": "1.2.3.4", + }, + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, + } +} + +// GenerateFromName generates svc metadata from a node name +func (s *svcMeta) GenerateFromName(name string, opts ...metadata.FieldOptions) common.MapStr { + return nil }
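Across pods, nodes and services the provider now emits one add_fields processor per top-level field of the generated metadata (kubernetes, orchestrator, container) instead of a single processor targeting kubernetes. Conceptually, each fields/target pair built above nests its fields under the target when an event is enriched; the sketch below only illustrates that effect, the real enrichment is performed by libbeat's add_fields processor.

// Conceptual sketch only.
func applyAddFields(event common.MapStr, target string, fields common.MapStr) {
	event.DeepUpdate(common.MapStr{target: fields})
}

// applyAddFields(event, "orchestrator",
//     common.MapStr{"cluster": common.MapStr{"name": "devcluster", "url": "8.8.8.8:9090"}})
// leaves the event with orchestrator.cluster.name == "devcluster".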