Skip to content

Commit

Permalink
Watch kubernetes namespaces for autodiscover metadata for pods (elast…
Browse files Browse the repository at this point in the history
…ic#25117) (elastic#25157)

Namespaces metadata is used by autodiscover to include it in events or
to generate autodiscover hints for the pods in the namespace.

This change watches explicitly for changes in namespaces to update this
metadata instead of waiting for resyncs.

(cherry picked from commit 3f0c1af)
  • Loading branch information
jsoriano authored Apr 20, 2021
1 parent 36362e5 commit 04c62ef
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add new ECS 1.9 field `cloud.service.name` to `add_cloud_metadata` processor. {pull}24993[24993]
- Libbeat: report queue capacity, output batch size, and output client count to monitoring. {pull}24700[24700]
- Add kubernetes.pod.ip field in kubernetes metadata. {pull}25037[25037]
- Discover changes in Kubernetes namespace metadata as soon as they happen. {pull}25117[25117]

*Auditbeat*

Expand Down
84 changes: 82 additions & 2 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package kubernetes

import (
"fmt"
"sync"
"time"

"github.com/gofrs/uuid"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/elastic/beats/v7/libbeat/autodiscover/builder"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -43,7 +43,11 @@ type pod struct {
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
namespaceStore cache.Store

// Mutex used by configuration updates not triggered by the main watcher,
// to avoid race conditions between cross updates and deletions.
// Other updaters must use a write lock.
crossUpdate sync.RWMutex
}

// NewPodEventer creates an eventer that can discover and process pod objects
Expand Down Expand Up @@ -111,11 +115,20 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
}

watcher.AddEventHandler(p)

if namespaceWatcher != nil && (config.Hints.Enabled() || metaConf.Namespace.Enabled()) {
updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate)
namespaceWatcher.AddEventHandler(updater)
}

return p, nil
}

// OnAdd ensures processing of pod objects that are newly added
func (p *pod) OnAdd(obj interface{}) {
p.crossUpdate.RLock()
defer p.crossUpdate.RUnlock()

p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
}
Expand All @@ -124,6 +137,13 @@ func (p *pod) OnAdd(obj interface{}) {
// if it is terminating, a stop event is scheduled, if not, a stop and a start
// events are sent sequentially to recreate the resources assotiated to the pod.
func (p *pod) OnUpdate(obj interface{}) {
p.crossUpdate.RLock()
defer p.crossUpdate.RUnlock()

p.unlockedUpdate(obj)
}

func (p *pod) unlockedUpdate(obj interface{}) {
pod := obj.(*kubernetes.Pod)

p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
Expand Down Expand Up @@ -162,6 +182,9 @@ func (p *pod) OnUpdate(obj interface{}) {

// OnDelete stops pod objects that are deleted
func (p *pod) OnDelete(obj interface{}) {
p.crossUpdate.RLock()
defer p.crossUpdate.RUnlock()

p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
}
Expand Down Expand Up @@ -448,3 +471,60 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
p.publish(events)
}
}

// podUpdaterHandlerFunc is a function that handles pod updater notifications.
type podUpdaterHandlerFunc func(interface{})

// podUpdaterStore is the interface that an object needs to implement to be
// used as a pod updater store.
type podUpdaterStore interface {
List() []interface{}
}

// namespacePodUpdater notifies updates on pods when their namespaces are updated.
type namespacePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
locker sync.Locker
}

// newNamespacePodUpdater creates a namespacePodUpdater
func newNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater {
return &namespacePodUpdater{
handler: handler,
store: store,
locker: locker,
}
}

// OnUpdate handles update events on namespaces.
func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
ns, ok := obj.(*kubernetes.Namespace)
if !ok {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
// Also this loop can miss updates, what could leave outdated configurations.
// Avoid these issues by locking the processing of events from the main watcher.
if n.locker != nil {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*kubernetes.Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)
}
}
}

// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
// namespace they will generate their own add events.
func (*namespacePodUpdater) OnAdd(interface{}) {}

// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this
// namespace they will generate their own delete events.
func (*namespacePodUpdater) OnDelete(interface{}) {}
67 changes: 67 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kubernetes

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1552,6 +1553,72 @@ func TestEmitEvent(t *testing.T) {
}
}

func TestNamespacePodUpdater(t *testing.T) {
pod := func(name, namespace string) *kubernetes.Pod {
return &kubernetes.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
}

cases := map[string]struct {
pods []interface{}
expected []interface{}
}{
"no pods": {},
"two pods but only one in namespace": {
pods: []interface{}{
pod("onepod", "foo"),
pod("onepod", "bar"),
},
expected: []interface{}{
pod("onepod", "foo"),
},
},
"two pods but none in namespace": {
pods: []interface{}{
pod("onepod", "bar"),
pod("otherpod", "bar"),
},
},
}

for title, c := range cases {
t.Run(title, func(t *testing.T) {
handler := &mockUpdaterHandler{}
store := &mockUpdaterStore{objects: c.pods}
updater := newNamespacePodUpdater(handler.OnUpdate, store, &sync.Mutex{})

namespace := &kubernetes.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
}
updater.OnUpdate(namespace)

assert.EqualValues(t, c.expected, handler.objects)
})
}
}

type mockUpdaterHandler struct {
objects []interface{}
}

func (h *mockUpdaterHandler) OnUpdate(obj interface{}) {
h.objects = append(h.objects, obj)
}

type mockUpdaterStore struct {
objects []interface{}
}

func (s *mockUpdaterStore) List() []interface{} {
return s.objects
}

func NewMockPodEventerManager(pod *pod) EventManager {
em := &eventerManager{}
em.eventer = pod
Expand Down

0 comments on commit 04c62ef

Please sign in to comment.