
Commit

Only create watchers if needed
Signed-off-by: ChrsMark <[email protected]>
ChrsMark committed May 18, 2023
1 parent d4e5e0c commit 0405168
Showing 3 changed files with 65 additions and 44 deletions.
26 changes: 16 additions & 10 deletions libbeat/autodiscover/providers/kubernetes/pod.go
@@ -60,6 +60,8 @@ type pod struct {
 func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) {
     logger := logp.NewLogger("autodiscover.pod")
 
+    var replicaSetWatcher, jobWatcher kubernetes.Watcher
+
     config := defaultConfig()
     err := cfg.Unpack(&config)
     if err != nil {
@@ -117,17 +119,21 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) {
     // in order to be able to retrieve 2nd layer Owner metadata like in case of:
     // Deployment -> Replicaset -> Pod
     // CronJob -> job -> Pod
-    replicaSetWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
-        SyncTimeout: config.SyncPeriod,
-    }, nil)
-    if err != nil {
-        logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+    if metaConf.Deployment {
+        replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
+            SyncTimeout: config.SyncPeriod,
+        }, nil)
+        if err != nil {
+            logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+        }
     }
-    jobWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
-        SyncTimeout: config.SyncPeriod,
-    }, nil)
-    if err != nil {
-        logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+    if metaConf.CronJob {
+        jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
+            SyncTimeout: config.SyncPeriod,
+        }, nil)
+        if err != nil {
+            logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+        }
     }
 
     metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)
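
Taken in isolation, the new shape of this file is a nil-guard pattern: both watchers are declared up front as nil interface values and constructed only when the corresponding metadata toggle (`metaConf.Deployment`, `metaConf.CronJob`) is enabled, so `GetPodMetaGen` receives nil for anything switched off and must tolerate it. Below is a minimal runnable sketch of that pattern; the `Watcher` interface, `newNamedWatcher`, and `metaConf` here are simplified stand-ins for illustration, not the real Beats kubernetes APIs:

package main

import "fmt"

// Watcher stands in for kubernetes.Watcher.
type Watcher interface {
	Start() error
}

type fakeWatcher struct{ name string }

func (w *fakeWatcher) Start() error { return nil }

// metaConf mirrors the Deployment/CronJob toggles used in the diff.
type metaConf struct {
	Deployment bool
	CronJob    bool
}

// newNamedWatcher stands in for kubernetes.NewNamedWatcher.
func newNamedWatcher(name string) (Watcher, error) {
	return &fakeWatcher{name: name}, nil
}

func buildWatchers(conf metaConf) (replicaSetWatcher, jobWatcher Watcher) {
	// Both named returns start as nil; only enabled watchers get constructed.
	if conf.Deployment {
		w, err := newNamedWatcher("resource_metadata_enricher_rs")
		if err != nil {
			fmt.Printf("error creating replicaset watcher: %v\n", err)
		} else {
			replicaSetWatcher = w
		}
	}
	if conf.CronJob {
		w, err := newNamedWatcher("resource_metadata_enricher_job")
		if err != nil {
			fmt.Printf("error creating job watcher: %v\n", err)
		} else {
			jobWatcher = w
		}
	}
	return replicaSetWatcher, jobWatcher
}

func main() {
	rs, job := buildWatchers(metaConf{Deployment: true, CronJob: false})
	fmt.Println(rs != nil, job == nil) // true true: the job watcher was never created
}

This is where the commit title comes from: a watcher that is never created opens no watch connection and keeps no local cache of ReplicaSets or Jobs, which is pure overhead when that metadata is disabled.
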
26 changes: 16 additions & 10 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
@@ -142,6 +142,8 @@ func newProcessorConfig(cfg *config.C, register *Register) (kubeAnnotatorConfig,
 
 func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
     k.initOnce.Do(func() {
+        var replicaSetWatcher, jobWatcher kubernetes.Watcher
+
         client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
         if err != nil {
             if kubernetes.IsInCluster(config.KubeConfig) {
@@ -214,17 +216,21 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
         // in order to be able to retrieve 2nd layer Owner metadata like in case of:
         // Deployment -> Replicaset -> Pod
         // CronJob -> job -> Pod
-        replicaSetWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
-            SyncTimeout: config.SyncPeriod,
-        }, nil)
-        if err != nil {
-            k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+        if metaConf.Deployment {
+            replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
+                SyncTimeout: config.SyncPeriod,
+            }, nil)
+            if err != nil {
+                k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+            }
         }
-        jobWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
-            SyncTimeout: config.SyncPeriod,
-        }, nil)
-        if err != nil {
-            k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+        if metaConf.CronJob {
+            jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
+                SyncTimeout: config.SyncPeriod,
+            }, nil)
+            if err != nil {
+                k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+            }
         }
 
         // TODO: refactor the above section to a common function to be used by NeWPodEventer too
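
One detail that is easy to miss in both hunks: the constructor calls change from `:=` to `=`. Since the watchers are now declared before the conditionals (`var replicaSetWatcher, jobWatcher kubernetes.Watcher`), keeping `:=` inside the new `if` blocks would declare fresh block-scoped variables that shadow the outer ones and vanish at the closing brace, leaving the outer watchers nil even on success. A tiny sketch of that Go shadowing pitfall (names are illustrative only):

package main

import "fmt"

type watcher struct{ name string }

func newWatcher(name string) (*watcher, error) { return &watcher{name: name}, nil }

func main() {
	var w *watcher

	if true {
		w, err := newWatcher("shadowed") // ':=' declares new w and err scoped to this block
		_, _ = w, err                    // silence "declared and not used"
	}
	fmt.Println(w == nil) // true: the outer w was never assigned

	var err error
	if true {
		w, err = newWatcher("assigned") // '=' assigns to the outer variables, as the diff does
		if err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println(w == nil) // false: the outer w now holds the watcher
}
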
57 changes: 33 additions & 24 deletions metricbeat/module/kubernetes/util/kubernetes.go
@@ -171,20 +171,24 @@ func NewResourceMetadataEnricher(
     // Deployment -> Replicaset -> Pod
     // CronJob -> job -> Pod
     if resourceName == PodResource {
-        replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
-            SyncTimeout: config.SyncPeriod,
-        }, nil)
-        if err != nil {
-            logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
-            return &nilEnricher{}
+        if config.AddResourceMetadata.Deployment {
+            replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
+                SyncTimeout: config.SyncPeriod,
+            }, nil)
+            if err != nil {
+                logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
+                return &nilEnricher{}
+            }
         }
 
-        jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
-            SyncTimeout: config.SyncPeriod,
-        }, nil)
-        if err != nil {
-            logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
-            return &nilEnricher{}
+        if config.AddResourceMetadata.CronJob {
+            jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
+                SyncTimeout: config.SyncPeriod,
+            }, nil)
+            if err != nil {
+                logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+                return &nilEnricher{}
+            }
         }
     }
 
@@ -283,6 +287,7 @@ func NewContainerMetadataEnricher(
     metricsRepo *MetricsRepo,
     nodeScope bool) Enricher {
 
+    var replicaSetWatcher, jobWatcher kubernetes.Watcher
     config, err := GetValidatedConfig(base)
     if err != nil {
         logp.Info("Kubernetes metricset enriching is disabled")
@@ -304,19 +309,23 @@ func NewContainerMetadataEnricher(
     // in order to be able to retrieve 2nd layer Owner metadata like in case of:
     // Deployment -> Replicaset -> Pod
     // CronJob -> job -> Pod
-    replicaSetWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
-        SyncTimeout: config.SyncPeriod,
-    }, nil)
-    if err != nil {
-        logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
-        return &nilEnricher{}
+    if config.AddResourceMetadata.Deployment {
+        replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
+            SyncTimeout: config.SyncPeriod,
+        }, nil)
+        if err != nil {
+            logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
+            return &nilEnricher{}
+        }
     }
-    jobWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
-        SyncTimeout: config.SyncPeriod,
-    }, nil)
-    if err != nil {
-        logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
-        return &nilEnricher{}
+    if config.AddResourceMetadata.CronJob {
+        jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
+            SyncTimeout: config.SyncPeriod,
+        }, nil)
+        if err != nil {
+            logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
+            return &nilEnricher{}
+        }
     }
 
     commonMetaConfig := metadata.Config{}
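
This third file differs from the previous two in its failure mode: when a watcher cannot be created, the metricbeat enrichers give up and return `&nilEnricher{}` instead of merely logging and carrying on with a nil watcher. That is the null-object pattern: callers get something that satisfies the Enricher interface and safely does nothing. A self-contained sketch of the idea follows; the `Enricher` interface and `nilEnricher` body below are assumptions for illustration, not copied from the repository:

package main

import (
	"errors"
	"fmt"
)

// Enricher stands in for the metricbeat util Enricher interface.
type Enricher interface {
	Enrich(events []map[string]interface{})
}

// nilEnricher is a null object: it satisfies Enricher and does nothing,
// so callers never need a nil check on the enricher they were handed.
type nilEnricher struct{}

func (*nilEnricher) Enrich([]map[string]interface{}) {}

type podEnricher struct{}

func (*podEnricher) Enrich(events []map[string]interface{}) {
	for _, e := range events {
		e["kubernetes.pod.name"] = "example" // hypothetical enrichment
	}
}

// newEnricher mirrors the diff's shape: fall back to the null object on failure.
func newEnricher(watcherOK bool) Enricher {
	if !watcherOK {
		fmt.Println("error:", errors.New("could not create watcher"))
		return &nilEnricher{}
	}
	return &podEnricher{}
}

func main() {
	e := newEnricher(false)
	events := []map[string]interface{}{{}}
	e.Enrich(events)       // no-op, but safe to call
	fmt.Println(events[0]) // map[] — nothing was added
}
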
