From 0405168f7e1f66396dadd2dce298744bbe3ce4d9 Mon Sep 17 00:00:00 2001
From: ChrsMark
Date: Thu, 18 May 2023 16:27:55 +0300
Subject: [PATCH] Only create watchers if needed

Signed-off-by: ChrsMark
---
 .../autodiscover/providers/kubernetes/pod.go | 26 +++++----
 .../add_kubernetes_metadata/kubernetes.go    | 26 +++++----
 .../module/kubernetes/util/kubernetes.go     | 57 +++++++++++--------
 3 files changed, 65 insertions(+), 44 deletions(-)

diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go
index 599d07ff521a..1a53dba4c042 100644
--- a/libbeat/autodiscover/providers/kubernetes/pod.go
+++ b/libbeat/autodiscover/providers/kubernetes/pod.go
@@ -60,6 +60,8 @@ type pod struct {
 func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) {
 	logger := logp.NewLogger("autodiscover.pod")
 
+	var replicaSetWatcher, jobWatcher kubernetes.Watcher
+
 	config := defaultConfig()
 	err := cfg.Unpack(&config)
 	if err != nil {
@@ -117,17 +119,21 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu
 	// in order to be able to retrieve 2nd layer Owner metadata like in case of:
 	// Deployment -> Replicaset -> Pod
 	// CronJob -> job -> Pod
-	replicaSetWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
-		SyncTimeout: config.SyncPeriod,
-	}, nil)
-	if err != nil {
-		logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+	if metaConf.Deployment {
+		replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
+			SyncTimeout: config.SyncPeriod,
+		}, nil)
+		if err != nil {
+			logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+		}
 	}
-	jobWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
-		SyncTimeout: config.SyncPeriod,
-	}, nil)
-	if err != nil {
-		logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+	if metaConf.CronJob {
+		jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
+			SyncTimeout: config.SyncPeriod,
+		}, nil)
+		if err != nil {
+			logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+		}
 	}
 
 	metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)
diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go
index 4480733adc19..f8099a2a8b28 100644
--- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go
+++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go
@@ -142,6 +142,8 @@ func newProcessorConfig(cfg *config.C, register *Register) (kubeAnnotatorConfig,
 
 func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
 	k.initOnce.Do(func() {
+		var replicaSetWatcher, jobWatcher kubernetes.Watcher
+
 		client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
 		if err != nil {
 			if kubernetes.IsInCluster(config.KubeConfig) {
@@ -214,17 +216,21 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
 		// in order to be able to retrieve 2nd layer Owner metadata like in case of:
 		// Deployment -> Replicaset -> Pod
 		// CronJob -> job -> Pod
-		replicaSetWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
-			SyncTimeout: config.SyncPeriod,
-		}, nil)
-		if err != nil {
-			k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+		if metaConf.Deployment {
+			replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
+				SyncTimeout: config.SyncPeriod,
+			}, nil)
+			if err != nil {
+				k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+			}
 		}
-		jobWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
-			SyncTimeout: config.SyncPeriod,
-		}, nil)
-		if err != nil {
-			k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+		if metaConf.CronJob {
+			jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
+				SyncTimeout: config.SyncPeriod,
+			}, nil)
+			if err != nil {
+				k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+			}
 		}
 
 		// TODO: refactor the above section to a common function to be used by NeWPodEventer too
diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go
index 34b399f02875..d0a0b9a620c9 100644
--- a/metricbeat/module/kubernetes/util/kubernetes.go
+++ b/metricbeat/module/kubernetes/util/kubernetes.go
@@ -171,20 +171,24 @@ func NewResourceMetadataEnricher(
 	// Deployment -> Replicaset -> Pod
 	// CronJob -> job -> Pod
 	if resourceName == PodResource {
-		replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
-			SyncTimeout: config.SyncPeriod,
-		}, nil)
-		if err != nil {
-			logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
-			return &nilEnricher{}
+		if config.AddResourceMetadata.Deployment {
+			replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
+				SyncTimeout: config.SyncPeriod,
+			}, nil)
+			if err != nil {
+				logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
+				return &nilEnricher{}
+			}
 		}
 
-		jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
-			SyncTimeout: config.SyncPeriod,
-		}, nil)
-		if err != nil {
-			logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
-			return &nilEnricher{}
+		if config.AddResourceMetadata.CronJob {
+			jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
+				SyncTimeout: config.SyncPeriod,
+			}, nil)
+			if err != nil {
+				logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+				return &nilEnricher{}
+			}
 		}
 	}
 
@@ -283,6 +287,7 @@ func NewContainerMetadataEnricher(
 	metricsRepo *MetricsRepo,
 	nodeScope bool) Enricher {
+	var replicaSetWatcher, jobWatcher kubernetes.Watcher
 
 	config, err := GetValidatedConfig(base)
 	if err != nil {
 		logp.Info("Kubernetes metricset enriching is disabled")
@@ -304,19 +309,23 @@ func NewContainerMetadataEnricher(
 	// in order to be able to retrieve 2nd layer Owner metadata like in case of:
 	// Deployment -> Replicaset -> Pod
 	// CronJob -> job -> Pod
-	replicaSetWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
-		SyncTimeout: config.SyncPeriod,
-	}, nil)
-	if err != nil {
-		logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
-		return &nilEnricher{}
+	if config.AddResourceMetadata.Deployment {
+		replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
+			SyncTimeout: config.SyncPeriod,
+		}, nil)
+		if err != nil {
+			logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+			return &nilEnricher{}
+		}
 	}
-	jobWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
-		SyncTimeout: config.SyncPeriod,
-	}, nil)
-	if err != nil {
-		logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
-		return &nilEnricher{}
+	if config.AddResourceMetadata.CronJob {
+		jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
+			SyncTimeout: config.SyncPeriod,
+		}, nil)
+		if err != nil {
+			logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+			return &nilEnricher{}
+		}
 	}
 
 	commonMetaConfig := metadata.Config{}
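
Note (not part of the patch): the TODO left in add_kubernetes_metadata/kubernetes.go points at pulling the now-duplicated conditional watcher creation into one shared helper. The Go sketch below shows what such a helper might look like; the function name newOwnerWatchers, its signature, and the import paths are assumptions made for illustration, while the kubernetes.NewNamedWatcher call, the watcher names, and the Deployment/CronJob flags are taken directly from the diff.

// Hypothetical helper, not part of this patch: create the optional
// ReplicaSet and Job watchers only when the corresponding
// add_resource_metadata options are enabled, mirroring the logic that
// the patch repeats at its three call sites.
package util

import (
	"time"

	"github.com/elastic/elastic-agent-autodiscover/kubernetes"
	"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
	k8s "k8s.io/client-go/kubernetes"
)

// newOwnerWatchers is an illustrative name; the patch only hints at this
// refactor via the TODO comment.
func newOwnerWatchers(
	client k8s.Interface,
	syncPeriod time.Duration,
	metaConf *metadata.AddResourceMetadataConfig,
) (replicaSetWatcher, jobWatcher kubernetes.Watcher, err error) {
	opts := kubernetes.WatchOptions{SyncTimeout: syncPeriod}

	// Watch ReplicaSets only if Deployment owner metadata is wanted
	// (Deployment -> ReplicaSet -> Pod).
	if metaConf.Deployment {
		replicaSetWatcher, err = kubernetes.NewNamedWatcher(
			"resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, opts, nil)
		if err != nil {
			return nil, nil, err
		}
	}

	// Watch Jobs only if CronJob owner metadata is wanted
	// (CronJob -> Job -> Pod).
	if metaConf.CronJob {
		jobWatcher, err = kubernetes.NewNamedWatcher(
			"resource_metadata_enricher_job", client, &kubernetes.Job{}, opts, nil)
		if err != nil {
			return nil, nil, err
		}
	}

	return replicaSetWatcher, jobWatcher, nil
}

Returning the error instead of logging it would let each call site keep its current behavior: the libbeat processors only log watcher-creation failures, while the metricbeat enrichers bail out with a nilEnricher.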