Skip to content

Commit

Permalink
Minor enhancements to kubernetes processor (#4068)
Browse files Browse the repository at this point in the history
* Move kubernetes processor to root processors folder
 * Restart watch call on error
 * Flag kubernetes proccesor as beta
 * Fix config validation for `in_cluster` cases
 * Add kubernetes metadata fields
 * Put kubernetes processor fields under a common namespace
  • Loading branch information
exekias authored and ruflin committed Apr 25, 2017
1 parent af60958 commit 5afda3d
Show file tree
Hide file tree
Showing 18 changed files with 313 additions and 27 deletions.
4 changes: 2 additions & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/filebeat/spooler"

//Add filebeat level processors
_ "github.com/elastic/beats/filebeat/processor/annotate/kubernetes"
// Add filebeat level processors
_ "github.com/elastic/beats/filebeat/processors/kubernetes"
)

var (
Expand Down
48 changes: 48 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ grouped in the following categories:
* <<exported-fields-beat>>
* <<exported-fields-cloud>>
* <<exported-fields-icinga>>
* <<exported-fields-kubernetes>>
* <<exported-fields-log>>
* <<exported-fields-mysql>>
* <<exported-fields-nginx>>
Expand Down Expand Up @@ -682,6 +683,53 @@ type: text
The logged message.
[[exported-fields-kubernetes]]
== Kubernetes info Fields
Kubernetes metadata added by the kubernetes processor
[float]
=== kubernetes.pod.name
type: keyword
Kubernetes pod name
[float]
=== kubernetes.namespace
type: keyword
Kubernetes namespace
[float]
=== kubernetes.labels
type: object
Kubernetes labels map
[float]
=== kubernetes.annotations
type: object
Kubernetes annotations map
[float]
=== kubernetes.container.name
type: keyword
Kubernetes container name
[[exported-fields-log]]
== Log File Content Fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors/annotate/kubernetes"
"github.com/elastic/beats/libbeat/processors/kubernetes"
)

func init() {
Expand Down
48 changes: 48 additions & 0 deletions heartbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ grouped in the following categories:
* <<exported-fields-beat>>
* <<exported-fields-cloud>>
* <<exported-fields-common>>
* <<exported-fields-kubernetes>>
--
[[exported-fields-beat]]
Expand Down Expand Up @@ -356,3 +357,50 @@ required: True
Boolean indicator if monitor could validate the service to be available.
[[exported-fields-kubernetes]]
== Kubernetes info Fields
Kubernetes metadata added by the kubernetes processor
[float]
=== kubernetes.pod.name
type: keyword
Kubernetes pod name
[float]
=== kubernetes.namespace
type: keyword
Kubernetes namespace
[float]
=== kubernetes.labels
type: object
Kubernetes labels map
[float]
=== kubernetes.annotations
type: object
Kubernetes annotations map
[float]
=== kubernetes.container.name
type: keyword
Kubernetes container name
2 changes: 1 addition & 1 deletion libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import (
_ "github.com/elastic/beats/libbeat/processors/actions"
_ "github.com/elastic/beats/libbeat/processors/add_cloud_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_locale"
_ "github.com/elastic/beats/libbeat/processors/annotate/kubernetes"
_ "github.com/elastic/beats/libbeat/processors/kubernetes"

// Register default monitoring reporting
_ "github.com/elastic/beats/libbeat/monitoring/report/elasticsearch"
Expand Down
29 changes: 29 additions & 0 deletions libbeat/processors/kubernetes/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
- key: kubernetes
title: Kubernetes info
description: >
Kubernetes metadata added by the kubernetes processor
fields:
- name: kubernetes.pod.name
type: keyword
description: >
Kubernetes pod name
- name: kubernetes.namespace
type: keyword
description: >
Kubernetes namespace
- name: kubernetes.labels
type: object
description: >
Kubernetes labels map
- name: kubernetes.annotations
type: object
description: >
Kubernetes annotations map
- name: kubernetes.container.name
type: keyword
description: >
Kubernetes container name
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ func (g *GenDefaultMeta) GenerateMetaData(pod *corev1.Pod) common.MapStr {
annotationsMap = generateMapSubset(pod.Metadata.Annotations, g.annotations)

meta := common.MapStr{
"pod": pod.Metadata.GetName(),
"pod": common.MapStr{
"name": pod.Metadata.GetName(),
},
"namespace": pod.Metadata.GetNamespace(),
}

Expand Down Expand Up @@ -263,7 +265,9 @@ func (c *ContainerIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex {
var metadata []MetadataIndex
for i := 0; i < len(containers); i++ {
containerMeta := commonMeta.Clone()
containerMeta["container"] = pod.Status.ContainerStatuses[i].Name
containerMeta["container"] = common.MapStr{
"name": pod.Status.ContainerStatuses[i].Name,
}
metadata = append(metadata, MetadataIndex{
Index: containers[i],
Data: containerMeta,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package kubernetes

import (
"testing"

"github.com/elastic/beats/libbeat/common"
corev1 "github.com/ericchiang/k8s/api/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/stretchr/testify/assert"
"testing"
)

var metagen = &GenDefaultMeta{}
Expand Down Expand Up @@ -34,7 +35,9 @@ func TestPodIndexer(t *testing.T) {
assert.Equal(t, indexers[0].Index, podName)

expected := common.MapStr{
"pod": "testpod",
"pod": common.MapStr{
"name": "testpod",
},
"namespace": "testns",
"labels": common.MapStr{
"labelkey": "labelvalue",
Expand Down Expand Up @@ -76,7 +79,9 @@ func TestContainerIndexer(t *testing.T) {
assert.Equal(t, len(indexers), 0)
assert.Equal(t, len(indices), 0)
expected := common.MapStr{
"pod": "testpod",
"pod": common.MapStr{
"name": "testpod",
},
"namespace": "testns",
"labels": common.MapStr{
"labelkey": "labelvalue",
Expand All @@ -91,7 +96,9 @@ func TestContainerIndexer(t *testing.T) {
ContainerID: &cid,
},
}
expected["container"] = container
expected["container"] = common.MapStr{
"name": container,
}

indexers = conIndexer.GetMetadata(&pod)
assert.Equal(t, len(indexers), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func init() {
}

func newKubernetesAnnotator(cfg common.Config) (processors.Processor, error) {
logp.Beta("The kubernetes processor is beta")

config := defaultKuberentesAnnotatorConfig()

err := cfg.Unpack(&config)
Expand Down Expand Up @@ -182,7 +184,6 @@ func newKubernetesAnnotator(cfg common.Config) (processors.Processor, error) {
}

func (k kubernetesAnnotator) Run(event common.MapStr) (common.MapStr, error) {

index := k.matchers.MetadataIndex(event)
if index == "" {
return event, nil
Expand All @@ -194,23 +195,23 @@ func (k kubernetesAnnotator) Run(event common.MapStr) (common.MapStr, error) {
}

meta := common.MapStr{}
metaIface, ok := event["metadata"]
metaIface, ok := event["kubernetes"]
if !ok {
event["metadata"] = common.MapStr{}
event["kubernetes"] = common.MapStr{}
} else {
meta = metaIface.(common.MapStr)
}

meta["kubernetes"] = metadata
event["metadata"] = meta
meta.Update(metadata)
event["kubernetes"] = meta

return event, nil
}

func (k kubernetesAnnotator) String() string { return "kubernetes" }

func validate(config kubeAnnotatorConfig) error {
if config.KubeConfig == "" {
if !config.InCluster && config.KubeConfig == "" {
return errors.New("`kube_config` path can't be empty when in_cluster is set to false")
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ func (p *PodWatcher) watchPods() {
time.Sleep(time.Second)
continue
}

for {
_, pod, err := watcher.Next()
if err != nil {
logp.Err("kubernetes: Watching API eror %v", err)
time.Sleep(time.Second)
continue
logp.Err("kubernetes: Watching API error %v", err)
break
}

p.podQueue <- pod
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/pkg/errors"

//Add metricbeat specific processors
_ "github.com/elastic/beats/metricbeat/processor/annotate/kubernetes"
// Add metricbeat specific processors
_ "github.com/elastic/beats/metricbeat/processors/kubernetes"
)

// Metricbeat implements the Beater interface for metricbeat.
Expand Down
48 changes: 48 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ grouped in the following categories:
* <<exported-fields-kafka>>
* <<exported-fields-kibana>>
* <<exported-fields-kubelet>>
* <<exported-fields-kubernetes>>
* <<exported-fields-memcached>>
* <<exported-fields-mongodb>>
* <<exported-fields-mysql>>
Expand Down Expand Up @@ -4447,6 +4448,53 @@ type: long
Total inodes
[[exported-fields-kubernetes]]
== Kubernetes info Fields
Kubernetes metadata added by the kubernetes processor
[float]
=== kubernetes.pod.name
type: keyword
Kubernetes pod name
[float]
=== kubernetes.namespace
type: keyword
Kubernetes namespace
[float]
=== kubernetes.labels
type: object
Kubernetes labels map
[float]
=== kubernetes.annotations
type: object
Kubernetes annotations map
[float]
=== kubernetes.container.name
type: keyword
Kubernetes container name
[[exported-fields-memcached]]
== memcached Fields
Expand Down
Loading

0 comments on commit 5afda3d

Please sign in to comment.