diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4998320b6442..94102ba4127d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -109,6 +109,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix Azure Monitor 429 error by causing metricbeat to retry the request again. {pull}38294[38294] - Fix fields not being parsed correctly in postgresql/database {issue}25301[25301] {pull}37720[37720] - rabbitmq/queue - Change the mapping type of `rabbitmq.queue.consumers.utilisation.pct` to `scaled_float` from `long` because the values fall within the range of `[0.0, 1.0]`. Previously, conversion to integer resulted in reporting either `0` or `1`. +- Fix Kubernetes metadata sometimes not being present after startup {pull}41216[41216] *Osquerybeat* diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index f94d424ec0e4..4a95dd683626 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -20,6 +20,7 @@ package util import ( "errors" "fmt" + "maps" "strings" "sync" "time" @@ -39,6 +40,10 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb" ) +// Resource metadata keys are composed of multiple parts - usually just the namespace and name. This string is the +// separator between the parts when treating the key as a single string. +const resourceMetadataKeySeparator = "/" + type kubernetesConfig struct { KubeConfig string `config:"kube_config"` KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"` @@ -67,12 +72,13 @@ type Enricher interface { type enricher struct { sync.RWMutex - metadata map[string]mapstr.M + metadataCache map[string]mapstr.M index func(mapstr.M) string updateFunc func(kubernetes.Resource) map[string]mapstr.M deleteFunc func(kubernetes.Resource) []string metricsetName string resourceName string + watcher *metaWatcher isPod bool config *kubernetesConfig log *logp.Logger @@ -90,8 +96,7 @@ type metaWatcher struct { metricsetsUsing []string // list of metricsets using this shared watcher(e.g. pod, container, state_pod) - enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher - metadataObjects map[string]bool // representation of a set of ids(in the form of namespace_name-resource_name) of each object received by the watcher's handler functions + enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher nodeScope bool // whether this watcher should watch for resources in current node or in whole cluster restartWatcher kubernetes.Watcher // whether this watcher needs a restart. Only relevant in leader nodes due to metricsets with different nodescope(pod, state_pod) @@ -179,10 +184,10 @@ func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddReso // in order to be able to retrieve 2nd layer Owner metadata like in case of: // Deployment -> Replicaset -> Pod // CronJob -> job -> Pod - if addResourceMetadata != nil && addResourceMetadata.Deployment { + if addResourceMetadata.Deployment { extra = append(extra, ReplicaSetResource) } - if addResourceMetadata != nil && addResourceMetadata.CronJob { + if addResourceMetadata.CronJob { extra = append(extra, JobResource) } return extra @@ -320,47 +325,82 @@ func createWatcher( // Check if a watcher for the specific resource already exists. resourceMetaWatcher, ok := resourceWatchers.metaWatchersMap[resourceName] - // If it does not exist, create the resourceMetaWatcher. - if !ok { - // Check if we need to add namespace to the watcher's options. - if isNamespaced(resourceName) { - options.Namespace = namespace - } - watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil) - if err != nil { - return false, err - } - resourceWatchers.metaWatchersMap[resourceName] = &metaWatcher{ - watcher: watcher, - started: false, // not started yet - metadataObjects: make(map[string]bool), - enrichers: make(map[string]*enricher), - metricsetsUsing: make([]string, 0), - restartWatcher: nil, - nodeScope: nodeScope, - } - return true, nil - } else if resourceMetaWatcher.nodeScope != nodeScope && resourceMetaWatcher.nodeScope { - // It might happen that the watcher already exists, but is only being used to monitor the resources - // of a single node(e.g. created by pod metricset). In that case, we need to check if we are trying to create a new watcher that will track - // the resources of whole cluster(e.g. in case of state_pod metricset). - // If it is the case, then we need to update the watcher by changing its watch options (removing options.Node) - // A running watcher cannot be updated directly. Instead, we must create a new one with the correct watch options. - // The new restartWatcher must be identical to the old watcher, including the same handler function, with the only difference being the watch options. - - if isNamespaced(resourceName) { - options.Namespace = namespace + // If the watcher exists, exit + if ok { + if resourceMetaWatcher.nodeScope != nodeScope && resourceMetaWatcher.nodeScope { + // It might happen that the watcher already exists, but is only being used to monitor the resources + // of a single node(e.g. created by pod metricset). In that case, we need to check if we are trying to create a new watcher that will track + // the resources of whole cluster(e.g. in case of state_pod metricset). + // If it is the case, then we need to update the watcher by changing its watch options (removing options.Node) + // A running watcher cannot be updated directly. Instead, we must create a new one with the correct watch options. + // The new restartWatcher must be identical to the old watcher, including the same handler function, with the only difference being the watch options. + + if isNamespaced(resourceName) { + options.Namespace = namespace + } + restartWatcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil) + if err != nil { + return false, err + } + // update the handler of the restartWatcher to match the current watcher's handler. + restartWatcher.AddEventHandler(resourceMetaWatcher.watcher.GetEventHandler()) + resourceMetaWatcher.restartWatcher = restartWatcher + resourceMetaWatcher.nodeScope = nodeScope } - restartWatcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil) - if err != nil { - return false, err + return false, nil + } + // Watcher doesn't exist, create it + + // Check if we need to add namespace to the watcher's options. + if isNamespaced(resourceName) { + options.Namespace = namespace + } + watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil) + if err != nil { + return false, err + } + + resourceMetaWatcher = &metaWatcher{ + watcher: watcher, + started: false, // not started yet + enrichers: make(map[string]*enricher), + metricsetsUsing: make([]string, 0), + restartWatcher: nil, + nodeScope: nodeScope, + } + resourceWatchers.metaWatchersMap[resourceName] = resourceMetaWatcher + + // Add event handlers to the watcher. The only action we need to do here is invalidate the enricher cache. + addEventHandlerToWatcher(resourceMetaWatcher, resourceWatchers) + + return true, nil +} + +// addEventHandlerToWatcher adds an event handler to the watcher that invalidates the cache of enrichers attached +// to the watcher. +func addEventHandlerToWatcher(metaWatcher *metaWatcher, resourceWatchers *Watchers) { + notifyFunc := func(obj interface{}) { + enrichers := make(map[string]*enricher, len(metaWatcher.enrichers)) + + resourceWatchers.lock.Lock() + maps.Copy(enrichers, metaWatcher.enrichers) + resourceWatchers.lock.Unlock() + + for _, enricher := range enrichers { + enricher.Lock() + ids := enricher.deleteFunc(obj.(kubernetes.Resource)) + // update this watcher events by removing all the metadata[id] + for _, id := range ids { + delete(enricher.metadataCache, id) + } + enricher.Unlock() } - // update the handler of the restartWatcher to match the current watcher's handler. - restartWatcher.AddEventHandler(resourceMetaWatcher.watcher.GetEventHandler()) - resourceMetaWatcher.restartWatcher = restartWatcher - resourceMetaWatcher.nodeScope = nodeScope } - return false, nil + metaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) {}, // do nothing + UpdateFunc: notifyFunc, + DeleteFunc: notifyFunc, + }) } // addToMetricsetsUsing adds metricset identified by metricsetUsing to the list of resources using the shared watcher @@ -604,6 +644,7 @@ func NewResourceMetadataEnricher( return &nilEnricher{} } + _, _ = specificMetaGen, generalMetaGen // necessary for earlier versions of golangci-lint // updateFunc to be used as the resource watchers add and update handler. // The handler function is executed when a watcher is triggered(i.e. new/updated resource). // It is responsible for generating the metadata for a detected resource by executing the metadata generators Generate method. @@ -904,7 +945,7 @@ func getString(m mapstr.M, key string) string { } func join(fields ...string) string { - return strings.Join(fields, ":") + return strings.Join(fields, resourceMetadataKeySeparator) } // buildMetadataEnricher builds and returns a metadata enricher for a given metricset. @@ -922,7 +963,7 @@ func buildMetadataEnricher( log *logp.Logger) *enricher { enricher := &enricher{ - metadata: map[string]mapstr.M{}, + metadataCache: map[string]mapstr.M{}, index: indexFunc, updateFunc: updateFunc, deleteFunc: deleteFunc, @@ -940,104 +981,7 @@ func buildMetadataEnricher( if resourceMetaWatcher != nil { // Append the new enricher to watcher's enrichers map. resourceMetaWatcher.enrichers[metricsetName] = enricher - - // Check if this shared watcher has already detected resources and collected their - // metadata for another enricher. - // In that case, for each resource, call the updateFunc of the current enricher to - // generate its metadata. This is needed in cases where the watcher has already been - // notified for new/updated resources while the enricher for current metricset has not - // built yet (example is pod, state_pod metricsets). - for key := range resourceMetaWatcher.metadataObjects { - obj, exists, err := resourceMetaWatcher.watcher.Store().GetByKey(key) - if err != nil { - log.Errorf("Error trying to get the object from the store: %s", err) - } else { - if exists { - newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource)) - // add the new metadata to the watcher received metadata - for id, metadata := range newMetadataEvents { - enricher.metadata[id] = metadata - } - } - } - } - - // AddEventHandler sets add, update and delete methods of watcher. - // Those methods are triggered when an event is detected for a - // resource creation, update or deletion. - resourceMetaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - resourceWatchers.lock.Lock() - defer resourceWatchers.lock.Unlock() - - // Add object(detected resource) to the list of metadata objects of this watcher, - // so it can be used by enrichers created after the event is triggered. - // The identifier of the object is in the form of namespace/name so that - // it can be easily fetched from watcher's store in previous step. - accessor, _ := meta.Accessor(obj.(kubernetes.Resource)) - id := accessor.GetName() - namespace := accessor.GetNamespace() - if namespace != "" { - id = namespace + "/" + id - } - resourceMetaWatcher.metadataObjects[id] = true - // Execute the updateFunc of each enricher associated to thos watcher. - for _, enricher := range resourceMetaWatcher.enrichers { - enricher.Lock() - newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource)) - // add the new metadata to the watcher received metadata - for id, metadata := range newMetadataEvents { - enricher.metadata[id] = metadata - } - enricher.Unlock() - } - }, - UpdateFunc: func(obj interface{}) { - resourceWatchers.lock.Lock() - defer resourceWatchers.lock.Unlock() - - // Add object to the list of metadata objects of this watcher - accessor, _ := meta.Accessor(obj.(kubernetes.Resource)) - id := accessor.GetName() - namespace := accessor.GetNamespace() - if namespace != "" { - id = namespace + "/" + id - } - resourceMetaWatcher.metadataObjects[id] = true - - for _, enricher := range resourceMetaWatcher.enrichers { - enricher.Lock() - updatedMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource)) - for id, metadata := range updatedMetadataEvents { - enricher.metadata[id] = metadata - } - enricher.Unlock() - } - }, - DeleteFunc: func(obj interface{}) { - resourceWatchers.lock.Lock() - defer resourceWatchers.lock.Unlock() - - // Remove object from the list of metadata objects of this watcher - accessor, _ := meta.Accessor(obj.(kubernetes.Resource)) - id := accessor.GetName() - namespace := accessor.GetNamespace() - if namespace != "" { - id = namespace + "/" + id - } - delete(resourceMetaWatcher.metadataObjects, id) - - for _, enricher := range resourceMetaWatcher.enrichers { - enricher.Lock() - ids := enricher.deleteFunc(obj.(kubernetes.Resource)) - // update this watcher events by removing all the metadata[id] - for _, id := range ids { - delete(enricher.metadata, id) - } - enricher.Unlock() - } - }, - }) + enricher.watcher = resourceMetaWatcher } return enricher @@ -1124,11 +1068,8 @@ func (e *enricher) Stop(resourceWatchers *Watchers) { // This method is executed whenever a new event is created and about to be published. // The enricher's index method is used to retrieve the resource identifier from each event. func (e *enricher) Enrich(events []mapstr.M) { - e.RLock() - defer e.RUnlock() - for _, event := range events { - if meta := e.metadata[e.index(event)]; meta != nil { + if meta := e.getMetadata(event); meta != nil { k8s, err := meta.GetValue("kubernetes") if err != nil { continue @@ -1145,10 +1086,9 @@ func (e *enricher) Enrich(events []mapstr.M) { } // don't apply pod metadata to module level - k8sMeta = k8sMeta.Clone() delete(k8sMeta, "pod") } - ecsMeta := meta.Clone() + ecsMeta := meta err = ecsMeta.Delete("kubernetes") if err != nil { logp.Debug("kubernetes", "Failed to delete field '%s': %s", "kubernetes", err) @@ -1162,6 +1102,48 @@ func (e *enricher) Enrich(events []mapstr.M) { } } +// getMetadata returns metadata for the given event. If the metadata doesn't exist in the cache, we try to get it +// from the watcher store. +// The returned map is copy to be owned by the caller. +func (e *enricher) getMetadata(event mapstr.M) mapstr.M { + e.Lock() + defer e.Unlock() + metaKey := e.index(event) + eventMeta := e.metadataCache[metaKey] + if eventMeta == nil { + e.updateMetadataCacheFromWatcher(metaKey) + eventMeta = e.metadataCache[metaKey] + } + if eventMeta != nil { + eventMeta = eventMeta.Clone() + } + return eventMeta +} + +// updateMetadataCacheFromWatcher updates the metadata cache for the given key with data from the watcher. +func (e *enricher) updateMetadataCacheFromWatcher(key string) { + storeKey := getWatcherStoreKeyFromMetadataKey(key) + if res, exists, _ := e.watcher.watcher.Store().GetByKey(storeKey); exists { + eventMetaMap := e.updateFunc(res.(kubernetes.Resource)) + for k, v := range eventMetaMap { + e.metadataCache[k] = v + } + } +} + +// getWatcherStoreKeyFromMetadataKey returns a watcher store key for a given metadata cache key. These are identical +// for nearly all resources, and have the form `{namespace}/{name}`, with the exception of containers, where it's +// `{namespace}/{pod_name}/{container_name}`. In that case, we want the Pod key, so we drop the final part. +func getWatcherStoreKeyFromMetadataKey(metaKey string) string { + parts := strings.Split(metaKey, resourceMetadataKeySeparator) + if len(parts) <= 2 { // normal K8s resource + return metaKey + } + + // container, we need to remove the final part to get the Pod key + return strings.Join(parts[:2], resourceMetadataKeySeparator) +} + func CreateEvent(event mapstr.M, namespace string) (mb.Event, error) { var moduleFieldsMapStr mapstr.M moduleFields, ok := event[mb.ModuleDataKey] diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index 61da906372f4..703035d5c38d 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -469,13 +471,14 @@ func TestBuildMetadataEnricher_EventHandler(t *testing.T) { resourceWatchers := NewWatchers() resourceWatchers.lock.Lock() - resourceWatchers.metaWatchersMap[PodResource] = &metaWatcher{ - watcher: &mockWatcher{}, + watcher := &metaWatcher{ + watcher: newMockWatcher(), started: false, metricsetsUsing: []string{"pod"}, - metadataObjects: make(map[string]bool), enrichers: make(map[string]*enricher), } + resourceWatchers.metaWatchersMap[PodResource] = watcher + addEventHandlerToWatcher(watcher, resourceWatchers) resourceWatchers.lock.Unlock() funcs := mockFuncs{} @@ -489,8 +492,10 @@ func TestBuildMetadataEnricher_EventHandler(t *testing.T) { Namespace: "default", }, } - id := "default/enrich" - metadataObjects := map[string]bool{id: true} + events := []mapstr.M{ + {"name": "unknown"}, + {"name": "enrich"}, + } config := &kubernetesConfig{ Namespace: "test-ns", @@ -509,30 +514,22 @@ func TestBuildMetadataEnricher_EventHandler(t *testing.T) { funcs.update, funcs.delete, funcs.index, log) resourceWatchers.lock.Lock() wData := resourceWatchers.metaWatchersMap[PodResource] - mockW := wData.watcher.(*mockWatcher) + mockW, ok := wData.watcher.(*mockWatcher) + require.True(t, ok) require.NotNil(t, mockW.handler) resourceWatchers.lock.Unlock() enricher.Start(resourceWatchers) resourceWatchers.lock.Lock() - watcher := resourceWatchers.metaWatchersMap[PodResource] require.True(t, watcher.started) - mockW = watcher.watcher.(*mockWatcher) resourceWatchers.lock.Unlock() mockW.handler.OnAdd(resource) - - resourceWatchers.lock.Lock() - require.Equal(t, metadataObjects, watcher.metadataObjects) - resourceWatchers.lock.Unlock() - - require.Equal(t, resource, funcs.updated) + err := mockW.Store().Add(resource) + require.NoError(t, err) // Test enricher - events := []mapstr.M{ - {"name": "unknown"}, - {"name": "enrich"}, - } + enricher.Enrich(events) require.Equal(t, []mapstr.M{ @@ -544,6 +541,8 @@ func TestBuildMetadataEnricher_EventHandler(t *testing.T) { }, }, events) + require.Equal(t, resource, funcs.updated) + // Enrich a pod (metadata goes in root level) events = []mapstr.M{ {"name": "unknown"}, @@ -565,14 +564,13 @@ func TestBuildMetadataEnricher_EventHandler(t *testing.T) { // Emit delete event resourceWatchers.lock.Lock() wData = resourceWatchers.metaWatchersMap[PodResource] - mockW = wData.watcher.(*mockWatcher) + mockW, ok = wData.watcher.(*mockWatcher) + require.True(t, ok) resourceWatchers.lock.Unlock() mockW.handler.OnDelete(resource) - - resourceWatchers.lock.Lock() - require.Equal(t, map[string]bool{}, watcher.metadataObjects) - resourceWatchers.lock.Unlock() + err = mockW.Store().Delete(resource) + require.NoError(t, err) require.Equal(t, resource, funcs.deleted) @@ -594,87 +592,16 @@ func TestBuildMetadataEnricher_EventHandler(t *testing.T) { resourceWatchers.lock.Unlock() } -// Test if we can add metadata from past events to an enricher that is associated -// with a resource that had already triggered the handler functions -func TestBuildMetadataEnricher_EventHandler_PastObjects(t *testing.T) { - log := logp.NewLogger(selector) - - resourceWatchers := NewWatchers() - - resourceWatchers.lock.Lock() - resourceWatchers.metaWatchersMap[PodResource] = &metaWatcher{ - watcher: &mockWatcher{}, - started: false, - metricsetsUsing: []string{"pod", "state_pod"}, - metadataObjects: make(map[string]bool), - enrichers: make(map[string]*enricher), - } - resourceWatchers.lock.Unlock() - - funcs := mockFuncs{} - resource1 := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: types.UID("mockuid"), - Name: "enrich", - Labels: map[string]string{ - "label": "value", - }, - Namespace: "default", - }, - } - id1 := "default/enrich" - resource2 := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: types.UID("mockuid2"), - Name: "enrich-2", - Labels: map[string]string{ - "label": "value", - }, - Namespace: "default-2", - }, - } - id2 := "default-2/enrich-2" - - config := &kubernetesConfig{ - Namespace: "test-ns", - SyncPeriod: time.Minute, - Node: "test-node", - AddResourceMetadata: &metadata.AddResourceMetadataConfig{ - CronJob: false, - Deployment: false, - }, - } - - enricher := buildMetadataEnricher("pod", PodResource, resourceWatchers, config, - funcs.update, funcs.delete, funcs.index, log) - enricher.Start(resourceWatchers) - - resourceWatchers.lock.Lock() - - watcher := resourceWatchers.metaWatchersMap[PodResource] - mockW := watcher.watcher.(*mockWatcher) - resourceWatchers.lock.Unlock() - - mockW.handler.OnAdd(resource1) - - resourceWatchers.lock.Lock() - metadataObjects := map[string]bool{id1: true} - require.Equal(t, metadataObjects, watcher.metadataObjects) - resourceWatchers.lock.Unlock() - - mockW.handler.OnUpdate(resource2) - - resourceWatchers.lock.Lock() - metadataObjects[id2] = true - require.Equal(t, metadataObjects, watcher.metadataObjects) - resourceWatchers.lock.Unlock() - - mockW.handler.OnDelete(resource1) - - resourceWatchers.lock.Lock() - delete(metadataObjects, id1) - require.Equal(t, metadataObjects, watcher.metadataObjects) - resourceWatchers.lock.Unlock() +func TestGetWatcherStoreKeyFromMetadataKey(t *testing.T) { + t.Run("global resource", func(t *testing.T) { + assert.Equal(t, "name", getWatcherStoreKeyFromMetadataKey("name")) + }) + t.Run("namespaced resource", func(t *testing.T) { + assert.Equal(t, "namespace/name", getWatcherStoreKeyFromMetadataKey("namespace/name")) + }) + t.Run("container", func(t *testing.T) { + assert.Equal(t, "namespace/pod", getWatcherStoreKeyFromMetadataKey("namespace/pod/container")) + }) } type mockFuncs struct { @@ -716,6 +643,19 @@ func (f *mockFuncs) index(m mapstr.M) string { type mockWatcher struct { handler kubernetes.ResourceEventHandler + store cache.Store +} + +func newMockWatcher() *mockWatcher { + return &mockWatcher{ + store: cache.NewStore(func(obj interface{}) (string, error) { + objName, err := cache.ObjectToName(obj) + if err != nil { + return "", err + } + return objName.Name, nil + }), + } } func (m *mockWatcher) GetEventHandler() kubernetes.ResourceEventHandler { @@ -735,7 +675,7 @@ func (m *mockWatcher) AddEventHandler(r kubernetes.ResourceEventHandler) { } func (m *mockWatcher) Store() cache.Store { - return nil + return m.store } func (m *mockWatcher) Client() k8s.Interface {