From 9a18e7c491c8d450492b1e668148f7912939cbda Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 11 Feb 2019 09:57:52 +0100 Subject: [PATCH] Fix stopping of modules started by kubernetes autodiscover (#10476) (#10643) Kubernetes autodiscover only emits events for containers with an ID in pods with an IP, but when a pod is being terminated, its containers can lack an ID and the pod itself can lack an IP. This leads to modules that are never stopped because the delete event that should stop them lacks the needed information. This change does two things to avoid this problem: * Don't require the pod to have an IP on stop events. * Use IDs for containers that don't depend on their state. (cherry picked from commit 15f2f263f885e717c9a52866b0a0e97ebcfcf07b) --- CHANGELOG.next.asciidoc | 1 + .../providers/kubernetes/kubernetes.go | 28 ++- .../providers/kubernetes/kubernetes_test.go | 167 +++++++++++++++++- 3 files changed, 186 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 84b8c2ce770e..fa7d16d66d93 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -71,6 +71,7 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff] - Fix panic and file unlock in spool on atomic operation (arm, x86-32). File lock was not released when panic occurs, leading to the beat deadlocking on startup. {pull}10289[10289] - Adding logging traces at debug level when the pipeline client receives the following events: onFilteredOut, onDroppedOnPublish. {pull}9016[9016] - Do not panic when no tokenizer string is configured for a dissect processor. {issue}8895[8895] +- Fix stopping of modules started by kubernetes autodiscover. {pull}10476[10476] - Fix a issue when remote and local configuration didn't match when fetching configuration from Central Management.
{issue}10587[10587] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index a61387b42c72..530f4c3dd88f 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -18,6 +18,7 @@ package kubernetes import ( + "fmt" "time" "github.com/gofrs/uuid" @@ -144,12 +145,16 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku containerstatuses []*kubernetes.PodContainerStatus) { host := pod.Status.GetPodIP() - // Do not emit events without host (container is still being configured) - if host == "" { + // If the container doesn't exist in the runtime or its network + // is not configured, it won't have an IP. Skip it as we cannot + // generate configs without host, and an update will arrive when + // the container is ready. + // If stopping, emit the event in any case to ensure cleanup. + if host == "" && flag != "stop" { return } - // Collect all container IDs and runtimes from status information. + // Collect all runtimes from status information. containerIDs := map[string]string{} runtimes := map[string]string{} for _, c := range containerstatuses { @@ -160,13 +165,18 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku // Emit container and port information for _, c := range containers { + // If it doesn't have an ID, container doesn't exist in + // the runtime, emit only an event if we are stopping, so + // we are sure of cleaning up configurations. cid := containerIDs[c.GetName()] - - // If there is a container ID that is empty then ignore it. It either means that the container is still starting - // up or the container is shutting down. - if cid == "" { + if cid == "" && flag != "stop" { continue } + + // This must be an id that doesn't depend on the state of the container + // so it works also on `stop` if containers have been already deleted. 
+ eventID := fmt.Sprintf("%s.%s", pod.Metadata.GetUid(), c.GetName()) + cmeta := common.MapStr{ "id": cid, "name": c.GetName(), @@ -190,7 +200,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku if len(c.Ports) == 0 { event := bus.Event{ "provider": p.uuid, - "id": cid, + "id": eventID, flag: true, "host": host, "kubernetes": kubemeta, @@ -204,7 +214,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku for _, port := range c.Ports { event := bus.Event{ "provider": p.uuid, - "id": cid, + "id": eventID, flag: true, "host": host, "port": port.GetContainerPort(), diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go index 1dc416021651..382558cc670c 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go @@ -160,6 +160,7 @@ func TestEmitEvent(t *testing.T) { uid := "005f3b90-4b9d-12f8-acf0-31020a840133" containerImage := "elastic/filebeat:6.3.0" node := "node" + cid := "005f3b90-4b9d-12f8-acf0-31020a840133.filebeat" UUID, err := uuid.NewV4() if err != nil { t.Fatal(err) @@ -204,7 +205,7 @@ func TestEmitEvent(t *testing.T) { Expected: bus.Event{ "start": true, "host": "127.0.0.1", - "id": "foobar", + "id": cid, "provider": UUID, "kubernetes": common.MapStr{ "container": common.MapStr{ @@ -270,6 +271,170 @@ func TestEmitEvent(t *testing.T) { }, Expected: nil, }, + { + Message: "Test pod without container id", + Flag: "start", + Pod: &v1.Pod{ + Metadata: &metav1.ObjectMeta{ + Name: &name, + Uid: &uid, + Namespace: &namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: &v1.PodStatus{ + PodIP: &podIP, + ContainerStatuses: []*kubernetes.PodContainerStatus{ + { + Name: &name, + }, + }, + }, + Spec: &v1.PodSpec{ + NodeName: &node, + Containers: []*kubernetes.Container{ + { + Image: &containerImage, + Name: 
&name, + }, + }, + }, + }, + Expected: nil, + }, + { + Message: "Test stop pod without host", + Flag: "stop", + Pod: &v1.Pod{ + Metadata: &metav1.ObjectMeta{ + Name: &name, + Uid: &uid, + Namespace: &namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: &v1.PodStatus{ + ContainerStatuses: []*kubernetes.PodContainerStatus{ + { + Name: &name, + }, + }, + }, + Spec: &v1.PodSpec{ + NodeName: &node, + Containers: []*kubernetes.Container{ + { + Image: &containerImage, + Name: &name, + }, + }, + }, + }, + Expected: bus.Event{ + "stop": true, + "host": "", + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + }, + { + Message: "Test stop pod without container id", + Flag: "stop", + Pod: &v1.Pod{ + Metadata: &metav1.ObjectMeta{ + Name: &name, + Uid: &uid, + Namespace: &namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: &v1.PodStatus{ + PodIP: &podIP, + ContainerStatuses: []*kubernetes.PodContainerStatus{ + { + Name: &name, + }, + }, + }, + Spec: &v1.PodSpec{ + NodeName: &node, + Containers: []*kubernetes.Container{ + { + Image: &containerImage, + Name: &name, + }, + }, + }, + }, + Expected: bus.Event{ + "stop": true, + "host": "127.0.0.1", + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": 
common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + }, } for _, test := range tests {