Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port IncludeAnnotations setting to Agent and small manifest fix #28247

Merged
merged 6 commits into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ data:
# maxconn: 10
# network: tcp
# period: 10s
# condition: ${kubernetes.pod.labels.app} == 'redis'
# condition: ${kubernetes.labels.app} == 'redis'
---
apiVersion: apps/v1
kind: DaemonSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,4 +441,4 @@ data:
# maxconn: 10
# network: tcp
# period: 10s
# condition: ${kubernetes.pod.labels.app} == 'redis'
# condition: ${kubernetes.labels.app} == 'redis'
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Config struct {
AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`
IncludeAnnotations []string `config:"include_annotations"`

LabelsDedot bool `config:"labels.dedot"`
AnnotationsDedot bool `config:"annotations.dedot"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,42 +110,50 @@ func (p *dynamicProvider) watchResource(
p.config.Node = ""
}

watcher, err := p.newWatcher(resourceType, comm, client)
eventer, err := p.newEventer(resourceType, comm, client)
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
}

err = watcher.Start()
err = eventer.Start()
if err != nil {
return errors.New(err, "couldn't start kubernetes watcher for resource %s", resourceType)
return errors.New(err, "couldn't start kubernetes eventer for resource %s", resourceType)
}

return nil
}

// newWatcher initializes the proper watcher according to the given resource (pod, node, service).
func (p *dynamicProvider) newWatcher(
// Eventer allows defining ways in which kubernetes resource events are observed and processed
type Eventer interface {
kubernetes.ResourceEventHandler
Start() error
Stop()
}

// newEventer initializes the proper eventer according to the given resource (pod, node, service).
func (p *dynamicProvider) newEventer(
resourceType string,
comm composable.DynamicProviderComm,
client k8s.Interface) (kubernetes.Watcher, error) {
client k8s.Interface) (Eventer, error) {
switch resourceType {
case "pod":
watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope)
eventer, err := NewPodEventer(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
return eventer, nil
case "node":
watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope)
eventer, err := NewNodeEventer(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
return eventer, nil
case "service":
watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope)
eventer, err := NewServiceEventer(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
return eventer, nil
default:
return nil, fmt.Errorf("unsupported autodiscover resource %s", resourceType)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type node struct {
scope string
config *Config
metagen metadata.MetaGen
watcher kubernetes.Watcher
}

type nodeData struct {
Expand All @@ -35,13 +36,13 @@ type nodeData struct {
processors []map[string]interface{}
}

// NewNodeWatcher creates a watcher that can discover and process node objects
func NewNodeWatcher(
// NewNodeEventer creates an eventer that can discover and process node objects
func NewNodeEventer(
comm composable.DynamicProviderComm,
cfg *Config,
logger *logp.Logger,
client k8s.Interface,
scope string) (kubernetes.Watcher, error) {
scope string) (Eventer, error) {
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: cfg.SyncPeriod,
Node: cfg.Node,
Expand All @@ -57,15 +58,17 @@ func NewNodeWatcher(
return nil, errors.New(err, "failed to unpack configuration")
}
metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client)
watcher.AddEventHandler(&node{
n := &node{
logger,
cfg.CleanupTimeout,
comm,
scope,
cfg,
metaGen})
metaGen,
watcher}
watcher.AddEventHandler(n)

return watcher, nil
return n, nil
}

func (n *node) emitRunning(node *kubernetes.Node) {
Expand All @@ -83,6 +86,16 @@ func (n *node) emitStopped(node *kubernetes.Node) {
n.comm.Remove(string(node.GetUID()))
}

// Start starts the eventer
func (n *node) Start() error {
return n.watcher.Start()
}

// Stop stops the eventer
func (n *node) Stop() {
n.watcher.Stop()
}

// OnAdd ensures processing of node objects that are newly created
func (n *node) OnAdd(obj interface{}) {
n.logger.Debugf("Watcher Node add: %+v", obj)
Expand Down
47 changes: 41 additions & 6 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type pod struct {
scope string
config *Config
metagen metadata.MetaGen
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher

// Mutex used by configuration updates not triggered by the main watcher,
Expand Down Expand Up @@ -65,13 +67,13 @@ type namespacePodUpdater struct {
locker sync.Locker
}

// NewPodWatcher creates a watcher that can discover and process pod objects
func NewPodWatcher(
// NewPodEventer creates an eventer that can discover and process pod objects
func NewPodEventer(
comm composable.DynamicProviderComm,
cfg *Config,
logger *logp.Logger,
client k8s.Interface,
scope string) (kubernetes.Watcher, error) {
scope string) (Eventer, error) {
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: cfg.SyncPeriod,
Node: cfg.Node,
Expand Down Expand Up @@ -107,24 +109,57 @@ func NewPodWatcher(
}
metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, metaConf)

p := pod{
p := &pod{
logger: logger,
cleanupTimeout: cfg.CleanupTimeout,
comm: comm,
scope: scope,
config: cfg,
metagen: metaGen,
watcher: watcher,
nodeWatcher: nodeWatcher,
namespaceWatcher: namespaceWatcher,
}

watcher.AddEventHandler(&p)
watcher.AddEventHandler(p)

if namespaceWatcher != nil && metaConf.Namespace.Enabled() {
updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate)
namespaceWatcher.AddEventHandler(updater)
}

return watcher, nil
return p, nil
}

// Start starts the eventer
func (p *pod) Start() error {
if p.nodeWatcher != nil {
err := p.nodeWatcher.Start()
if err != nil {
return err
}
}

if p.namespaceWatcher != nil {
if err := p.namespaceWatcher.Start(); err != nil {
return err
}
}

return p.watcher.Start()
}

// Stop stops the eventer
func (p *pod) Stop() {
p.watcher.Stop()

if p.namespaceWatcher != nil {
p.namespaceWatcher.Stop()
}

if p.nodeWatcher != nil {
p.nodeWatcher.Stop()
}
}

func (p *pod) emitRunning(pod *kubernetes.Pod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type service struct {
scope string
config *Config
metagen metadata.MetaGen
watcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
}

Expand All @@ -36,13 +37,13 @@ type serviceData struct {
processors []map[string]interface{}
}

// NewServiceWatcher creates a watcher that can discover and process service objects
func NewServiceWatcher(
// NewServiceEventer creates an eventer that can discover and process service objects
func NewServiceEventer(
comm composable.DynamicProviderComm,
cfg *Config,
logger *logp.Logger,
client k8s.Interface,
scope string) (kubernetes.Watcher, error) {
scope string) (Eventer, error) {
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{
SyncTimeout: cfg.SyncPeriod,
Node: cfg.Node,
Expand All @@ -68,17 +69,38 @@ func NewServiceWatcher(
}

metaGen := metadata.NewServiceMetadataGenerator(rawConfig, watcher.Store(), namespaceMeta, client)
watcher.AddEventHandler(&service{
s := &service{
logger,
cfg.CleanupTimeout,
comm,
scope,
cfg,
metaGen,
watcher,
namespaceWatcher,
})
}
watcher.AddEventHandler(s)

return s, nil
}

return watcher, nil
// Start starts the eventer
func (s *service) Start() error {
if s.namespaceWatcher != nil {
if err := s.namespaceWatcher.Start(); err != nil {
return err
}
}
return s.watcher.Start()
}

// Stop stops the eventer
func (s *service) Stop() {
s.watcher.Stop()

if s.namespaceWatcher != nil {
s.namespaceWatcher.Stop()
}
}

func (s *service) emitRunning(service *kubernetes.Service) {
Expand Down