From b02f663a5c2357881c14e458d26eea0fe9a12a63 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 5 Oct 2021 12:01:02 +0300 Subject: [PATCH 1/6] Port IncludeAnnotations setting to Agent and small mainfest fix Signed-off-by: chrismark --- deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml | 2 +- .../elastic-agent-standalone-daemonset-configmap.yaml | 2 +- .../elastic-agent/pkg/composable/providers/kubernetes/config.go | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml index fd413ee7912..9663a899b31 100644 --- a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml +++ b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml @@ -441,7 +441,7 @@ data: # maxconn: 10 # network: tcp # period: 10s - # condition: ${kubernetes.pod.labels.app} == 'redis' + # condition: ${kubernetes.labels.app} == 'redis' --- apiVersion: apps/v1 kind: DaemonSet diff --git a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset-configmap.yaml b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset-configmap.yaml index f5cb508d367..2688bbfc61c 100644 --- a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset-configmap.yaml +++ b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset-configmap.yaml @@ -441,4 +441,4 @@ data: # maxconn: 10 # network: tcp # period: 10s - # condition: ${kubernetes.pod.labels.app} == 'redis' + # condition: ${kubernetes.labels.app} == 'redis' diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go index ddad6fd2540..dd40c53587c 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go @@ -30,6 +30,7 @@ type Config struct { AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"` IncludeLabels []string `config:"include_labels"` ExcludeLabels []string `config:"exclude_labels"` + IncludeAnnotations []string `config:"include_annotations"` LabelsDedot bool `config:"labels.dedot"` AnnotationsDedot bool `config:"annotations.dedot"` From 4ba9b8c8fd7de04d396ae8d7655ce6f43dbb7ffd Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 5 Oct 2021 13:56:22 +0300 Subject: [PATCH 2/6] Fix Eventers' start/stop Signed-off-by: chrismark --- .../providers/kubernetes/kubernetes.go | 30 +++++++----- .../composable/providers/kubernetes/node.go | 25 +++++++--- .../composable/providers/kubernetes/pod.go | 47 ++++++++++++++++--- .../providers/kubernetes/service.go | 34 +++++++++++--- 4 files changed, 107 insertions(+), 29 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index c43e5f98430..6a612b2896e 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -110,42 +110,50 @@ func (p *dynamicProvider) watchResource( p.config.Node = "" } - watcher, err := p.newWatcher(resourceType, comm, client) + eventer, err := p.newEventer(resourceType, comm, client) if err != nil { return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType) } - err = watcher.Start() + err = eventer.Start() if err != nil { - return errors.New(err, "couldn't start kubernetes watcher for resource %s", resourceType) + return errors.New(err, "couldn't start kubernetes eventer for resource %s", resourceType) } + return nil } +// Eventer allows defining ways in which kubernetes resource events are observed and processed +type Eventer interface { + kubernetes.ResourceEventHandler + Start() error + Stop() +} + // newWatcher initializes the proper watcher according to the given resource (pod, node, service). -func (p *dynamicProvider) newWatcher( +func (p *dynamicProvider) newEventer( resourceType string, comm composable.DynamicProviderComm, - client k8s.Interface) (kubernetes.Watcher, error) { + client k8s.Interface) (Eventer, error) { switch resourceType { case "pod": - watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope) + eventer, err := NewPodEventer(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } - return watcher, nil + return eventer, nil case "node": - watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope) + eventer, err := NewNodeEventer(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } - return watcher, nil + return eventer, nil case "service": - watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope) + eventer, err := NewServiceEventer(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } - return watcher, nil + return eventer, nil default: return nil, fmt.Errorf("unsupported autodiscover resource %s", resourceType) } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go index 17802735d07..12143ba4404 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go @@ -27,6 +27,7 @@ type node struct { scope string config *Config metagen metadata.MetaGen + watcher kubernetes.Watcher } type nodeData struct { @@ -35,13 +36,13 @@ type nodeData struct { processors []map[string]interface{} } -// NewNodeWatcher creates a watcher that can discover and process node objects -func NewNodeWatcher( +// NewNodeEventer creates a watcher that can discover and process node objects +func NewNodeEventer( comm composable.DynamicProviderComm, cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (kubernetes.Watcher, error) { + scope string) (Eventer, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -57,15 +58,17 @@ func NewNodeWatcher( return nil, errors.New(err, "failed to unpack configuration") } metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client) - watcher.AddEventHandler(&node{ + n := &node{ logger, cfg.CleanupTimeout, comm, scope, cfg, - metaGen}) + metaGen, + watcher} + watcher.AddEventHandler(n) - return watcher, nil + return n, nil } func (n *node) emitRunning(node *kubernetes.Node) { @@ -83,6 +86,16 @@ func (n *node) emitStopped(node *kubernetes.Node) { n.comm.Remove(string(node.GetUID())) } +// Start starts the eventer +func (n *node) Start() error { + return n.watcher.Start() +} + +// Stop stops the eventer +func (n *node) Stop() { + n.watcher.Stop() +} + // OnAdd ensures processing of node objects that are newly created func (n *node) OnAdd(obj interface{}) { n.logger.Debugf("Watcher Node add: %+v", obj) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 859f7f29dfd..fb007ef3374 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -28,6 +28,8 @@ type pod struct { scope string config *Config metagen metadata.MetaGen + watcher kubernetes.Watcher + nodeWatcher kubernetes.Watcher namespaceWatcher kubernetes.Watcher // Mutex used by configuration updates not triggered by the main watcher, @@ -65,13 +67,13 @@ type namespacePodUpdater struct { locker sync.Locker } -// NewPodWatcher creates a watcher that can discover and process pod objects -func NewPodWatcher( +// NewPodEventer creates a watcher that can discover and process pod objects +func NewPodEventer( comm composable.DynamicProviderComm, cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (kubernetes.Watcher, error) { + scope string) (Eventer, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -107,24 +109,57 @@ func NewPodWatcher( } metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, metaConf) - p := pod{ + p := &pod{ logger: logger, cleanupTimeout: cfg.CleanupTimeout, comm: comm, scope: scope, config: cfg, metagen: metaGen, + watcher: watcher, + nodeWatcher: nodeWatcher, namespaceWatcher: namespaceWatcher, } - watcher.AddEventHandler(&p) + watcher.AddEventHandler(p) if namespaceWatcher != nil && metaConf.Namespace.Enabled() { updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate) namespaceWatcher.AddEventHandler(updater) } - return watcher, nil + return p, nil +} + +// Start starts the eventer +func (p *pod) Start() error { + if p.nodeWatcher != nil { + err := p.nodeWatcher.Start() + if err != nil { + return err + } + } + + if p.namespaceWatcher != nil { + if err := p.namespaceWatcher.Start(); err != nil { + return err + } + } + + return p.watcher.Start() +} + +// Stop stops the eventer +func (p *pod) Stop() { + p.watcher.Stop() + + if p.namespaceWatcher != nil { + p.namespaceWatcher.Stop() + } + + if p.nodeWatcher != nil { + p.nodeWatcher.Stop() + } } func (p *pod) emitRunning(pod *kubernetes.Pod) { diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go index 03686ca7045..71bf82ca66a 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -27,6 +27,7 @@ type service struct { scope string config *Config metagen metadata.MetaGen + watcher kubernetes.Watcher namespaceWatcher kubernetes.Watcher } @@ -36,13 +37,13 @@ type serviceData struct { processors []map[string]interface{} } -// NewServiceWatcher creates a watcher that can discover and process service objects -func NewServiceWatcher( +// NewServiceEventer creates a watcher that can discover and process service objects +func NewServiceEventer( comm composable.DynamicProviderComm, cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (kubernetes.Watcher, error) { + scope string) (Eventer, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -68,17 +69,38 @@ func NewServiceWatcher( } metaGen := metadata.NewServiceMetadataGenerator(rawConfig, watcher.Store(), namespaceMeta, client) - watcher.AddEventHandler(&service{ + s := &service{ logger, cfg.CleanupTimeout, comm, scope, cfg, metaGen, + watcher, namespaceWatcher, - }) + } + watcher.AddEventHandler(s) + + return s, nil +} - return watcher, nil +// Start starts the eventer +func (s *service) Start() error { + if s.namespaceWatcher != nil { + if err := s.namespaceWatcher.Start(); err != nil { + return err + } + } + return s.watcher.Start() +} + +// Stop stops the eventer +func (s *service) Stop() { + s.watcher.Stop() + + if s.namespaceWatcher != nil { + s.namespaceWatcher.Stop() + } } func (s *service) emitRunning(service *kubernetes.Service) { From 33815e136000f91a50e06a17e19291c6352ba610 Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Tue, 5 Oct 2021 15:14:03 +0300 Subject: [PATCH 3/6] Update x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go Co-authored-by: Michael Katsoulis --- .../pkg/composable/providers/kubernetes/kubernetes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index 6a612b2896e..e542d8afb32 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -130,7 +130,7 @@ type Eventer interface { Stop() } -// newWatcher initializes the proper watcher according to the given resource (pod, node, service). +// newEventer initializes the proper eventer according to the given resource (pod, node, service). func (p *dynamicProvider) newEventer( resourceType string, comm composable.DynamicProviderComm, From ca67350e75eb4dbd835e620720cb84ccee29326f Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Tue, 5 Oct 2021 15:14:10 +0300 Subject: [PATCH 4/6] Update x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go Co-authored-by: Michael Katsoulis --- .../elastic-agent/pkg/composable/providers/kubernetes/node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go index 12143ba4404..db63a1cc2ab 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go @@ -36,7 +36,7 @@ type nodeData struct { processors []map[string]interface{} } -// NewNodeEventer creates a watcher that can discover and process node objects +// NewNodeEventer creates an eventer that can discover and process node objects func NewNodeEventer( comm composable.DynamicProviderComm, cfg *Config, From a11bb95d2d97d5925587729aa92c6888856c234d Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Tue, 5 Oct 2021 15:14:19 +0300 Subject: [PATCH 5/6] Update x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go Co-authored-by: Michael Katsoulis --- x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index fb007ef3374..80b28fcab47 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -67,7 +67,7 @@ type namespacePodUpdater struct { locker sync.Locker } -// NewPodEventer creates a watcher that can discover and process pod objects +// NewPodEventer creates an eventer that can discover and process pod objects func NewPodEventer( comm composable.DynamicProviderComm, cfg *Config, From 722010b0205be8e8fdef08921897b286adb8fb52 Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Tue, 5 Oct 2021 15:14:30 +0300 Subject: [PATCH 6/6] Update x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go Co-authored-by: Michael Katsoulis --- .../pkg/composable/providers/kubernetes/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go index 71bf82ca66a..0e3f0055af1 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -37,7 +37,7 @@ type serviceData struct { processors []map[string]interface{} } -// NewServiceEventer creates a watcher that can discover and process service objects +// NewServiceEventer creates an eventer that can discover and process service objects func NewServiceEventer( comm composable.DynamicProviderComm, cfg *Config,