Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add k8s metadata in state_cronjob metricset #29572

Merged
merged 6 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `add_resource_metadata` configuration to Kubernetes module. {pull}29133[29133]
- Add `container.id` and `container.runtime` ECS fields in container metricset. {pull}29560[29560]
- Add `memory.workingset.limit.pct` field in Kubernetes container/pod metricset. {pull}29547[29547]
- Add k8s metadata in state_cronjob metricset. {pull}29572[29572]
- Add `elasticsearch.cluster.id` field to Beat and Kibana modules. {pull}29577[29577]
- Add `elasticsearch.cluster.id` field to Logstash module. {pull}29625[29625]

Expand Down
2 changes: 2 additions & 0 deletions deploy/kubernetes/elastic-agent-managed-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ rules:
- apiGroups: [ "batch" ]
resources:
- jobs
# Uncomment if need metadata for cronjob objects in versions >= v1.21
#- cronjobs
verbs: [ "get", "list", "watch" ]
# required for apiserver
- nonResourceURLs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ rules:
- apiGroups: [ "batch" ]
resources:
- jobs
# Uncomment if need metadata for cronjob objects in versions >= v1.21
#- cronjobs
verbs: [ "get", "list", "watch" ]
# required for apiserver
- nonResourceURLs:
Expand Down
2 changes: 2 additions & 0 deletions deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,8 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
# Uncomment if need metadata for cronjob objects in versions >= v1.21
#- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
# Uncomment if need metadata for cronjob objects in versions >= v1.21
#- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
2 changes: 2 additions & 0 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
# Uncomment if need metadata for cronjob objects in versions >= v1.21
#- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
2 changes: 2 additions & 0 deletions deploy/kubernetes/metricbeat/metricbeat-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
# Uncomment if need metadata for cronjob objects in versions >= v1.21
#- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
12 changes: 12 additions & 0 deletions libbeat/common/kubernetes/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}

objType = "service"
case *CronJob:
cronjob := client.BatchV1().CronJobs(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return cronjob.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cronjob.Watch(ctx, options)
},
}

objType = "cronjob"
case *Job:
job := client.BatchV1().Jobs(opts.Namespace)
listwatch = &cache.ListWatch{
Expand Down
1 change: 1 addition & 0 deletions libbeat/common/kubernetes/metadata/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (p *pod) GenerateK8s(obj kubernetes.Resource, opts ...FieldOptions) common.
out := p.resource.GenerateK8s("pod", obj, opts...)

// check if Pod is handled by a ReplicaSet which is controlled by a Deployment
// TODO: same happens with CronJob vs Job. The hierarcy there is CronJob->Job->Pod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have a ticket for that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if p.addResourceMetadata.Deployment {
rsName, _ := out.GetValue("replicaset.name")
if rsName, ok := rsName.(string); ok {
Expand Down
3 changes: 2 additions & 1 deletion libbeat/common/kubernetes/metadata/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ func (r *Resource) GenerateK8s(kind string, obj kubernetes.Resource, options ...
"ReplicaSet",
"StatefulSet",
"DaemonSet",
"Job":
"Job",
Copy link
Contributor

@tetianakravchenko tetianakravchenko Dec 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with this change we also can close this PR: #28954 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tetianakravchenko my PR is a little bit tricky cause has some tricky parts around k8s api versions. I think we can move on with the community PR and I can rebase mine on top of it. wdyt?

"CronJob":
safemapstr.Put(meta, strings.ToLower(ref.Kind)+".name", ref.Name)
}
}
Expand Down
3 changes: 3 additions & 0 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type Service = v1.Service
// Job data
type Job = batchv1.Job

// CronJob data
type CronJob = batchv1.CronJob

const (
// PodPending phase
PodPending = v1.PodPending
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
This is the `state_cronjob` metricset of the Kubernetes module.

This metricset does not add metadata by default and hence in order to
add metadata for this one need to configure the metricset with `add_metadata: true`
and uncomment the proper `apiGroup` in the `ClusterRole`. Metadata are only available
for versions of k8s >= v1.21.
66 changes: 44 additions & 22 deletions metricbeat/module/kubernetes/state_cronjob/state_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ package state_cronjob
import (
"fmt"

"github.com/pkg/errors"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"

"github.com/elastic/beats/v7/libbeat/common"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
Expand All @@ -44,6 +44,7 @@ type CronJobMetricSet struct {
prometheus p.Prometheus
mapping *p.MetricsMapping
mod k8smod.Module
enricher util.Enricher
}

// NewCronJobMetricSet returns a prometheus based metricset for CronJobs
Expand All @@ -58,7 +59,12 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, fmt.Errorf("must be child of kubernetes module")
}

return &CronJobMetricSet{
config := util.GetDefaultDisabledMetaConfig()
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, fmt.Errorf("error loading config of kubernetes module")
}

ms := CronJobMetricSet{
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
Expand All @@ -79,42 +85,58 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
"concurrency_policy": p.KeyLabel("concurrency"),
},
},
}, nil
}
if config.AddMetadata {
ms.enricher = util.NewResourceMetadataEnricher(
base, &kubernetes.CronJob{}, false)
}
return &ms, nil
}

// Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as
// module rooted fields at the event that gets reported
//
// Copied from other kube state metrics.
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error {
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) {
if m.enricher != nil {
m.enricher.Start()
}

families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting family metrics")
m.Logger().Error(err)
reporter.Error(err)
return
}
events, err := m.prometheus.ProcessMetrics(families, m.mapping)
if err != nil {
return errors.Wrap(err, "error getting metrics")
m.Logger().Error(err)
reporter.Error(err)
return
}

if m.enricher != nil {
m.enricher.Enrich(events)
}
for _, event := range events {
var moduleFieldsMapStr common.MapStr
moduleFields, ok := event[mb.ModuleDataKey]
if ok {
moduleFieldsMapStr, ok = moduleFields.(common.MapStr)
if !ok {
m.Logger().Errorf("error trying to convert '%s' from event to common.MapStr", mb.ModuleDataKey)
}
e, err := util.CreateEvent(event, "kubernetes.cronjob")
if err != nil {
m.Logger().Error(err)
}
delete(event, mb.ModuleDataKey)

if reported := reporter.Event(mb.Event{
MetricSetFields: event,
ModuleFields: moduleFieldsMapStr,
Namespace: "kubernetes.cronjob",
}); !reported {
return nil

if reported := reporter.Event(e); !reported {
m.Logger().Debug("error trying to emit event")
return
}
}

return
}

// Close stops this metricset
func (m *CronJobMetricSet) Close() error {
if m.enricher != nil {
m.enricher.Stop()
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

func TestFetchMetricset(t *testing.T) {
config := test.GetKubeStateMetricsConfig(t, "state_cronjob")
config := test.GetKubeStateMetricsConfigWithMetaDisabled(t, "state_cronjob")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why for cronjob only add_metadata must be disabled? as I see for other k8s resources used default value - true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cause in version before v1.21 the the current stable api is not available and hence we cannot support metadata in tests for v1.20

metricSet := mbtest.NewFetcher(t, config)
events, errs := metricSet.FetchEvents()
if len(errs) > 0 {
Expand Down
12 changes: 12 additions & 0 deletions metricbeat/module/kubernetes/test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ func GetKubeStateMetricsConfig(t *testing.T, metricSetName string) map[string]in
}
}

// GetKubeStateMetricsConfigWithMetaDisabled function returns configuration for talking to kube-state-metrics.
func GetKubeStateMetricsConfigWithMetaDisabled(t *testing.T, metricSetName string) map[string]interface{} {
t.Helper()
return map[string]interface{}{
"module": "kubernetes",
"metricsets": []string{metricSetName},
"host": "${NODE_NAME}",
"hosts": []string{"kube-state-metrics:8080"},
"add_metadata": false,
}
}

// GetKubeletConfig function returns configuration for talking to Kubelet API.
func GetKubeletConfig(t *testing.T, metricSetName string) map[string]interface{} {
t.Helper()
Expand Down
70 changes: 40 additions & 30 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ type kubernetesConfig struct {

type enricher struct {
sync.RWMutex
metadata map[string]common.MapStr
index func(common.MapStr) string
watcher kubernetes.Watcher
watcherStarted bool
watcherStartedLock sync.Mutex
namespaceWatcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
isPod bool
metadata map[string]common.MapStr
index func(common.MapStr) string
watcher kubernetes.Watcher
watchersStarted bool
watchersStartedLock sync.Mutex
namespaceWatcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
isPod bool
}

const selector = "kubernetes"
Expand Down Expand Up @@ -134,6 +134,8 @@ func NewResourceMetadataEnricher(
m[id] = metaGen.Generate("deployment", r)
case *kubernetes.Job:
m[id] = metaGen.Generate("job", r)
case *kubernetes.CronJob:
m[id] = metaGen.Generate("cronjob", r)
case *kubernetes.Service:
m[id] = serviceMetaGen.Generate(r)
case *kubernetes.StatefulSet:
Expand Down Expand Up @@ -304,6 +306,12 @@ func getResourceMetadataWatchers(config *kubernetesConfig, resource kubernetes.R
return watcher, nodeWatcher, namespaceWatcher
}

func GetDefaultDisabledMetaConfig() *kubernetesConfig {
return &kubernetesConfig{
AddMetadata: false,
}
}

func validatedConfig(base mb.BaseMetricSet) *kubernetesConfig {
config := kubernetesConfig{
AddMetadata: true,
Expand Down Expand Up @@ -373,42 +381,44 @@ func buildMetadataEnricher(
}

func (m *enricher) Start() {
m.watcherStartedLock.Lock()
defer m.watcherStartedLock.Unlock()
if m.nodeWatcher != nil {
if err := m.nodeWatcher.Start(); err != nil {
logp.Warn("Error starting node watcher: %s", err)
m.watchersStartedLock.Lock()
defer m.watchersStartedLock.Unlock()
if !m.watchersStarted {
if m.nodeWatcher != nil {
if err := m.nodeWatcher.Start(); err != nil {
logp.Warn("Error starting node watcher: %s", err)
}
}
}

if m.namespaceWatcher != nil {
if err := m.namespaceWatcher.Start(); err != nil {
logp.Warn("Error starting namespace watcher: %s", err)
if m.namespaceWatcher != nil {
if err := m.namespaceWatcher.Start(); err != nil {
logp.Warn("Error starting namespace watcher: %s", err)
}
}
}

if !m.watcherStarted {
err := m.watcher.Start()
if err != nil {
logp.Warn("Error starting Kubernetes watcher: %s", err)
}
m.watcherStarted = true
m.watchersStarted = true
}
}

func (m *enricher) Stop() {
m.watcherStartedLock.Lock()
defer m.watcherStartedLock.Unlock()
if m.watcherStarted {
m.watchersStartedLock.Lock()
defer m.watchersStartedLock.Unlock()
if m.watchersStarted {
m.watcher.Stop()
m.watcherStarted = false
}
if m.namespaceWatcher != nil {
m.namespaceWatcher.Stop()
}

if m.nodeWatcher != nil {
m.nodeWatcher.Stop()
if m.namespaceWatcher != nil {
m.namespaceWatcher.Stop()
}

if m.nodeWatcher != nil {
m.nodeWatcher.Stop()
}

m.watchersStarted = false
}
}

Expand Down