Skip to content

Commit

Permalink
Add support for kubernetes provider to recognize namespace level defa…
Browse files Browse the repository at this point in the history
…ults (#16321)

* Add support for kubernetes provider to recognize namespace level default hints
  • Loading branch information
vjsamuel authored Mar 10, 2020
1 parent e64a18f commit 004d36c
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Remove experimental flag from `setup.template.append_fields` {pull}16576[16576]
- Add `add_cloudfoundry_metadata` processor to annotate events with Cloud Foundry application data. {pull}16621[16621]
- Add Kerberos support to Kafka input and output. {pull}16781[16781]
- Add `add_cloudfoundry_metadata` processor to annotate events with Cloud Foundry application data. {pull}16621[16621
- Add support for kubernetes provider to recognize namespace level defaults {pull}16321[16321]

*Auditbeat*

Expand Down
18 changes: 18 additions & 0 deletions filebeat/docs/autodiscover-hints.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ annotations:
co.elastic.logs.sidecar/exclude_lines: '^DBG'
-----

[float]
===== Configuring Namespace Defaults

Hints can be configured on the Namespace's annotations as defaults to use when Pod level annotations are missing.
The resultant hints are a combination of Pod annotations and Namespace annotations with the Pod's taking precedence. To
enable Namespace defaults configure the `add_resource_metadata` for Namespace objects as follows:

["source","yaml",subs="attributes"]
-------------------------------------------------------------------------------------
filebeat.autodiscover:
providers:
- type: kubernetes
hints.enabled: true
add_resource_metadata:
namespace:
enabled: true
-------------------------------------------------------------------------------------



[float]
Expand Down
3 changes: 2 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ func (p *Provider) publish(event bus.Event) {
event["config"] = config
} else {
// If there isn't a default template then attempt to use builders
if config := p.builders.GetConfig(p.eventer.GenerateHints(event)); config != nil {
e := p.eventer.GenerateHints(event)
if config := p.builders.GetConfig(e); config != nil {
event["config"] = config
}
}
Expand Down
34 changes: 31 additions & 3 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/gofrs/uuid"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/elastic/beats/v7/libbeat/autodiscover/builder"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -42,6 +43,7 @@ type pod struct {
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
namespaceStore cache.Store
}

// NewPodEventer creates an eventer that can discover and process pod objects
Expand Down Expand Up @@ -159,15 +161,27 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {
// Try to build a config with enabled builders. Send a provider agnostic payload.
// Builders are Beat specific.
e := bus.Event{}
var annotations common.MapStr
var kubeMeta, container common.MapStr

annotations := make(common.MapStr, 0)
rawMeta, ok := event["kubernetes"]
if ok {
kubeMeta = rawMeta.(common.MapStr)
// The builder base config can configure any of the field values of kubernetes if need be.
e["kubernetes"] = kubeMeta
if rawAnn, ok := kubeMeta["annotations"]; ok {
annotations = rawAnn.(common.MapStr)
anns, _ := rawAnn.(common.MapStr)
if len(anns) != 0 {
annotations = anns.Clone()
}
}

// Look at all the namespace level default annotations and do a merge with priority going to the pod annotations.
if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok {
nsAnn, _ := rawNsAnn.(common.MapStr)
if len(nsAnn) != 0 {
annotations.DeepUpdateNoOverwrite(nsAnn)
}
}
}
if host, ok := event["host"]; ok {
Expand All @@ -186,12 +200,14 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {
}

cname := builder.GetContainerName(container)

// Generate hints based on the cumulative of both namespace and pod annotations.
hints := builder.GenerateHints(annotations, cname, p.config.Prefix)
p.logger.Debugf("Generated hints %+v", hints)

if len(hints) != 0 {
e["hints"] = hints
}

p.logger.Debugf("Generated builder event %+v", e)

return e
Expand Down Expand Up @@ -296,6 +312,18 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
safemapstr.Put(annotations, k, v)
}
kubemeta["annotations"] = annotations
if p.namespaceWatcher != nil {
if rawNs, ok, err := p.namespaceWatcher.Store().GetByKey(pod.Namespace); ok && err == nil {
if namespace, ok := rawNs.(*kubernetes.Namespace); ok {
nsAnn := common.MapStr{}

for k, v := range namespace.GetAnnotations() {
safemapstr.Put(nsAnn, k, v)
}
kubemeta["namespace_annotations"] = nsAnn
}
}
}

// Without this check there would be overlapping configurations with and without ports.
if len(c.Ports) == 0 {
Expand Down
168 changes: 168 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,174 @@ func TestGenerateHints(t *testing.T) {
},
},
},
// Scenarios tested:
// Have one set of hints come from the pod and the other come from namespaces
// The resultant hints should have a combination of both
{
event: bus.Event{
"kubernetes": common.MapStr{
"annotations": getNestedAnnotations(common.MapStr{
"co.elastic.logs/multiline.pattern": "^test",
"co.elastic.logs/json.keys_under_root": "true",
"not.to.include": "true",
}),
"namespace_annotations": getNestedAnnotations(common.MapStr{
"co.elastic.metrics/module": "prometheus",
"co.elastic.metrics/period": "10s",
"co.elastic.metrics.foobar/period": "15s",
}),
"container": common.MapStr{
"name": "foobar",
"id": "abc",
"runtime": "docker",
},
"namespace": "ns",
},
},
result: bus.Event{
"kubernetes": common.MapStr{
"annotations": getNestedAnnotations(common.MapStr{
"co.elastic.logs/multiline.pattern": "^test",
"co.elastic.logs/json.keys_under_root": "true",
"not.to.include": "true",
}),
"namespace_annotations": getNestedAnnotations(common.MapStr{
"co.elastic.metrics/period": "10s",
"co.elastic.metrics.foobar/period": "15s",
"co.elastic.metrics/module": "prometheus",
}),
"container": common.MapStr{
"name": "foobar",
"id": "abc",
"runtime": "docker",
},
"namespace": "ns",
},
"hints": common.MapStr{
"logs": common.MapStr{
"multiline": common.MapStr{
"pattern": "^test",
},
"json": common.MapStr{
"keys_under_root": "true",
},
},
"metrics": common.MapStr{
"module": "prometheus",
"period": "15s",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
"runtime": "docker",
},
},
},
// Scenarios tested:
// Have one set of hints come from the pod and the same keys come from namespaces
// The resultant hints should honor only pods and not namespace.
{
event: bus.Event{
"kubernetes": common.MapStr{
"annotations": getNestedAnnotations(common.MapStr{
"co.elastic.metrics/module": "prometheus",
"co.elastic.metrics/period": "10s",
"co.elastic.metrics.foobar/period": "15s",
"not.to.include": "true",
}),
"namespace_annotations": getNestedAnnotations(common.MapStr{
"co.elastic.metrics/module": "dropwizard",
"co.elastic.metrics/period": "60s",
"co.elastic.metrics.foobar/period": "25s",
}),
"namespace": "ns",
"container": common.MapStr{
"name": "foobar",
"id": "abc",
"runtime": "docker",
},
},
},
result: bus.Event{
"kubernetes": common.MapStr{
"annotations": getNestedAnnotations(common.MapStr{
"co.elastic.metrics/module": "prometheus",
"co.elastic.metrics/period": "10s",
"co.elastic.metrics.foobar/period": "15s",
"not.to.include": "true",
}),
"namespace_annotations": getNestedAnnotations(common.MapStr{
"co.elastic.metrics/module": "dropwizard",
"co.elastic.metrics/period": "60s",
"co.elastic.metrics.foobar/period": "25s",
}),
"container": common.MapStr{
"name": "foobar",
"id": "abc",
"runtime": "docker",
},
"namespace": "ns",
},
"hints": common.MapStr{
"metrics": common.MapStr{
"module": "prometheus",
"period": "15s",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
"runtime": "docker",
},
},
},
// Scenarios tested:
// Have no hints on the pod and have namespace level defaults.
// The resultant hints should honor only namespace defaults.
{
event: bus.Event{
"kubernetes": common.MapStr{
"namespace_annotations": getNestedAnnotations(common.MapStr{
"co.elastic.metrics/module": "prometheus",
"co.elastic.metrics/period": "10s",
"co.elastic.metrics.foobar/period": "15s",
}),
"container": common.MapStr{
"name": "foobar",
"id": "abc",
"runtime": "docker",
},
"namespace": "ns",
},
},
result: bus.Event{
"kubernetes": common.MapStr{
"namespace_annotations": getNestedAnnotations(common.MapStr{
"co.elastic.metrics/module": "prometheus",
"co.elastic.metrics/period": "10s",
"co.elastic.metrics.foobar/period": "15s",
}),
"container": common.MapStr{
"name": "foobar",
"id": "abc",
"runtime": "docker",
},
"namespace": "ns",
},
"hints": common.MapStr{
"metrics": common.MapStr{
"module": "prometheus",
"period": "15s",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
"runtime": "docker",
},
},
},
}

cfg := defaultConfig()
Expand Down
30 changes: 28 additions & 2 deletions libbeat/autodiscover/providers/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,27 @@ func (s *service) GenerateHints(event bus.Event) bus.Event {
// Try to build a config with enabled builders. Send a provider agnostic payload.
// Builders are Beat specific.
e := bus.Event{}
var annotations common.MapStr
var kubeMeta common.MapStr

annotations := make(common.MapStr, 0)
rawMeta, ok := event["kubernetes"]
if ok {
kubeMeta = rawMeta.(common.MapStr)
// The builder base config can configure any of the field values of kubernetes if need be.
e["kubernetes"] = kubeMeta
if rawAnn, ok := kubeMeta["annotations"]; ok {
annotations = rawAnn.(common.MapStr)
anns, _ := rawAnn.(common.MapStr)
if len(anns) != 0 {
annotations = anns.Clone()
}
}

// Look at all the namespace level default annotations and do a merge with priority going to the pod annotations.
if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok {
nsAnn, _ := rawNsAnn.(common.MapStr)
if len(nsAnn) != 0 {
annotations.DeepUpdateNoOverwrite(nsAnn)
}
}
}
if host, ok := event["host"]; ok {
Expand All @@ -144,6 +156,7 @@ func (s *service) GenerateHints(event bus.Event) bus.Event {

hints := builder.GenerateHints(annotations, "", s.config.Prefix)
s.logger.Debugf("Generated hints %+v", hints)

if len(hints) != 0 {
e["hints"] = hints
}
Expand Down Expand Up @@ -191,6 +204,19 @@ func (s *service) emit(svc *kubernetes.Service, flag string) {
}
kubemeta["annotations"] = annotations

if s.namespaceWatcher != nil {
if rawNs, ok, err := s.namespaceWatcher.Store().GetByKey(svc.Namespace); ok && err == nil {
if namespace, ok := rawNs.(*kubernetes.Namespace); ok {
nsAnns := common.MapStr{}

for k, v := range namespace.GetAnnotations() {
safemapstr.Put(nsAnns, k, v)
}
kubemeta["namespace_annotations"] = nsAnns
}
}
}

for _, port := range svc.Spec.Ports {
event := bus.Event{
"provider": s.uuid,
Expand Down
Loading

0 comments on commit 004d36c

Please sign in to comment.