diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6b565987892..8eebaa31137 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -365,6 +365,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added Cisco Meraki module {pull}40836[40836] - Added Palo Alto Networks module {pull}40686[40686] - Restore docker.network.in.* and docker.network.out.* fields in docker module {pull}40968[40968] +- Only watch metadata for ReplicaSets in metricbeat k8s module {pull}41289[41289] *Metricbeat* diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index c17f5ba9718..5844c555c88 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -25,7 +25,12 @@ import ( "sync" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/apimachinery/pkg/runtime/schema" + k8sclient "k8s.io/client-go/kubernetes" + k8sclientmeta "k8s.io/client-go/metadata" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" @@ -304,6 +309,7 @@ func createWatcher( resource kubernetes.Resource, options kubernetes.WatchOptions, client k8sclient.Interface, + metadataClient k8sclientmeta.Interface, resourceWatchers *Watchers, namespace string, extraWatcher bool) (bool, error) { @@ -355,9 +361,27 @@ func createWatcher( if isNamespaced(resourceName) { options.Namespace = namespace } - watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil) + var ( + watcher kubernetes.Watcher + err error + ) + switch resource.(type) { + // use a metadata informer for ReplicaSets, as we only need their metadata + case *kubernetes.ReplicaSet: + watcher, err = kubernetes.NewNamedMetadataWatcher( + "resource_metadata_enricher_rs", + client, + metadataClient, + schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}, + options, + nil, + transformReplicaSetMetadata, + ) + default: + watcher, err = kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil) + } if err != nil { - return false, err + return false, fmt.Errorf("error creating watcher for %T: %w", resource, err) } resourceMetaWatcher = &metaWatcher{ @@ -450,6 +474,7 @@ func removeFromMetricsetsUsing(resourceName string, notUsingName string, resourc // createAllWatchers creates all the watchers required by a metricset func createAllWatchers( client k8sclient.Interface, + metadataClient k8sclientmeta.Interface, metricsetName string, resourceName string, nodeScope bool, @@ -469,7 +494,7 @@ func createAllWatchers( // Create the main watcher for the given resource. // For example pod metricset's main watcher will be pod watcher. // If it fails, we return an error, so we can stop the extra watchers from creating. - created, err := createWatcher(resourceName, res, *options, client, resourceWatchers, config.Namespace, false) + created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, config.Namespace, false) if err != nil { return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err) } else if created { @@ -484,7 +509,7 @@ func createAllWatchers( for _, extra := range extraWatchers { extraRes := getResource(extra) if extraRes != nil { - created, err = createWatcher(extra, extraRes, *options, client, resourceWatchers, config.Namespace, true) + created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, config.Namespace, true) if err != nil { log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err) } else { @@ -620,11 +645,16 @@ func NewResourceMetadataEnricher( log.Errorf("Error creating Kubernetes client: %s", err) return &nilEnricher{} } + metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + log.Errorf("Error creating Kubernetes client: %s", err) + return &nilEnricher{} + } metricsetName := base.Name() resourceName := getResourceName(metricsetName) // Create all watchers needed for this metricset - err = createAllWatchers(client, metricsetName, resourceName, nodeScope, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers) if err != nil { log.Errorf("Error starting the watchers: %s", err) return &nilEnricher{} @@ -659,61 +689,7 @@ func NewResourceMetadataEnricher( // It is responsible for generating the metadata for a detected resource by executing the metadata generators Generate method. // It is a common handler for all resource watchers. The kind of resource(e.g. pod or deployment) is checked inside the function. // It returns a map of a resource identifier(i.e. namespace-resource_name) as key and the metadata as value. - updateFunc := func(r kubernetes.Resource) map[string]mapstr.M { - accessor, _ := meta.Accessor(r) - id := accessor.GetName() - namespace := accessor.GetNamespace() - if namespace != "" { - id = join(namespace, id) - } - - switch r := r.(type) { - case *kubernetes.Pod: - return map[string]mapstr.M{id: specificMetaGen.Generate(r)} - - case *kubernetes.Node: - nodeName := r.GetObjectMeta().GetName() - metrics := NewNodeMetrics() - if cpu, ok := r.Status.Capacity["cpu"]; ok { - if q, err := resource.ParseQuantity(cpu.String()); err == nil { - metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000) - } - } - if memory, ok := r.Status.Capacity["memory"]; ok { - if q, err := resource.ParseQuantity(memory.String()); err == nil { - metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value())) - } - } - nodeStore, _ := metricsRepo.AddNodeStore(nodeName) - nodeStore.SetNodeMetrics(metrics) - - return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)} - case *kubernetes.Deployment: - return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)} - case *kubernetes.Job: - return map[string]mapstr.M{id: generalMetaGen.Generate(JobResource, r)} - case *kubernetes.CronJob: - return map[string]mapstr.M{id: generalMetaGen.Generate(CronJobResource, r)} - case *kubernetes.Service: - return map[string]mapstr.M{id: specificMetaGen.Generate(r)} - case *kubernetes.StatefulSet: - return map[string]mapstr.M{id: generalMetaGen.Generate(StatefulSetResource, r)} - case *kubernetes.Namespace: - return map[string]mapstr.M{id: generalMetaGen.Generate(NamespaceResource, r)} - case *kubernetes.ReplicaSet: - return map[string]mapstr.M{id: generalMetaGen.Generate(ReplicaSetResource, r)} - case *kubernetes.DaemonSet: - return map[string]mapstr.M{id: generalMetaGen.Generate(DaemonSetResource, r)} - case *kubernetes.PersistentVolume: - return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeResource, r)} - case *kubernetes.PersistentVolumeClaim: - return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeClaimResource, r)} - case *kubernetes.StorageClass: - return map[string]mapstr.M{id: generalMetaGen.Generate(StorageClassResource, r)} - default: - return map[string]mapstr.M{id: generalMetaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r)} - } - } + updateFunc := getEventMetadataFunc(log, generalMetaGen, specificMetaGen, metricsRepo) // deleteFunc to be used as the resource watcher's delete handler. // The deleteFunc is executed when a watcher is triggered for a resource deletion(e.g. pod deleted). @@ -797,10 +773,15 @@ func NewContainerMetadataEnricher( log.Errorf("Error creating Kubernetes client: %s", err) return &nilEnricher{} } + metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + log.Errorf("Error creating Kubernetes client: %s", err) + return &nilEnricher{} + } metricsetName := base.Name() - err = createAllWatchers(client, metricsetName, PodResource, nodeScope, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers) if err != nil { log.Errorf("Error starting the watchers: %s", err) return &nilEnricher{} @@ -1231,3 +1212,87 @@ func AddClusterECSMeta(base mb.BaseMetricSet) mapstr.M { } return ecsClusterMeta } + +// transformReplicaSetMetadata ensures that the PartialObjectMetadata resources we get from a metadata watcher +// can be correctly interpreted by the update function returned by getEventMetadataFunc. +// This really just involves adding missing type information. +func transformReplicaSetMetadata(obj interface{}) (interface{}, error) { + old, ok := obj.(*metav1.PartialObjectMetadata) + if !ok { + return nil, fmt.Errorf("obj of type %T neither a ReplicaSet nor a PartialObjectMetadata", obj) + } + old.TypeMeta = metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "ReplicaSet", + } + return old, nil +} + +// getEventMetadataFunc returns a function that takes a kubernetes Resource as an argument and returns metadata +// that can directly be used for event enrichment. +// This function is intended to be used as the resource watchers add and update handler. +func getEventMetadataFunc( + logger *logp.Logger, + generalMetaGen *metadata.Resource, + specificMetaGen metadata.MetaGen, + metricsRepo *MetricsRepo, +) func(r kubernetes.Resource) map[string]mapstr.M { + return func(r kubernetes.Resource) map[string]mapstr.M { + accessor, accErr := meta.Accessor(r) + if accErr != nil { + logger.Errorf("Error creating accessor: %s", accErr) + } + id := accessor.GetName() + namespace := accessor.GetNamespace() + if namespace != "" { + id = join(namespace, id) + } + + switch r := r.(type) { + case *kubernetes.Pod: + return map[string]mapstr.M{id: specificMetaGen.Generate(r)} + + case *kubernetes.Node: + nodeName := r.GetObjectMeta().GetName() + metrics := NewNodeMetrics() + if cpu, ok := r.Status.Capacity["cpu"]; ok { + if q, err := resource.ParseQuantity(cpu.String()); err == nil { + metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000) + } + } + if memory, ok := r.Status.Capacity["memory"]; ok { + if q, err := resource.ParseQuantity(memory.String()); err == nil { + metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value())) + } + } + nodeStore, _ := metricsRepo.AddNodeStore(nodeName) + nodeStore.SetNodeMetrics(metrics) + + return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)} + case *kubernetes.Deployment: + return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)} + case *kubernetes.Job: + return map[string]mapstr.M{id: generalMetaGen.Generate(JobResource, r)} + case *kubernetes.CronJob: + return map[string]mapstr.M{id: generalMetaGen.Generate(CronJobResource, r)} + case *kubernetes.Service: + return map[string]mapstr.M{id: specificMetaGen.Generate(r)} + case *kubernetes.StatefulSet: + return map[string]mapstr.M{id: generalMetaGen.Generate(StatefulSetResource, r)} + case *kubernetes.Namespace: + return map[string]mapstr.M{id: generalMetaGen.Generate(NamespaceResource, r)} + case *kubernetes.ReplicaSet: + return map[string]mapstr.M{id: generalMetaGen.Generate(ReplicaSetResource, r)} + case *kubernetes.DaemonSet: + return map[string]mapstr.M{id: generalMetaGen.Generate(DaemonSetResource, r)} + case *kubernetes.PersistentVolume: + return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeResource, r)} + case *kubernetes.PersistentVolumeClaim: + return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeClaimResource, r)} + case *kubernetes.StorageClass: + return map[string]mapstr.M{id: generalMetaGen.Generate(StorageClassResource, r)} + default: + return map[string]mapstr.M{id: generalMetaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r)} + } + } +} diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index 703035d5c38..ec2309b08bf 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -40,6 +42,7 @@ import ( "github.com/stretchr/testify/require" k8sfake "k8s.io/client-go/kubernetes/fake" + k8smetafake "k8s.io/client-go/metadata/fake" "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent-libs/logp" @@ -70,6 +73,7 @@ func TestCreateWatcher(t *testing.T) { resourceWatchers := NewWatchers() client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -80,7 +84,7 @@ func TestCreateWatcher(t *testing.T) { options, err := getWatchOptions(config, false, client, log) require.NoError(t, err) - created, err := createWatcher(NamespaceResource, &kubernetes.Node{}, *options, client, resourceWatchers, config.Namespace, false) + created, err := createWatcher(NamespaceResource, &kubernetes.Node{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false) require.True(t, created) require.NoError(t, err) @@ -90,7 +94,7 @@ func TestCreateWatcher(t *testing.T) { require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource].watcher) resourceWatchers.lock.Unlock() - created, err = createWatcher(NamespaceResource, &kubernetes.Namespace{}, *options, client, resourceWatchers, config.Namespace, true) + created, err = createWatcher(NamespaceResource, &kubernetes.Namespace{}, *options, client, metadataClient, resourceWatchers, config.Namespace, true) require.False(t, created) require.NoError(t, err) @@ -100,7 +104,7 @@ func TestCreateWatcher(t *testing.T) { require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource].watcher) resourceWatchers.lock.Unlock() - created, err = createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false) + created, err = createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false) require.True(t, created) require.NoError(t, err) @@ -115,6 +119,7 @@ func TestAddToMetricsetsUsing(t *testing.T) { resourceWatchers := NewWatchers() client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -126,7 +131,7 @@ func TestAddToMetricsetsUsing(t *testing.T) { require.NoError(t, err) // Create the new entry with watcher and nil string array first - created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false) + created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false) require.True(t, created) require.NoError(t, err) @@ -152,6 +157,7 @@ func TestRemoveFromMetricsetsUsing(t *testing.T) { resourceWatchers := NewWatchers() client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -163,7 +169,7 @@ func TestRemoveFromMetricsetsUsing(t *testing.T) { require.NoError(t, err) // Create the new entry with watcher and nil string array first - created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false) + created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false) require.True(t, created) require.NoError(t, err) @@ -192,6 +198,7 @@ func TestCreateAllWatchers(t *testing.T) { resourceWatchers := NewWatchers() client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -204,7 +211,7 @@ func TestCreateAllWatchers(t *testing.T) { log := logp.NewLogger("test") // Start watchers based on a resource that does not exist should cause an error - err := createAllWatchers(client, "does-not-exist", "does-not-exist", false, config, log, resourceWatchers) + err := createAllWatchers(client, metadataClient, "does-not-exist", "does-not-exist", false, config, log, resourceWatchers) require.Error(t, err) resourceWatchers.lock.Lock() require.Equal(t, 0, len(resourceWatchers.metaWatchersMap)) @@ -213,7 +220,7 @@ func TestCreateAllWatchers(t *testing.T) { // Start watcher for a resource that requires other resources, should start all the watchers metricsetPod := "pod" extras := getExtraWatchers(PodResource, config.AddResourceMetadata) - err = createAllWatchers(client, metricsetPod, PodResource, false, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetPod, PodResource, false, config, log, resourceWatchers) require.NoError(t, err) // Check that all the required watchers are in the map @@ -244,6 +251,7 @@ func TestCreateMetaGen(t *testing.T) { }, } client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) _, err = createMetadataGen(client, commonConfig, config.AddResourceMetadata, DeploymentResource, resourceWatchers) // At this point, no watchers were created @@ -251,7 +259,7 @@ func TestCreateMetaGen(t *testing.T) { // Create the watchers necessary for the metadata generator metricsetDeployment := "state_deployment" - err = createAllWatchers(client, metricsetDeployment, DeploymentResource, false, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetDeployment, DeploymentResource, false, config, log, resourceWatchers) require.NoError(t, err) // Create the generators, this time without error @@ -284,6 +292,7 @@ func TestCreateMetaGenSpecific(t *testing.T) { }, } client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) // For pod: metricsetPod := "pod" @@ -293,7 +302,7 @@ func TestCreateMetaGenSpecific(t *testing.T) { require.Error(t, err) // Create the pod resource + the extras - err = createAllWatchers(client, metricsetPod, PodResource, false, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetPod, PodResource, false, config, log, resourceWatchers) require.NoError(t, err) _, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, PodResource, resourceWatchers) @@ -306,7 +315,7 @@ func TestCreateMetaGenSpecific(t *testing.T) { // Create the service resource + the extras metricsetService := "state_service" - err = createAllWatchers(client, metricsetService, ServiceResource, false, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetService, ServiceResource, false, config, log, resourceWatchers) require.NoError(t, err) _, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, ServiceResource, resourceWatchers) @@ -592,6 +601,144 @@ func TestBuildMetadataEnricher_EventHandler(t *testing.T) { resourceWatchers.lock.Unlock() } +func TestBuildMetadataEnricher_PartialMetadata(t *testing.T) { + resourceWatchers := NewWatchers() + + resourceWatchers.lock.Lock() + watcher := &metaWatcher{ + watcher: &mockWatcher{ + store: cache.NewStore(cache.MetaNamespaceKeyFunc), + }, + started: false, + metricsetsUsing: []string{"replicaset"}, + enrichers: make(map[string]*enricher), + } + resourceWatchers.metaWatchersMap[ReplicaSetResource] = watcher + addEventHandlerToWatcher(watcher, resourceWatchers) + resourceWatchers.lock.Unlock() + + isController := true + resource := &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID("mockuid"), + Name: "enrich", + Labels: map[string]string{ + "label": "value", + }, + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "enrich_deployment", + Controller: &isController, + }, + }, + }, + } + + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + AddResourceMetadata: &metadata.AddResourceMetadataConfig{ + CronJob: false, + Deployment: true, + }, + } + + metricset := "replicaset" + log := logp.NewLogger(selector) + + commonMetaConfig := metadata.Config{} + commonConfig, _ := conf.NewConfigFrom(&commonMetaConfig) + client := k8sfake.NewSimpleClientset() + generalMetaGen := metadata.NewResourceMetadataGenerator(commonConfig, client) + + updateFunc := getEventMetadataFunc(log, generalMetaGen, nil, nil) + + deleteFunc := func(r kubernetes.Resource) []string { + accessor, _ := meta.Accessor(r) + id := accessor.GetName() + namespace := accessor.GetNamespace() + if namespace != "" { + id = join(namespace, id) + } + return []string{id} + } + + indexFunc := func(e mapstr.M) string { + name := getString(e, "name") + namespace := getString(e, mb.ModuleDataKey+".namespace") + var id string + if name != "" && namespace != "" { + id = join(namespace, name) + } else if namespace != "" { + id = namespace + } else { + id = name + } + return id + } + + enricher := buildMetadataEnricher(metricset, ReplicaSetResource, resourceWatchers, config, + updateFunc, deleteFunc, indexFunc, log) + + enricher.Start(resourceWatchers) + resourceWatchers.lock.Lock() + require.True(t, watcher.started) + resourceWatchers.lock.Unlock() + + // manually run the transform function here, just like the actual informer + transformed, err := transformReplicaSetMetadata(resource) + require.NoError(t, err) + watcher.watcher.GetEventHandler().OnAdd(transformed) + err = watcher.watcher.Store().Add(transformed) + require.NoError(t, err) + + // Test enricher + events := []mapstr.M{ + // {"name": "unknown"}, + {"name": resource.Name, mb.ModuleDataKey + ".namespace": resource.Namespace}, + } + enricher.Enrich(events) + + require.Equal(t, []mapstr.M{ + // {"name": "unknown"}, + { + "name": "enrich", + "_module": mapstr.M{ + "labels": mapstr.M{"label": "value"}, + "replicaset": mapstr.M{"name": "enrich", "uid": "mockuid"}, + "namespace": resource.Namespace, + "deployment": mapstr.M{ + "name": "enrich_deployment", + }, + }, + mb.ModuleDataKey + ".namespace": resource.Namespace, + "meta": mapstr.M{}, + }, + }, events) + + watcher.watcher.GetEventHandler().OnDelete(resource) + err = watcher.watcher.Store().Delete(resource) + require.NoError(t, err) + + events = []mapstr.M{ + {"name": "enrich"}, + } + enricher.Enrich(events) + + require.Equal(t, []mapstr.M{ + {"name": "enrich"}, + }, events) + + enricher.Stop(resourceWatchers) + resourceWatchers.lock.Lock() + require.False(t, watcher.started) + resourceWatchers.lock.Unlock() +} + func TestGetWatcherStoreKeyFromMetadataKey(t *testing.T) { t.Run("global resource", func(t *testing.T) { assert.Equal(t, "name", getWatcherStoreKeyFromMetadataKey("name"))