diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7bec2527f4dc..cae0dcf919c2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -371,6 +371,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add new ECS 1.9 field `cloud.service.name` to `add_cloud_metadata` processor. {pull}24993[24993] - Libbeat: report queue capacity, output batch size, and output client count to monitoring. {pull}24700[24700] - Add kubernetes.pod.ip field in kubernetes metadata. {pull}25037[25037] +- Discover changes in Kubernetes namespace metadata as soon as they happen. {pull}25117[25117] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 67a9aa2c109c..09c74ac4d37b 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -19,11 +19,11 @@ package kubernetes import ( "fmt" + "sync" "time" "github.com/gofrs/uuid" k8s "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/autodiscover/builder" "github.com/elastic/beats/v7/libbeat/common" @@ -43,7 +43,11 @@ type pod struct { watcher kubernetes.Watcher nodeWatcher kubernetes.Watcher namespaceWatcher kubernetes.Watcher - namespaceStore cache.Store + + // Mutex used by configuration updates not triggered by the main watcher, + // to avoid race conditions between cross updates and deletions. + // Other updaters must use a write lock. + crossUpdate sync.RWMutex } // NewPodEventer creates an eventer that can discover and process pod objects @@ -111,11 +115,20 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub } watcher.AddEventHandler(p) + + if namespaceWatcher != nil && (config.Hints.Enabled() || metaConf.Namespace.Enabled()) { + updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate) + namespaceWatcher.AddEventHandler(updater) + } + return p, nil } // OnAdd ensures processing of pod objects that are newly added func (p *pod) OnAdd(obj interface{}) { + p.crossUpdate.RLock() + defer p.crossUpdate.RUnlock() + p.logger.Debugf("Watcher Pod add: %+v", obj) p.emit(obj.(*kubernetes.Pod), "start") } @@ -124,6 +137,13 @@ func (p *pod) OnAdd(obj interface{}) { // if it is terminating, a stop event is scheduled, if not, a stop and a start // events are sent sequentially to recreate the resources assotiated to the pod. func (p *pod) OnUpdate(obj interface{}) { + p.crossUpdate.RLock() + defer p.crossUpdate.RUnlock() + + p.unlockedUpdate(obj) +} + +func (p *pod) unlockedUpdate(obj interface{}) { pod := obj.(*kubernetes.Pod) p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) @@ -162,6 +182,9 @@ func (p *pod) OnUpdate(obj interface{}) { // OnDelete stops pod objects that are deleted func (p *pod) OnDelete(obj interface{}) { + p.crossUpdate.RLock() + defer p.crossUpdate.RUnlock() + p.logger.Debugf("Watcher Pod delete: %+v", obj) time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) } @@ -448,3 +471,60 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet p.publish(events) } } + +// podUpdaterHandlerFunc is a function that handles pod updater notifications. +type podUpdaterHandlerFunc func(interface{}) + +// podUpdaterStore is the interface that an object needs to implement to be +// used as a pod updater store. +type podUpdaterStore interface { + List() []interface{} +} + +// namespacePodUpdater notifies updates on pods when their namespaces are updated. +type namespacePodUpdater struct { + handler podUpdaterHandlerFunc + store podUpdaterStore + locker sync.Locker +} + +// newNamespacePodUpdater creates a namespacePodUpdater +func newNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater { + return &namespacePodUpdater{ + handler: handler, + store: store, + locker: locker, + } +} + +// OnUpdate handles update events on namespaces. +func (n *namespacePodUpdater) OnUpdate(obj interface{}) { + ns, ok := obj.(*kubernetes.Namespace) + if !ok { + return + } + + // n.store.List() returns a snapshot at this point. If a delete is received + // from the main watcher, this loop may generate an update event after the + // delete is processed, leaving configurations that would never be deleted. + // Also this loop can miss updates, what could leave outdated configurations. + // Avoid these issues by locking the processing of events from the main watcher. + if n.locker != nil { + n.locker.Lock() + defer n.locker.Unlock() + } + for _, pod := range n.store.List() { + pod, ok := pod.(*kubernetes.Pod) + if ok && pod.Namespace == ns.Name { + n.handler(pod) + } + } +} + +// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this +// namespace they will generate their own add events. +func (*namespacePodUpdater) OnAdd(interface{}) {} + +// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this +// namespace they will generate their own delete events. +func (*namespacePodUpdater) OnDelete(interface{}) {} diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 558b62e4a493..3f874b649d4c 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -18,6 +18,7 @@ package kubernetes import ( + "sync" "testing" "time" @@ -1552,6 +1553,72 @@ func TestEmitEvent(t *testing.T) { } } +func TestNamespacePodUpdater(t *testing.T) { + pod := func(name, namespace string) *kubernetes.Pod { + return &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + } + + cases := map[string]struct { + pods []interface{} + expected []interface{} + }{ + "no pods": {}, + "two pods but only one in namespace": { + pods: []interface{}{ + pod("onepod", "foo"), + pod("onepod", "bar"), + }, + expected: []interface{}{ + pod("onepod", "foo"), + }, + }, + "two pods but none in namespace": { + pods: []interface{}{ + pod("onepod", "bar"), + pod("otherpod", "bar"), + }, + }, + } + + for title, c := range cases { + t.Run(title, func(t *testing.T) { + handler := &mockUpdaterHandler{} + store := &mockUpdaterStore{objects: c.pods} + updater := newNamespacePodUpdater(handler.OnUpdate, store, &sync.Mutex{}) + + namespace := &kubernetes.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + } + updater.OnUpdate(namespace) + + assert.EqualValues(t, c.expected, handler.objects) + }) + } +} + +type mockUpdaterHandler struct { + objects []interface{} +} + +func (h *mockUpdaterHandler) OnUpdate(obj interface{}) { + h.objects = append(h.objects, obj) +} + +type mockUpdaterStore struct { + objects []interface{} +} + +func (s *mockUpdaterStore) List() []interface{} { + return s.objects +} + func NewMockPodEventerManager(pod *pod) EventManager { em := &eventerManager{} em.eventer = pod