From 58edbb4e56c17e58995ec76071ef313d2aaad83d Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Thu, 2 Jul 2020 08:29:34 -0700 Subject: [PATCH] Fix ordering and duplicate configs on autodiscover (#19317) #18979 introduced a pod level event which is generated after all container events. The ordering is wrong in that pod events are sent last which would generate a valid event similar to container events. The ordering needs to be pod first and container events next so that pod events dont override valid container events. One other issue was that the pod level hint generates a single config with all hosts and it wont get over written by container hints causing more than one config to be spun up for the same hint (one with a container meta and one without). --- .../autodiscover/providers/kubernetes/pod.go | 19 +- .../providers/kubernetes/pod_test.go | 521 +++++++++++++++--- .../autodiscover/builder/hints/metrics.go | 39 +- .../builder/hints/metrics_test.go | 62 ++- 4 files changed, 538 insertions(+), 103 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 649c372bfd6..30ed913060b 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -284,6 +284,7 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet var ( annotations = common.MapStr{} nsAnn = common.MapStr{} + events = make([]bus.Event, 0) ) for k, v := range pod.GetObjectMeta().GetAnnotations() { safemapstr.Put(annotations, k, v) @@ -299,7 +300,6 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet } } - emitted := 0 // Emit container and port information for _, c := range containers { // If it doesn't have an ID, container doesn't exist in @@ -345,8 +345,7 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet "kubernetes": meta, }, } - p.publish(event) - emitted++ + events = append(events, event) } for _, port := range c.Ports { @@ -361,16 +360,16 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet "kubernetes": meta, }, } - p.publish(event) - emitted++ + events = append(events, event) } } - // Finally publish a pod level event so that hints that have no exposed ports can get processed. + // Publish a pod level event so that hints that have no exposed ports can get processed. // Log hints would just ignore this event as there is no ${data.container.id} - // Publish the pod level hint only if atleast one container level hint was emitted. This ensures that there is + // Publish the pod level hint only if at least one container level hint was generated. This ensures that there is // no unnecessary pod level events emitted prematurely. - if emitted != 0 { + // We publish the pod level hint first so that it doesn't override a valid container level event. + if len(events) != 0 { meta := p.metagen.Generate(pod) // Information that can be used in discovering a workload @@ -392,6 +391,10 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet }, } p.publish(event) + } + // Publish the container level hints finally. + for _, event := range events { + p.publish(event) } } diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 7e5d51b23b0..2dacee2b9fc 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -353,7 +353,7 @@ func TestEmitEvent(t *testing.T) { Message string Flag string Pod *kubernetes.Pod - Expected bus.Event + Expected []bus.Event }{ { Message: "Test common pod start", @@ -389,44 +389,227 @@ func TestEmitEvent(t *testing.T) { }, }, }, - Expected: bus.Event{ - "start": true, - "host": "127.0.0.1", - "port": 0, - "id": cid, - "provider": UUID, - "kubernetes": common.MapStr{ - "container": common.MapStr{ - "id": "foobar", - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - "runtime": "docker", + Expected: []bus.Event{ + { + "start": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, }, - "pod": common.MapStr{ - "name": "filebeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, }, - "node": common.MapStr{ - "name": "node", + "config": []*common.Config{}, + }, + { + "start": true, + "host": "127.0.0.1", + "port": 0, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, }, - "namespace": "default", - "annotations": common.MapStr{}, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + }, + }, + { + Message: "Test common pod start with multiple ports exposed", + Flag: "start", + Pod: &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, }, - "meta": common.MapStr{ + TypeMeta: typeMeta, + Status: v1.PodStatus{ + PodIP: podIP, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, + ContainerID: containerID, + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + Ports: []v1.ContainerPort{ + { + ContainerPort: 8080, + }, + { + ContainerPort: 9090, + }, + }, + }, + }, + }, + }, + Expected: []bus.Event{ + { + "start": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "host": "127.0.0.1", + "port": int32(8080), + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "host": "127.0.0.1", + "port": int32(9090), + "id": cid, + "provider": UUID, "kubernetes": common.MapStr{ - "namespace": "default", "container": common.MapStr{ - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - }, "pod": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ "name": "filebeat", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, "node": common.MapStr{ + }, + "node": common.MapStr{ "name": "node", }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, }, + "config": []*common.Config{}, }, - "config": []*common.Config{}, }, }, { @@ -522,44 +705,75 @@ func TestEmitEvent(t *testing.T) { }, }, }, - Expected: bus.Event{ - "stop": true, - "host": "", - "id": cid, - "port": 0, - "provider": UUID, - "kubernetes": common.MapStr{ - "container": common.MapStr{ - "id": "", - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - "runtime": "", - }, - "pod": common.MapStr{ - "name": "filebeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + Expected: []bus.Event{ + { + "stop": true, + "host": "", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, }, - "node": common.MapStr{ - "name": "node", + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, }, - "namespace": "default", - "annotations": common.MapStr{}, + "config": []*common.Config{}, }, - "meta": common.MapStr{ + { + "stop": true, + "host": "", + "id": cid, + "port": 0, + "provider": UUID, "kubernetes": common.MapStr{ - "namespace": "default", "container": common.MapStr{ - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - }, "pod": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ "name": "filebeat", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, "node": common.MapStr{ + }, + "node": common.MapStr{ "name": "node", }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, }, + "config": []*common.Config{}, }, - "config": []*common.Config{}, }, }, { @@ -592,44 +806,176 @@ func TestEmitEvent(t *testing.T) { }, }, }, - Expected: bus.Event{ - "stop": true, - "host": "127.0.0.1", - "port": 0, - "id": cid, - "provider": UUID, - "kubernetes": common.MapStr{ - "container": common.MapStr{ - "id": "", - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - "runtime": "", + Expected: []bus.Event{ + { + "stop": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, }, - "pod": common.MapStr{ - "name": "filebeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "stop": true, + "host": "127.0.0.1", + "port": 0, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, }, - "node": common.MapStr{ - "name": "node", + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, }, - "namespace": "default", - "annotations": common.MapStr{}, + "config": []*common.Config{}, }, - "meta": common.MapStr{ + }, + }, + { + Message: "Test stop pod without container id", + Flag: "stop", + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.PodStatus{ + PodIP: podIP, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + }, + }, + }, + }, + Expected: []bus.Event{ + { + "stop": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "stop": true, + "host": "127.0.0.1", + "port": 0, + "id": cid, + "provider": UUID, "kubernetes": common.MapStr{ - "namespace": "default", "container": common.MapStr{ - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - }, "pod": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ "name": "filebeat", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, "node": common.MapStr{ + }, + "node": common.MapStr{ "name": "node", }, + "namespace": "default", + "annotations": common.MapStr{}, }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, }, - "config": []*common.Config{}, }, }, } @@ -663,14 +1009,17 @@ func TestEmitEvent(t *testing.T) { pod.emit(test.Pod, test.Flag) - select { - case event := <-listener.Events(): - assert.Equal(t, test.Expected, event, test.Message) - case <-time.After(2 * time.Second): - if test.Expected != nil { - t.Fatal("Timeout while waiting for event") + for i := 0; i < len(test.Expected); i++ { + select { + case event := <-listener.Events(): + assert.Equal(t, test.Expected[i], event, test.Message) + case <-time.After(2 * time.Second): + if test.Expected != nil { + t.Fatal("Timeout while waiting for event") + } } } + }) } } diff --git a/metricbeat/autodiscover/builder/hints/metrics.go b/metricbeat/autodiscover/builder/hints/metrics.go index 52eba34a1ff..37ec6f150e0 100644 --- a/metricbeat/autodiscover/builder/hints/metrics.go +++ b/metricbeat/autodiscover/builder/hints/metrics.go @@ -133,7 +133,6 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c moduleConfig := common.MapStr{ "module": mod, "metricsets": msets, - "hosts": hosts, "timeout": tout, "period": ival, "enabled": true, @@ -154,15 +153,30 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c moduleConfig["password"] = password } - logp.Debug("hints.builder", "generated config: %v", moduleConfig) + // If there are hosts that match, ensure that there is a module config for each valid host. + // We do this because every config that is from a Pod that has an exposed port will generate a valid + // module config. However, the pod level hint will generate a config with all hosts that are defined in the + // config. To make sure that these pod level configs get deduped, it is essential that we generate exactly one + // module config per host. + if len(hosts) != 0 { + for _, h := range hosts { + mod := moduleConfig.Clone() + mod["hosts"] = []string{h} + + logp.Debug("hints.builder", "generated config: %v", mod) + + // Create config object + cfg := m.generateConfig(mod) + configs = append(configs, cfg) + } + } else { + logp.Debug("hints.builder", "generated config: %v", moduleConfig) - // Create config object - cfg, err := common.NewConfigFrom(moduleConfig) - if err != nil { - logp.Debug("hints.builder", "config merge failed with error: %v", err) + // Create config object + cfg := m.generateConfig(moduleConfig) + configs = append(configs, cfg) } - logp.Debug("hints.builder", "generated config: %+v", common.DebugString(cfg, true)) - configs = append(configs, cfg) + } // Apply information in event to the template to generate the final config @@ -171,6 +185,15 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c return template.ApplyConfigTemplate(event, configs, options...) } +func (m *metricHints) generateConfig(mod common.MapStr) *common.Config { + cfg, err := common.NewConfigFrom(mod) + if err != nil { + logp.Debug("hints.builder", "config merge failed with error: %v", err) + } + logp.Debug("hints.builder", "generated config: %+v", common.DebugString(cfg, true)) + return cfg +} + func (m *metricHints) getModule(hints common.MapStr) string { return builder.GetHintString(hints, m.Key, module) } diff --git a/metricbeat/autodiscover/builder/hints/metrics_test.go b/metricbeat/autodiscover/builder/hints/metrics_test.go index d2d0462a047..4badd9c2a02 100644 --- a/metricbeat/autodiscover/builder/hints/metrics_test.go +++ b/metricbeat/autodiscover/builder/hints/metrics_test.go @@ -409,6 +409,66 @@ func TestGenerateHints(t *testing.T) { }, }, }, + { + message: "Module with multiple hosts returns the right number of hints. Pod level hints need to be one per host", + event: bus.Event{ + "host": "1.2.3.4", + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:9090, ${data.host}:9091", + }, + }, + }, + len: 2, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9091"}, + }, + }, + }, + { + message: "Module with multiple hosts and an exposed port creates a config for just the exposed port", + event: bus.Event{ + "host": "1.2.3.4", + "port": 9091, + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:9090, ${data.host}:9091", + }, + }, + }, + len: 1, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9091"}, + }, + }, + }, } for _, test := range tests { mockRegister := mb.NewRegister() @@ -423,7 +483,7 @@ func TestGenerateHints(t *testing.T) { logger: logp.NewLogger("hints.builder"), } cfgs := m.CreateConfig(test.event) - assert.Equal(t, len(cfgs), test.len) + assert.Equal(t, len(cfgs), test.len, test.message) // The check below helps skipping config validation if there is no config supposed to be emitted. if len(cfgs) == 0 {