Skip to content

Commit

Permalink
Add orchestrator cluster ECS fields in k8s events (#31341)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored and chrisberkhout committed Jun 1, 2023
1 parent 6e084eb commit 0ef27e3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Populate new container ECS fields in Kubernetes module. {pull}30181[30181]
- Populate ecs container fields in Containerd module. {pull}31025[31025]
- Enhance Oracle Module: Change tablespace metricset collection period {issue}30948[30948] {pull}31259[#31259]
- Add orchestrator cluster ECS fields in kubernetes events {pull}31341[31341]

*Packetbeat*

Expand Down
50 changes: 44 additions & 6 deletions metricbeat/module/kubernetes/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"fmt"
"time"

k8sclient "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/v7/libbeat/common/safemapstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
Expand All @@ -44,6 +47,7 @@ type MetricSet struct {
watchOptions kubernetes.WatchOptions
dedotConfig dedotConfig
skipOlder bool
clusterMeta common.MapStr
}

// dedotConfig defines LabelsDedot and AnnotationsDedot.
Expand Down Expand Up @@ -85,26 +89,51 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
AnnotationsDedot: config.AnnotationsDedot,
}

return &MetricSet{
ms := &MetricSet{
BaseMetricSet: base,
dedotConfig: dedotConfig,
watcher: watcher,
watchOptions: watchOptions,
skipOlder: config.SkipOlder,
}, nil
}

// add ECS orchestrator fields
cfg, _ := common.NewConfigFrom(&config)
ecsClusterMeta, err := getClusterECSMeta(cfg, client, ms.Logger())
if err != nil {
ms.Logger().Debugf("could not retrieve cluster metadata: %w", err)
}
if ecsClusterMeta != nil {
ms.clusterMeta = ecsClusterMeta
}

return ms, nil
}

func getClusterECSMeta(cfg *common.Config, client k8sclient.Interface, logger *logp.Logger) (common.MapStr, error) {
clusterInfo, err := metadata.GetKubernetesClusterIdentifier(cfg, client)
if err != nil {
return nil, fmt.Errorf("fail to get kubernetes cluster metadata: %w", err)
}
ecsClusterMeta := common.MapStr{}
if clusterInfo.Url != "" {
util.ShouldPut(ecsClusterMeta, "orchestrator.cluster.url", clusterInfo.Url, logger)
}
if clusterInfo.Name != "" {
util.ShouldPut(ecsClusterMeta, "orchestrator.cluster.name", clusterInfo.Name, logger)
}
return ecsClusterMeta, nil
}

// Run method provides the Kubernetes event watcher with a reporter with which events can be reported.
func (m *MetricSet) Run(reporter mb.PushReporterV2) {
now := time.Now()
handler := kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger())
reporter.Event(mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil))
m.reportEvent(obj, reporter)
},
UpdateFunc: func(obj interface{}) {
mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger())
reporter.Event(mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil))
m.reportEvent(obj, reporter)
},
// ignore events that are deleted
DeleteFunc: nil,
Expand Down Expand Up @@ -140,6 +169,15 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) {
m.watcher.Stop()
}

func (m *MetricSet) reportEvent(obj interface{}, reporter mb.PushReporterV2) {
mapStrEvent := generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig, m.Logger())
event := mb.TransformMapStrToEvent("kubernetes", mapStrEvent, nil)
if m.clusterMeta != nil {
event.RootFields.DeepUpdate(m.clusterMeta)
}
reporter.Event(event)
}

func generateMapStrFromEvent(eve *kubernetes.Event, dedotConfig dedotConfig, logger *logp.Logger) common.MapStr {
eventMeta := common.MapStr{
"timestamp": common.MapStr{
Expand Down

0 comments on commit 0ef27e3

Please sign in to comment.