From b4b6e6e67cabd258104bcb59db7cec499442014a Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor
Date: Tue, 20 Apr 2021 10:56:32 +0200
Subject: [PATCH] Refactor kubernetes autodiscover to avoid skipping short-living pods (#24742)

Refactor the logic in kubernetes autodiscover that decides when to generate
events, to address issues with short-living containers.

Kubernetes autodiscover can now generate events without network information
(without host or port/ports). This allows generating events for pods that
haven't started yet, or that succeeded or failed before a running event could
be generated. These events still include the container ID, so they can be used
to collect logs. Still, no start event is generated if neither a pod IP nor
container IDs are available.

Some helpers have been added to obtain relevant information from pods and
their containers. Some additional small refactors are done to improve
readability.
---
 CHANGELOG.next.asciidoc                       |   1 +
 filebeat/autodiscover/builder/hints/logs.go   |   5 -
 .../autodiscover/providers/kubernetes/pod.go  | 452 +++++++++--------
 .../providers/kubernetes/pod_test.go          | 465 ++++++++++++++++--
 libbeat/common/kubernetes/informer.go         |   7 +-
 libbeat/common/kubernetes/watcher.go          |  76 ++-
 6 files changed, 722 insertions(+), 284 deletions(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 262e4f6eea9..4f4ccc33464 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -243,6 +243,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Allow cgroup self-monitoring to see alternate `hostfs` paths {pull}24334[24334]
 - Add `expand_keys` to the list of permitted config fields for `decode_json_fields` {pull}24862[24862]
 - Fix 'make setup' instructions for a new beat {pull}24944[24944]
+- Fix discovery of short-living and failing pods in Kubernetes autodiscover {issue}22718[22718] {pull}24742[24742]
 - Fix inode removal tracking code when files are replaced by files with the same name {pull}25002[25002]
 - Fix `mage GenerateCustomBeat` instructions for a new beat {pull}17679[17679]
 - Fix bug with annotations dedot config on k8s not used {pull}25111[25111]
diff --git a/filebeat/autodiscover/builder/hints/logs.go b/filebeat/autodiscover/builder/hints/logs.go
index 05014134106..037dd3f402e 100644
--- a/filebeat/autodiscover/builder/hints/logs.go
+++ b/filebeat/autodiscover/builder/hints/logs.go
@@ -93,11 +93,6 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm
 		return []*common.Config{}
 	}
 
-	host, _ := event["host"].(string)
-	if host == "" {
-		return []*common.Config{}
-	}
-
 	if inputConfig != nil {
 		configs := []*common.Config{}
 		for _, cfg := range inputConfig {
diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go
index 09c74ac4d37..eabb18c8170 100644
--- a/libbeat/autodiscover/providers/kubernetes/pod.go
+++ b/libbeat/autodiscover/providers/kubernetes/pod.go
@@ -39,7 +39,7 @@ type pod struct {
 	config           *Config
 	metagen          metadata.MetaGen
 	logger           *logp.Logger
-	publish          func([]bus.Event)
+	publishFunc      func([]bus.Event)
 	watcher          kubernetes.Watcher
 	nodeWatcher      kubernetes.Watcher
 	namespaceWatcher kubernetes.Watcher
@@ -106,7 +106,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
 	p := &pod{
 		config:      config,
 		uuid:        uuid,
-		publish:     publish,
+		publishFunc: publish,
 		metagen:     metaGen,
 		logger:      logger,
 		watcher:     watcher,
@@ -124,7 +124,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client
k8s.Interface, pub return p, nil } -// OnAdd ensures processing of pod objects that are newly added +// OnAdd ensures processing of pod objects that are newly added. func (p *pod) OnAdd(obj interface{}) { p.crossUpdate.RLock() defer p.crossUpdate.RUnlock() @@ -133,9 +133,7 @@ func (p *pod) OnAdd(obj interface{}) { p.emit(obj.(*kubernetes.Pod), "start") } -// OnUpdate emits events for a given pod depending on the state of the pod, -// if it is terminating, a stop event is scheduled, if not, a stop and a start -// events are sent sequentially to recreate the resources assotiated to the pod. +// OnUpdate handles events for pods that have been updated. func (p *pod) OnUpdate(obj interface{}) { p.crossUpdate.RLock() defer p.crossUpdate.RUnlock() @@ -144,52 +142,21 @@ func (p *pod) OnUpdate(obj interface{}) { } func (p *pod) unlockedUpdate(obj interface{}) { - pod := obj.(*kubernetes.Pod) - - p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) - switch pod.Status.Phase { - case kubernetes.PodSucceeded, kubernetes.PodFailed: - // If Pod is in a phase where all containers in the have terminated emit a stop event - p.logger.Debugf("Watcher Pod update (terminated): %+v", obj) - time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) - return - case kubernetes.PodPending: - p.logger.Debugf("Watcher Pod update (pending): don't know what to do with this Pod yet, skipping for now: %+v", obj) - return - } - - // here handle the case when a Pod is in `Terminating` phase. - // In this case the pod is neither `PodSucceeded` nor `PodFailed` and - // hence requires special handling. - if pod.GetObjectMeta().GetDeletionTimestamp() != nil { - p.logger.Debugf("Watcher Pod update (terminating): %+v", obj) - // Pod is terminating, don't reload its configuration and ignore the event - // if some pod is still running, we will receive more events when containers - // terminate. - for _, container := range pod.Status.ContainerStatuses { - if container.State.Running != nil { - return - } - } - time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) - return - } - p.logger.Debugf("Watcher Pod update: %+v", obj) - p.emit(pod, "stop") - p.emit(pod, "start") + p.emit(obj.(*kubernetes.Pod), "stop") + p.emit(obj.(*kubernetes.Pod), "start") } -// OnDelete stops pod objects that are deleted +// OnDelete stops pod objects that are deleted. func (p *pod) OnDelete(obj interface{}) { p.crossUpdate.RLock() defer p.crossUpdate.RUnlock() p.logger.Debugf("Watcher Pod delete: %+v", obj) - time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) + p.emit(obj.(*kubernetes.Pod), "stop") } -// GenerateHints creates hints needed for hints builder +// GenerateHints creates hints needed for hints builder. func (p *pod) GenerateHints(event bus.Event) bus.Event { // Try to build a config with enabled builders. Send a provider agnostic payload. // Builders are Beat specific. @@ -211,9 +178,9 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event { // Look at all the namespace level default annotations and do a merge with priority going to the pod annotations. 
	if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok {
-		nsAnn, _ := rawNsAnn.(common.MapStr)
-		if len(nsAnn) != 0 {
-			annotations.DeepUpdateNoOverwrite(nsAnn)
+		namespaceAnnotations, _ := rawNsAnn.(common.MapStr)
+		if len(namespaceAnnotations) != 0 {
+			annotations.DeepUpdateNoOverwrite(namespaceAnnotations)
 		}
 	}
 }
@@ -280,195 +247,286 @@ func (p *pod) Stop() {
 	}
 }
 
-func (p *pod) emit(pod *kubernetes.Pod, flag string) {
-	containers, statuses := getContainersInPod(pod)
-	p.emitEvents(pod, flag, containers, statuses)
+type containerInPod struct {
+	id      string
+	runtime string
+	spec    kubernetes.Container
+	status  kubernetes.PodContainerStatus
 }
 
 // getContainersInPod returns all the containers defined in a pod and their statuses.
 // It includes init and ephemeral containers.
-func getContainersInPod(pod *kubernetes.Pod) ([]kubernetes.Container, []kubernetes.PodContainerStatus) {
-	var containers []kubernetes.Container
-	var statuses []kubernetes.PodContainerStatus
+func getContainersInPod(pod *kubernetes.Pod) []*containerInPod {
+	var containers []*containerInPod
+	for _, c := range pod.Spec.Containers {
+		containers = append(containers, &containerInPod{spec: c})
+	}
+	for _, c := range pod.Spec.InitContainers {
+		containers = append(containers, &containerInPod{spec: c})
+	}
+	for _, c := range pod.Spec.EphemeralContainers {
+		c := kubernetes.Container(c.EphemeralContainerCommon)
+		containers = append(containers, &containerInPod{spec: c})
+	}
 
-	// Emit events for all containers
-	containers = append(containers, pod.Spec.Containers...)
-	statuses = append(statuses, pod.Status.ContainerStatuses...)
+	statuses := make(map[string]*kubernetes.PodContainerStatus)
+	mapStatuses := func(s []kubernetes.PodContainerStatus) {
+		for i := range s {
+			statuses[s[i].Name] = &s[i]
+		}
+	}
+	mapStatuses(pod.Status.ContainerStatuses)
+	mapStatuses(pod.Status.InitContainerStatuses)
+	mapStatuses(pod.Status.EphemeralContainerStatuses)
+	for _, c := range containers {
+		if s, ok := statuses[c.spec.Name]; ok {
+			c.id, c.runtime = kubernetes.ContainerIDWithRuntime(*s)
+			c.status = *s
+		}
+	}
 
-	// Emit events for all initContainers
-	containers = append(containers, pod.Spec.InitContainers...)
-	statuses = append(statuses, pod.Status.InitContainerStatuses...)
+	return containers
+}
 
-	// Emit events for all ephemeralContainers
-	// Ephemeral containers are alpha feature in k8s and this code may require some changes, if their
-	// api change in the future.
-	for _, c := range pod.Spec.EphemeralContainers {
-		containers = append(containers, kubernetes.Container(c.EphemeralContainerCommon))
+// emit emits the events for the given pod according to its state and
+// the given flag.
+// It emits a pod event if the pod has at least one running container,
+// and a container event for each of the ports defined in each
+// container.
+// If a container doesn't have any defined port, it emits a single
+// container event with "port" set to 0.
+// "start" events are only generated for containers that have an id.
+// "stop" events are always generated to ensure that configurations are
+// deleted.
+// If the pod is terminated, "stop" events are delayed during the grace
+// period defined in `CleanupTimeout`.
+// Network information is only included in events for running containers
+// and for pods with at least one running container.
+func (p *pod) emit(pod *kubernetes.Pod, flag string) {
+	annotations := podAnnotations(pod)
+	namespaceAnnotations := podNamespaceAnnotations(pod, p.namespaceWatcher)
+
+	eventList := make([][]bus.Event, 0)
+	portsMap := common.MapStr{}
+	containers := getContainersInPod(pod)
+	anyContainerRunning := false
+	for _, c := range containers {
+		if c.status.State.Running != nil {
+			anyContainerRunning = true
+		}
+
+		events, ports := p.containerPodEvents(flag, pod, c, annotations, namespaceAnnotations)
+		if len(events) != 0 {
+			eventList = append(eventList, events)
+		}
+		if len(ports) > 0 {
+			portsMap.DeepUpdate(ports)
+		}
+	}
+	if len(eventList) != 0 {
+		event := p.podEvent(flag, pod, portsMap, anyContainerRunning, annotations, namespaceAnnotations)
+		// Ensure that the pod level event is published first to avoid
+		// pod metadata overriding a valid container metadata.
+		eventList = append([][]bus.Event{{event}}, eventList...)
 	}
-	statuses = append(statuses, pod.Status.EphemeralContainerStatuses...)
 
-	return containers, statuses
+	delay := (flag == "stop" && podTerminated(pod, containers))
+	p.publishAll(eventList, delay)
 }
 
-func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernetes.Container,
-	containerstatuses []kubernetes.PodContainerStatus) {
-	host := pod.Status.PodIP
+// containerPodEvents creates the events for a container in a pod.
+// One event is created for each configured port. If there is no
+// configured port, a single event is created, with the port set to 0.
+// Host and port information is only included if the container is
+// running.
+// If the container ID is unknown, only "stop" events are generated.
+// It also returns a map with the named ports.
+func (p *pod) containerPodEvents(flag string, pod *kubernetes.Pod, c *containerInPod, annotations, namespaceAnnotations common.MapStr) ([]bus.Event, common.MapStr) {
+	if c.id == "" && flag != "stop" {
+		return nil, nil
+	}
 
-	// If the container doesn't exist in the runtime or its network
-	// is not configured, it won't have an IP. Skip it as we cannot
-	// generate configs without host, and an update will arrive when
-	// the container is ready.
-	// If stopping, emit the event in any case to ensure cleanup.
-	if host == "" && flag != "stop" {
-		return
+	// This must be an id that doesn't depend on the state of the container
+	// so it works also on `stop` if containers have been already deleted.
+	eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.spec.Name)
+
+	meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.spec.Name))
+
+	cmeta := common.MapStr{
+		"id":      c.id,
+		"runtime": c.runtime,
+		"image": common.MapStr{
+			"name": c.spec.Image,
+		},
 	}
 
-	// Collect all runtimes from status information.
-	containerIDs := map[string]string{}
-	runtimes := map[string]string{}
-	for _, c := range containerstatuses {
-		// If the container is not being stopped then add the container only if it is in running state.
-		// This makes sure that we dont keep tailing init container logs after they have stopped.
-		// Emit the event in case that the pod is being stopped.
- if flag == "stop" || c.State.Running != nil { - cid, runtime := kubernetes.ContainerIDWithRuntime(c) - containerIDs[c.Name] = cid - runtimes[c.Name] = runtime - } + // Information that can be used in discovering a workload + kubemeta := meta.Clone() + kubemeta["annotations"] = annotations + kubemeta["container"] = common.MapStr{ + "id": c.id, + "name": c.spec.Name, + "image": c.spec.Image, + "runtime": c.runtime, + } + if len(namespaceAnnotations) != 0 { + kubemeta["namespace_annotations"] = namespaceAnnotations } - // Pass annotations to all events so that it can be used in templating and by annotation builders. - var ( - annotations = common.MapStr{} - nsAnn = common.MapStr{} - podPorts = common.MapStr{} - eventList = make([][]bus.Event, 0) - ) - for k, v := range pod.GetObjectMeta().GetAnnotations() { - safemapstr.Put(annotations, k, v) + ports := c.spec.Ports + if len(ports) == 0 { + // Ensure that at least one event is generated for this container. + // Set port to zero to signify that the event is from a container + // and not from a pod. + ports = []kubernetes.ContainerPort{{ContainerPort: 0}} } - if p.namespaceWatcher != nil { - if rawNs, ok, err := p.namespaceWatcher.Store().GetByKey(pod.Namespace); ok && err == nil { - if namespace, ok := rawNs.(*kubernetes.Namespace); ok { - for k, v := range namespace.GetAnnotations() { - safemapstr.Put(nsAnn, k, v) - } + var events []bus.Event + portsMap := common.MapStr{} + for _, port := range ports { + event := bus.Event{ + "provider": p.uuid, + "id": eventID, + flag: true, + "kubernetes": kubemeta, + // Actual metadata that will enrich the event. + "meta": common.MapStr{ + "kubernetes": meta, + "container": cmeta, + }, + } + // Include network information only if the container is running, + // so templates that need network don't generate a config. + if c.status.State.Running != nil { + if port.Name != "" && port.ContainerPort != 0 { + portsMap[port.Name] = port.ContainerPort } + event["host"] = pod.Status.PodIP + event["port"] = port.ContainerPort } + + events = append(events, event) } - // Emit container and port information - for _, c := range containers { - // If it doesn't have an ID, container doesn't exist in - // the runtime, emit only an event if we are stopping, so - // we are sure of cleaning up configurations. - cid := containerIDs[c.Name] - if cid == "" && flag != "stop" { - continue + return events, portsMap +} + +// podEvent creates an event for a pod. +// It only includes network information if `includeNetwork` is true. +func (p *pod) podEvent(flag string, pod *kubernetes.Pod, ports common.MapStr, includeNetwork bool, annotations, namespaceAnnotations common.MapStr) bus.Event { + meta := p.metagen.Generate(pod) + + // Information that can be used in discovering a workload + kubemeta := meta.Clone() + kubemeta["annotations"] = annotations + if len(namespaceAnnotations) != 0 { + kubemeta["namespace_annotations"] = namespaceAnnotations + } + + // Don't set a port on the event + event := bus.Event{ + "provider": p.uuid, + "id": fmt.Sprint(pod.GetObjectMeta().GetUID()), + flag: true, + "kubernetes": kubemeta, + "meta": common.MapStr{ + "kubernetes": meta, + }, + } + + // Include network information only if the pod has an IP and there is any + // running container that could handle requests. 
+ if pod.Status.PodIP != "" && includeNetwork { + event["host"] = pod.Status.PodIP + if len(ports) > 0 { + event["ports"] = ports } + } - // This must be an id that doesn't depend on the state of the container - // so it works also on `stop` if containers have been already deleted. - eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) + return event +} - meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.Name)) +// podAnnotations returns the annotations in a pod +func podAnnotations(pod *kubernetes.Pod) common.MapStr { + annotations := common.MapStr{} + for k, v := range pod.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + return annotations +} - cmeta := common.MapStr{ - "id": cid, - "runtime": runtimes[c.Name], - "image": common.MapStr{ - "name": c.Image, - }, - } +// podNamespaceAnnotations returns the annotations of the namespace of the pod +func podNamespaceAnnotations(pod *kubernetes.Pod, watcher kubernetes.Watcher) common.MapStr { + if watcher == nil { + return nil + } - // Information that can be used in discovering a workload - kubemeta := meta.Clone() - kubemeta["annotations"] = annotations - kubemeta["container"] = common.MapStr{ - "id": cid, - "name": c.Name, - "image": c.Image, - "runtime": runtimes[c.Name], - } - if len(nsAnn) != 0 { - kubemeta["namespace_annotations"] = nsAnn - } + rawNs, ok, err := watcher.Store().GetByKey(pod.Namespace) + if !ok || err != nil { + return nil + } - var events []bus.Event - // Without this check there would be overlapping configurations with and without ports. - if len(c.Ports) == 0 { - // Set a zero port on the event to signify that the event is from a container - event := bus.Event{ - "provider": p.uuid, - "id": eventID, - flag: true, - "host": host, - "port": 0, - "kubernetes": kubemeta, - //Actual metadata that will enrich the event - "meta": common.MapStr{ - "kubernetes": meta, - "container": cmeta, - }, - } - events = append(events, event) - } + namespace, ok := rawNs.(*kubernetes.Namespace) + if !ok { + return nil + } - for _, port := range c.Ports { - podPorts[port.Name] = port.ContainerPort - event := bus.Event{ - "provider": p.uuid, - "id": eventID, - flag: true, - "host": host, - "port": port.ContainerPort, - "kubernetes": kubemeta, - "meta": common.MapStr{ - "kubernetes": meta, - "container": cmeta, - }, - } - events = append(events, event) - } - if len(events) != 0 { - eventList = append(eventList, events) - } + annotations := common.MapStr{} + for k, v := range namespace.GetAnnotations() { + safemapstr.Put(annotations, k, v) } + return annotations +} - // Publish a pod level event so that hints that have no exposed ports can get processed. - // Log hints would just ignore this event as there is no ${data.container.id} - // Publish the pod level hint only if at least one container level hint was generated. This ensures that there is - // no unnecessary pod level events emitted prematurely. - // We publish the pod level hint first so that it doesn't override a valid container level event. - if len(eventList) != 0 { - meta := p.metagen.Generate(pod) +// podTerminating returns true if a pod is marked for deletion or is in a phase beyond running. 
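+// Any other phase (Succeeded, Failed or Unknown) is considered terminal.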
+func podTerminating(pod *kubernetes.Pod) bool {
+	if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
+		return true
+	}
 
-		// Information that can be used in discovering a workload
-		kubemeta := meta.Clone()
-		kubemeta["annotations"] = annotations
-		if len(nsAnn) != 0 {
-			kubemeta["namespace_annotations"] = nsAnn
-		}
+	switch pod.Status.Phase {
+	case kubernetes.PodRunning, kubernetes.PodPending:
+	default:
+		return true
+	}
 
-		// Don't set a port on the event
-		event := bus.Event{
-			"provider":   p.uuid,
-			"id":         fmt.Sprint(pod.GetObjectMeta().GetUID()),
-			flag:         true,
-			"host":       host,
-			"ports":      podPorts,
-			"kubernetes": kubemeta,
-			"meta": common.MapStr{
-				"kubernetes": meta,
-			},
+	return false
+}
+
+// podTerminated returns true if a pod is terminated. This method considers a
+// pod as terminated if none of its containers are running (or going to be running).
+func podTerminated(pod *kubernetes.Pod, containers []*containerInPod) bool {
+	// Pod is not marked for termination, so it is not terminated.
+	if !podTerminating(pod) {
+		return false
+	}
+
+	// If any container is running, the pod is not terminated yet.
+	for _, container := range containers {
+		if container.status.State.Running != nil {
+			return false
 		}
-		p.publish([]bus.Event{event})
 	}
 
-	// Ensure that the pod level event is published first to avoid pod metadata overriding a valid container metadata
+	return true
+}
+
+// publishAll publishes all events in the event list in the same order. If delay is true,
+// publishAll schedules the publication of the events after the configured `CleanupTimeout`
+// and returns immediately.
+// Order of published events matters, so this function will always publish a given eventList
+// in the same goroutine.
+func (p *pod) publishAll(eventList [][]bus.Event, delay bool) {
+	if delay && p.config.CleanupTimeout > 0 {
+		p.logger.Debug("Publish will wait for the cleanup timeout")
+		time.AfterFunc(p.config.CleanupTimeout, func() {
+			p.publishAll(eventList, false)
+		})
+		return
+	}
+
 	for _, events := range eventList {
-		p.publish(events)
+		p.publishFunc(events)
 	}
 }
diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go
index 3f874b649d4..b1fed1fddfe 100644
--- a/libbeat/autodiscover/providers/kubernetes/pod_test.go
+++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go
@@ -370,6 +370,7 @@ func TestEmitEvent(t *testing.T) {
 			TypeMeta: typeMeta,
 			Status: v1.PodStatus{
 				PodIP: podIP,
+				Phase: kubernetes.PodRunning,
 				ContainerStatuses: []kubernetes.PodContainerStatus{
 					{
 						Name: name,
@@ -396,7 +397,6 @@ func TestEmitEvent(t *testing.T) {
 					"host":     "127.0.0.1",
 					"id":       uid,
 					"provider": UUID,
-					"ports":    common.MapStr{},
 					"kubernetes": common.MapStr{
 						"pod": common.MapStr{
 							"name": "filebeat",
@@ -426,7 +426,7 @@ func TestEmitEvent(t *testing.T) {
 				{
 					"start":    true,
 					"host":     "127.0.0.1",
-					"port":     0,
+					"port":     int32(0),
 					"id":       cid,
 					"provider": UUID,
 					"kubernetes": common.MapStr{
@@ -484,6 +484,7 @@ func TestEmitEvent(t *testing.T) {
 			TypeMeta: typeMeta,
 			Status: v1.PodStatus{
 				PodIP: podIP,
+				Phase: kubernetes.PodRunning,
 				ContainerStatuses: []kubernetes.PodContainerStatus{
 					{
 						Name: name,
@@ -643,9 +644,10 @@ func TestEmitEvent(t *testing.T) {
 			},
 		},
 		{
-			Message: "Test pod without host",
+			// This could be a succeeded pod from a short-living cron job.
+ Message: "Test succeeded pod start with multiple ports exposed", Flag: "start", - Pod: &v1.Pod{ + Pod: &kubernetes.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, UID: types.UID(uid), @@ -655,10 +657,174 @@ func TestEmitEvent(t *testing.T) { }, TypeMeta: typeMeta, Status: v1.PodStatus{ + PodIP: podIP, + Phase: kubernetes.PodSucceeded, ContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, ContainerID: containerID, + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + Ports: []v1.ContainerPort{ + { + ContainerPort: 8080, + Name: "port1", + }, + { + ContainerPort: 9090, + Name: "port2", + }, + }, + }, + }, + }, + }, + Expected: []bus.Event{ + { + "start": true, + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, "container": common.MapStr{ + "name": "filebeat", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "runtime": "docker", + "id": "foobar", + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, "container": common.MapStr{ + "name": "filebeat", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "id": "foobar", + "runtime": "docker", + }, + }, + "config": []*common.Config{}, + }, + }, + }, + { + Message: "Test pod without host", + Flag: "start", + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + 
Status: v1.PodStatus{ + Phase: kubernetes.PodPending, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, }, }, }, @@ -688,6 +854,7 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodPending, ContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, @@ -738,10 +905,8 @@ func TestEmitEvent(t *testing.T) { Expected: []bus.Event{ { "stop": true, - "host": "", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -768,9 +933,7 @@ func TestEmitEvent(t *testing.T) { }, { "stop": true, - "host": "", "id": cid, - "port": 0, "provider": UUID, "kubernetes": common.MapStr{ "container": common.MapStr{ @@ -844,10 +1007,8 @@ func TestEmitEvent(t *testing.T) { Expected: []bus.Event{ { "stop": true, - "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -876,8 +1037,6 @@ func TestEmitEvent(t *testing.T) { }, { "stop": true, - "host": "127.0.0.1", - "port": 0, "id": cid, "provider": UUID, "kubernetes": common.MapStr{ @@ -922,9 +1081,10 @@ func TestEmitEvent(t *testing.T) { }, }, { - Message: "Test stop pod without container id", + // This could be a succeeded pod from a short-living cron job. + Message: "Test succeeded pod stop with multiple ports exposed", Flag: "stop", - Pod: &v1.Pod{ + Pod: &kubernetes.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, UID: types.UID(uid), @@ -935,9 +1095,14 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodSucceeded, ContainerStatuses: []kubernetes.PodContainerStatus{ { - Name: name, + Name: name, + ContainerID: containerID, + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, }, }, }, @@ -947,6 +1112,16 @@ func TestEmitEvent(t *testing.T) { { Image: containerImage, Name: name, + Ports: []v1.ContainerPort{ + { + ContainerPort: 8080, + Name: "port1", + }, + { + ContainerPort: 9090, + Name: "port2", + }, + }, }, }, }, @@ -954,10 +1129,8 @@ func TestEmitEvent(t *testing.T) { Expected: []bus.Event{ { "stop": true, - "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -986,16 +1159,195 @@ func TestEmitEvent(t *testing.T) { }, { "stop": true, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, "container": common.MapStr{ + "name": "filebeat", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "runtime": "docker", + "id": "foobar", + }, + }, + "config": []*common.Config{}, + }, + { + "stop": true, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + 
"runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, "container": common.MapStr{ + "name": "filebeat", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "id": "foobar", + "runtime": "docker", + }, + }, + "config": []*common.Config{}, + }, + }, + }, + { + Message: "Test terminated init container in started common pod", + Flag: "start", + Pod: &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.PodStatus{ + PodIP: podIP, + Phase: kubernetes.PodRunning, + InitContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name + "-init", + ContainerID: containerID, + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }, + }, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, + ContainerID: containerID, + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + Ports: []v1.ContainerPort{ + { + ContainerPort: 8080, + Name: "http", + }, + }, + }, + }, + InitContainers: []kubernetes.Container{ + { + Image: containerImage, + Name: name + "-init", + }, + }, + }, + }, + Expected: []bus.Event{ + { + "start": true, + "host": "127.0.0.1", + "id": uid, + "ports": common.MapStr{ + "http": int32(8080), + }, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": "127.0.0.1", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": "127.0.0.1", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(8080), "id": cid, "provider": UUID, "kubernetes": common.MapStr{ "container": common.MapStr{ - "id": "", + "id": "foobar", "name": "filebeat", "image": "elastic/filebeat:6.3.0", - "runtime": "", + "runtime": "docker", }, "pod": common.MapStr{ "name": "filebeat", @@ -1023,8 +1375,51 @@ func TestEmitEvent(t *testing.T) { }, "container": common.MapStr{ "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, - "runtime": "", - "id": "", + "id": "foobar", + "runtime": "docker", + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "id": cid + "-init", + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat-init", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, + "node": common.MapStr{ + "name": "node", + }, + 
"namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "ip": podIP, + }, "node": common.MapStr{ + "name": "node", + }, "container": common.MapStr{ + "name": "filebeat-init", + }, + }, + "container": common.MapStr{ + "image": common.MapStr{"name": "elastic/filebeat:6.3.0"}, + "id": "foobar", + "runtime": "docker", }, }, "config": []*common.Config{}, @@ -1045,6 +1440,7 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodPending, InitContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, @@ -1071,7 +1467,6 @@ func TestEmitEvent(t *testing.T) { "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -1101,7 +1496,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid, "provider": UUID, "kubernetes": common.MapStr{ @@ -1159,6 +1554,7 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodRunning, EphemeralContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, @@ -1187,7 +1583,6 @@ func TestEmitEvent(t *testing.T) { "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -1217,7 +1612,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid, "provider": UUID, "kubernetes": common.MapStr{ @@ -1275,6 +1670,7 @@ func TestEmitEvent(t *testing.T) { TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, + Phase: kubernetes.PodRunning, InitContainerStatuses: []kubernetes.PodContainerStatus{ { Name: name, @@ -1334,7 +1730,6 @@ func TestEmitEvent(t *testing.T) { "host": "127.0.0.1", "id": uid, "provider": UUID, - "ports": common.MapStr{}, "kubernetes": common.MapStr{ "pod": common.MapStr{ "name": "filebeat", @@ -1365,7 +1760,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid, "provider": UUID, "kubernetes": common.MapStr{ @@ -1411,7 +1806,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid + "-init", "provider": UUID, "kubernetes": common.MapStr{ @@ -1457,7 +1852,7 @@ func TestEmitEvent(t *testing.T) { { "start": true, "host": "127.0.0.1", - "port": 0, + "port": int32(0), "id": cid + "-ephemeral", "provider": UUID, "kubernetes": common.MapStr{ @@ -1520,11 +1915,11 @@ func TestEmitEvent(t *testing.T) { pub := &publisher{b: p.bus} pod := &pod{ - metagen: metaGen, - config: defaultConfig(), - publish: pub.publish, - uuid: UUID, - logger: logp.NewLogger("kubernetes.pod"), + metagen: metaGen, + config: defaultConfig(), + publishFunc: pub.publish, + uuid: UUID, + logger: logp.NewLogger("kubernetes.pod"), } p.eventManager = NewMockPodEventerManager(pod) diff --git a/libbeat/common/kubernetes/informer.go b/libbeat/common/kubernetes/informer.go index b51640a248a..b092847dca3 100644 --- a/libbeat/common/kubernetes/informer.go +++ b/libbeat/common/kubernetes/informer.go @@ -153,9 +153,8 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio return nil, "", fmt.Errorf("unsupported resource type for 
watching %T", resource) } - if indexers != nil { - return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil + if indexers == nil { + indexers = cache.Indexers{} } - - return cache.NewSharedInformer(listwatch, resource, opts.SyncTimeout), objType, nil + return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil } diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index a7e2a631977..df58cf84a3e 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -86,7 +86,7 @@ type watcher struct { client kubernetes.Interface informer cache.SharedInformer store cache.Store - queue workqueue.RateLimitingInterface + queue workqueue.Interface ctx context.Context stop context.CancelFunc handler ResourceEventHandler @@ -97,16 +97,15 @@ type watcher struct { // resource from the cluster (filtered to the given node) func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { var store cache.Store - var queue workqueue.RateLimitingInterface + var queue workqueue.Interface - informer, objType, err := NewInformer(client, resource, opts, indexers) + informer, _, err := NewInformer(client, resource, opts, indexers) if err != nil { return nil, err } store = informer.GetStore() - queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), objType) - ctx, cancel := context.WithCancel(context.Background()) + queue = workqueue.New() if opts.IsUpdated == nil { opts.IsUpdated = func(o, n interface{}) bool { @@ -121,6 +120,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption } } + ctx, cancel := context.WithCancel(context.TODO()) w := &watcher{ client: client, informer: informer, @@ -216,53 +216,43 @@ func (w *watcher) enqueue(obj interface{}, state string) { // process gets the top of the work queue and processes the object that is received. 
func (w *watcher) process(ctx context.Context) bool { - keyObj, quit := w.queue.Get() + obj, quit := w.queue.Get() if quit { return false } + defer w.queue.Done(obj) - err := func(obj interface{}) error { - defer w.queue.Done(obj) - - var entry *item - var ok bool - if entry, ok = obj.(*item); !ok { - w.queue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected *item in workqueue but got %#v", obj)) - return nil - } - - key := entry.object.(string) - - o, exists, err := w.store.GetByKey(key) - if err != nil { - return nil - } - if !exists { - if entry.state == delete { - w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key) - // delete anyway in order to clean states - w.handler.OnDelete(entry.objectRaw) - } - return nil - } - - switch entry.state { - case add: - w.handler.OnAdd(o) - case update: - w.handler.OnUpdate(o) - case delete: - w.handler.OnDelete(o) - } + var entry *item + var ok bool + if entry, ok = obj.(*item); !ok { + utilruntime.HandleError(fmt.Errorf("expected *item in workqueue but got %#v", obj)) + return true + } - return nil - }(keyObj) + key := entry.object.(string) + o, exists, err := w.store.GetByKey(key) if err != nil { - utilruntime.HandleError(err) + utilruntime.HandleError(fmt.Errorf("getting object %#v from cache: %w", obj, err)) + return true + } + if !exists { + if entry.state == delete { + w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key) + // delete anyway in order to clean states + w.handler.OnDelete(entry.objectRaw) + } return true } + switch entry.state { + case add: + w.handler.OnAdd(o) + case update: + w.handler.OnUpdate(o) + case delete: + w.handler.OnDelete(o) + } + return true }
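
As an illustration of the decision logic above, here is a minimal, self-contained Go sketch. The types and names (`podInfo`, `terminating`, `terminated`) are simplified stand-ins, not the actual beats or k8s.io APIs: `terminating` and `terminated` mirror `podTerminating` and `podTerminated`, and the start/stop rule mirrors `containerPodEvents`.

package main

import "fmt"

// Simplified stand-ins for the kubernetes types used by the provider.
type containerStatus struct{ running bool }

type container struct {
	name   string
	id     string // empty when the runtime never created the container
	status containerStatus
}

type podInfo struct {
	ip                string
	deletionRequested bool
	phase             string // Pending, Running, Succeeded, Failed or Unknown
	containers        []container
}

// terminating mirrors podTerminating: the pod is marked for deletion,
// or it is in a phase beyond Pending/Running.
func terminating(p podInfo) bool {
	if p.deletionRequested {
		return true
	}
	switch p.phase {
	case "Pending", "Running":
		return false
	}
	return true
}

// terminated mirrors podTerminated: the pod is terminating and none of
// its containers is running anymore.
func terminated(p podInfo) bool {
	if !terminating(p) {
		return false
	}
	for _, c := range p.containers {
		if c.status.running {
			return false
		}
	}
	return true
}

func main() {
	// A short-living cron job pod that already succeeded. Before this
	// patch no start event was emitted for it; now one is, because the
	// container id is still known and can be used to collect logs.
	pod := podInfo{
		ip:         "10.0.0.5",
		phase:      "Succeeded",
		containers: []container{{name: "job", id: "containerd://abc123"}},
	}

	for _, c := range pod.containers {
		emitStart := c.id != "" // "stop" events are always emitted
		fmt.Printf("container %q: emit start=%v\n", c.name, emitStart)
	}

	// Network information (host/port) is only attached while at least
	// one container is running, so templates that require it don't
	// match pods that can no longer serve requests.
	anyRunning := false
	for _, c := range pod.containers {
		if c.status.running {
			anyRunning = true
		}
	}
	fmt.Println("include host in pod event:", anyRunning && pod.ip != "")

	// "stop" events for a terminated pod are delayed by CleanupTimeout.
	fmt.Println("delay stop events:", terminated(pod))
}

Running this prints that a start event is still emitted for the succeeded container (it has an id) and that its stop events would be delayed by `CleanupTimeout`, which is exactly the short-living pod case this patch fixes.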