Skip to content

Commit

Permalink
moved ShouldPut and ShouldDelete to kubernetes/util/kubernetes.go
Browse files Browse the repository at this point in the history
  • Loading branch information
gsantoro committed Aug 15, 2022
1 parent 4f31d50 commit f5cb102
Show file tree
Hide file tree
Showing 14 changed files with 62 additions and 66 deletions.
14 changes: 0 additions & 14 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,17 +354,3 @@ func (p *leaderElectionManager) startLeaderElector(ctx context.Context, lec lead
p.logger.Debugf("Starting Leader Elector")
go le.Run(ctx)
}

func ShouldPut(event mapstr.M, field string, value interface{}, logger *logp.Logger) {
_, err := event.Put(field, value)
if err != nil {
logger.Debugf("Failed to put field '%s' with value '%s': %s", field, value, err)
}
}

func ShouldDelete(event mapstr.M, field string, logger *logp.Logger) {
err := event.Delete(field)
if err != nil {
logger.Debugf("Failed to delete field '%s': %s", field, err)
}
}
3 changes: 2 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"time"

"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-autodiscover/utils"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -197,7 +198,7 @@ func (n *node) emit(node *kubernetes.Node, flag string) {
// Pass annotations to all events so that it can be used in templating and by annotation builders.
annotations := mapstr.M{}
for k, v := range node.GetObjectMeta().GetAnnotations() {
ShouldPut(annotations, k, v, n.logger)
util.ShouldPut(annotations, k, v, n.logger)
}
kubemeta["annotations"] = annotations
event := bus.Event{
Expand Down
3 changes: 2 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/gofrs/uuid"
k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-autodiscover/bus"
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
Expand Down Expand Up @@ -359,7 +360,7 @@ func (p *pod) containerPodEvents(flag string, pod *kubernetes.Pod, c *kubernetes
var events []bus.Event
portsMap := mapstr.M{}

ShouldPut(meta, "container", cmeta, p.logger)
util.ShouldPut(meta, "container", cmeta, p.logger)

for _, port := range ports {
event := bus.Event{
Expand Down
5 changes: 3 additions & 2 deletions libbeat/autodiscover/providers/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"time"

"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-autodiscover/utils"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -204,7 +205,7 @@ func (s *service) emit(svc *kubernetes.Service, flag string) {
// Pass annotations to all events so that it can be used in templating and by annotation builders.
annotations := mapstr.M{}
for k, v := range svc.GetObjectMeta().GetAnnotations() {
ShouldPut(annotations, k, v, s.logger)
util.ShouldPut(annotations, k, v, s.logger)
}
kubemeta["annotations"] = annotations

Expand All @@ -214,7 +215,7 @@ func (s *service) emit(svc *kubernetes.Service, flag string) {
nsAnns := mapstr.M{}

for k, v := range namespace.GetAnnotations() {
ShouldPut(nsAnns, k, v, s.logger)
util.ShouldPut(nsAnns, k, v, s.logger)
}
kubemeta["namespace_annotations"] = nsAnns
}
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/docker/container/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (

"github.com/docker/docker/api/types"

"github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-autodiscover/docker"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -63,7 +63,7 @@ func eventMapping(r mb.ReporterV2, cont *types.Container, dedot bool, logger *lo
labels := docker.DeDotLabels(cont.Labels, dedot)

if len(labels) > 0 {
kubernetes.ShouldPut(event, "docker.container.labels", labels, logger)
util.ShouldPut(event, "docker.container.labels", labels, logger)
}

r.Event(mb.Event{
Expand Down
19 changes: 9 additions & 10 deletions metricbeat/module/kubernetes/container/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"

kubernetes2 "github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
Expand Down Expand Up @@ -130,15 +129,15 @@ func eventMapping(content []byte, metricsRepo *util.MetricsRepo, logger *logp.Lo
}

if container.StartTime != "" {
kubernetes2.ShouldPut(containerEvent, "start_time", container.StartTime, logger)
util.ShouldPut(containerEvent, "start_time", container.StartTime, logger)
}

if nodeCores > 0 {
kubernetes2.ShouldPut(containerEvent, "cpu.usage.node.pct", float64(container.CPU.UsageNanoCores)/1e9/nodeCores, logger)
util.ShouldPut(containerEvent, "cpu.usage.node.pct", float64(container.CPU.UsageNanoCores)/1e9/nodeCores, logger)
}

if nodeMem > 0 {
kubernetes2.ShouldPut(containerEvent, "memory.usage.node.pct", float64(container.Memory.UsageBytes)/nodeMem, logger)
util.ShouldPut(containerEvent, "memory.usage.node.pct", float64(container.Memory.UsageBytes)/nodeMem, logger)
}

containerStore := podStore.GetContainerStore(container.Name)
Expand All @@ -160,12 +159,12 @@ func eventMapping(content []byte, metricsRepo *util.MetricsRepo, logger *logp.Lo
// the container limits can be greater than the node limits. We assume here the user can set correct limits on containers.

if containerCoresLimit > 0 {
kubernetes2.ShouldPut(containerEvent, "cpu.usage.limit.pct", float64(container.CPU.UsageNanoCores)/1e9/containerCoresLimit, logger)
util.ShouldPut(containerEvent, "cpu.usage.limit.pct", float64(container.CPU.UsageNanoCores)/1e9/containerCoresLimit, logger)
}

if containerMemLimit > 0 {
kubernetes2.ShouldPut(containerEvent, "memory.usage.limit.pct", float64(container.Memory.UsageBytes)/containerMemLimit, logger)
kubernetes2.ShouldPut(containerEvent, "memory.workingset.limit.pct", float64(container.Memory.WorkingSetBytes)/containerMemLimit, logger)
util.ShouldPut(containerEvent, "memory.usage.limit.pct", float64(container.Memory.UsageBytes)/containerMemLimit, logger)
util.ShouldPut(containerEvent, "memory.workingset.limit.pct", float64(container.Memory.WorkingSetBytes)/containerMemLimit, logger)
}

events = append(events, containerEvent)
Expand All @@ -182,17 +181,17 @@ func ecsfields(containerEvent mapstr.M, logger *logp.Logger) mapstr.M {

name, err := containerEvent.GetValue("name")
if err == nil {
kubernetes2.ShouldPut(ecsfields, "name", name, logger)
util.ShouldPut(ecsfields, "name", name, logger)
}

cpuUsage, err := containerEvent.GetValue("cpu.usage.node.pct")
if err == nil {
kubernetes2.ShouldPut(ecsfields, "cpu.usage", cpuUsage, logger)
util.ShouldPut(ecsfields, "cpu.usage", cpuUsage, logger)
}

memUsage, err := containerEvent.GetValue("memory.usage.node.pct")
if err == nil {
kubernetes2.ShouldPut(ecsfields, "memory.usage", memUsage, logger)
util.ShouldPut(ecsfields, "memory.usage", memUsage, logger)
}

return ecsfields
Expand Down
5 changes: 2 additions & 3 deletions metricbeat/module/kubernetes/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"time"

kubernetes2 "github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
Expand Down Expand Up @@ -181,7 +180,7 @@ func generateMapStrFromEvent(eve *kubernetes.Event, dedotConfig dedotConfig, log
for k, v := range eve.ObjectMeta.Labels {
if dedotConfig.LabelsDedot {
label := common.DeDot(k)
kubernetes2.ShouldPut(labels, label, v, logger)
util.ShouldPut(labels, label, v, logger)

} else {
err := safemapstr.Put(labels, k, v)
Expand All @@ -199,7 +198,7 @@ func generateMapStrFromEvent(eve *kubernetes.Event, dedotConfig dedotConfig, log
for k, v := range eve.ObjectMeta.Annotations {
if dedotConfig.AnnotationsDedot {
annotation := common.DeDot(k)
kubernetes2.ShouldPut(annotations, annotation, v, logger)
util.ShouldPut(annotations, annotation, v, logger)
} else {
err := safemapstr.Put(annotations, k, v)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/node/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"encoding/json"
"fmt"

kubernetes2 "github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -108,7 +108,7 @@ func eventMapping(content []byte, logger *logp.Logger) (mapstr.M, error) {
}

if node.StartTime != "" {
kubernetes2.ShouldPut(nodeEvent, "start_time", node.StartTime, logger)
util.ShouldPut(nodeEvent, "start_time", node.StartTime, logger)
}

return nodeEvent, nil
Expand Down
23 changes: 11 additions & 12 deletions metricbeat/module/kubernetes/pod/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"

kubernetes2 "github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
Expand Down Expand Up @@ -129,7 +128,7 @@ func eventMapping(content []byte, metricsRepo *util.MetricsRepo, logger *logp.Lo
}

if pod.StartTime != "" {
kubernetes2.ShouldPut(podEvent, "start_time", pod.StartTime, logger)
util.ShouldPut(podEvent, "start_time", pod.StartTime, logger)
}

// NOTE:
Expand Down Expand Up @@ -159,31 +158,31 @@ func eventMapping(content []byte, metricsRepo *util.MetricsRepo, logger *logp.Lo
}

if nodeCores > 0 {
kubernetes2.ShouldPut(podEvent, "cpu.usage.node.pct", float64(usageNanoCores)/1e9/nodeCores, logger)
util.ShouldPut(podEvent, "cpu.usage.node.pct", float64(usageNanoCores)/1e9/nodeCores, logger)
}

if podCoreLimit > 0 {
kubernetes2.ShouldPut(podEvent, "cpu.usage.limit.pct", float64(usageNanoCores)/1e9/podCoreLimit, logger)
util.ShouldPut(podEvent, "cpu.usage.limit.pct", float64(usageNanoCores)/1e9/podCoreLimit, logger)
}

if usageMem > 0 {
if nodeMem > 0 {
kubernetes2.ShouldPut(podEvent, "memory.usage.node.pct", float64(usageMem)/nodeMem, logger)
util.ShouldPut(podEvent, "memory.usage.node.pct", float64(usageMem)/nodeMem, logger)
}
if podMemLimit > 0 {
kubernetes2.ShouldPut(podEvent, "memory.usage.limit.pct", float64(usageMem)/podMemLimit, logger)
kubernetes2.ShouldPut(podEvent, "memory.working_set.limit.pct", float64(workingSet)/podMemLimit, logger)
util.ShouldPut(podEvent, "memory.usage.limit.pct", float64(usageMem)/podMemLimit, logger)
util.ShouldPut(podEvent, "memory.working_set.limit.pct", float64(workingSet)/podMemLimit, logger)
}
}

if workingSet > 0 && usageMem == 0 {
if nodeMem > 0 {
kubernetes2.ShouldPut(podEvent, "memory.usage.node.pct", float64(workingSet)/nodeMem, logger)
util.ShouldPut(podEvent, "memory.usage.node.pct", float64(workingSet)/nodeMem, logger)
}
if podMemLimit > 0 {
kubernetes2.ShouldPut(podEvent, "memory.usage.limit.pct", float64(workingSet)/podMemLimit, logger)
util.ShouldPut(podEvent, "memory.usage.limit.pct", float64(workingSet)/podMemLimit, logger)

kubernetes2.ShouldPut(podEvent, "memory.working_set.limit.pct", float64(workingSet)/podMemLimit, logger)
util.ShouldPut(podEvent, "memory.working_set.limit.pct", float64(workingSet)/podMemLimit, logger)
}
}

Expand All @@ -198,12 +197,12 @@ func ecsfields(podEvent mapstr.M, logger *logp.Logger) mapstr.M {

egressBytes, err := podEvent.GetValue("network.tx.bytes")
if err == nil {
kubernetes2.ShouldPut(ecsfields, "network.egress.bytes", egressBytes, logger)
util.ShouldPut(ecsfields, "network.egress.bytes", egressBytes, logger)
}

ingressBytes, err := podEvent.GetValue("network.rx.bytes")
if err == nil {
kubernetes2.ShouldPut(ecsfields, "network.ingress.bytes", ingressBytes, logger)
util.ShouldPut(ecsfields, "network.ingress.bytes", ingressBytes, logger)
}

return ecsfields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strings"

"github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand Down Expand Up @@ -154,9 +153,9 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
}
split := strings.Index(cID, "://")
if split != -1 {
kubernetes.ShouldPut(containerFields, "runtime", cID[:split], m.Logger())
util.ShouldPut(containerFields, "runtime", cID[:split], m.Logger())

kubernetes.ShouldPut(containerFields, "id", cID[split+3:], m.Logger())
util.ShouldPut(containerFields, "id", cID[split+3:], m.Logger())
}
}
if containerImage, ok := event["image"]; ok {
Expand All @@ -165,9 +164,9 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.Logger().Debugf("Error while casting containerImage: %s", ok)
}

kubernetes.ShouldPut(containerFields, "image.name", cImage, m.Logger())
util.ShouldPut(containerFields, "image.name", cImage, m.Logger())
// remove kubernetes.container.image field as value is the same as ECS container.image.name field
kubernetes.ShouldDelete(event, "image", m.Logger())
util.ShouldDelete(event, "image", m.Logger())
}

e, err := util.CreateEvent(event, "kubernetes.container")
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/system/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"encoding/json"
"fmt"

kubernetes2 "github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -71,7 +71,7 @@ func eventMapping(content []byte, logger *logp.Logger) ([]mapstr.M, error) {
}

if syscontainer.StartTime != "" {
kubernetes2.ShouldPut(containerEvent, "start_time", syscontainer.StartTime, logger)
util.ShouldPut(containerEvent, "start_time", syscontainer.StartTime, logger)
}

events = append(events, containerEvent)
Expand Down
23 changes: 18 additions & 5 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"

kubernetes2 "github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
Expand Down Expand Up @@ -260,9 +259,9 @@ func NewContainerMetadataEnricher(
// which is in the form of <container.runtime>://<container.id>
split := strings.Index(s.ContainerID, "://")
if split != -1 {
kubernetes2.ShouldPut(meta, "container.id", s.ContainerID[split+3:], base.Logger())
ShouldPut(meta, "container.id", s.ContainerID[split+3:], base.Logger())

kubernetes2.ShouldPut(meta, "container.runtime", s.ContainerID[:split], base.Logger())
ShouldPut(meta, "container.runtime", s.ContainerID[:split], base.Logger())
}
}

Expand Down Expand Up @@ -567,10 +566,24 @@ func GetClusterECSMeta(cfg *conf.C, client k8sclient.Interface, logger *logp.Log
}
ecsClusterMeta := mapstr.M{}
if clusterInfo.URL != "" {
kubernetes2.ShouldPut(ecsClusterMeta, "orchestrator.cluster.url", clusterInfo.URL, logger)
ShouldPut(ecsClusterMeta, "orchestrator.cluster.url", clusterInfo.URL, logger)
}
if clusterInfo.Name != "" {
kubernetes2.ShouldPut(ecsClusterMeta, "orchestrator.cluster.name", clusterInfo.Name, logger)
ShouldPut(ecsClusterMeta, "orchestrator.cluster.name", clusterInfo.Name, logger)
}
return ecsClusterMeta, nil
}

func ShouldPut(event mapstr.M, field string, value interface{}, logger *logp.Logger) {
_, err := event.Put(field, value)
if err != nil {
logger.Debugf("Failed to put field '%s' with value '%s': %s", field, value, err)
}
}

func ShouldDelete(event mapstr.M, field string, logger *logp.Logger) {
err := event.Delete(field)
if err != nil {
logger.Debugf("Failed to delete field '%s': %s", field, err)
}
}
6 changes: 2 additions & 4 deletions metricbeat/module/kubernetes/util/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

kubernetes2 "github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
)

var (
Expand Down Expand Up @@ -135,9 +133,9 @@ func (f *mockFuncs) update(m map[string]mapstr.M, obj kubernetes.Resource) {
},
}
for k, v := range accessor.GetLabels() {
kubernetes2.ShouldPut(meta, fmt.Sprintf("kubernetes.%v", k), v, logger)
ShouldPut(meta, fmt.Sprintf("kubernetes.%v", k), v, logger)
}
kubernetes2.ShouldPut(meta, "orchestrator.cluster.name", "gke-4242", logger)
ShouldPut(meta, "orchestrator.cluster.name", "gke-4242", logger)
m[accessor.GetName()] = meta
}

Expand Down
Loading

0 comments on commit f5cb102

Please sign in to comment.