Skip to content

Commit

Permalink
Update all watchers - current and extra - according to watcher options
Browse files Browse the repository at this point in the history
Signed-off-by: constanca <[email protected]>
  • Loading branch information
constanca-m committed Feb 14, 2024
1 parent 15b673a commit 57135c9
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 116 deletions.
158 changes: 82 additions & 76 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (*nilEnricher) Start(*Watchers) {}
func (*nilEnricher) Stop(*Watchers) {}
func (*nilEnricher) Enrich([]mapstr.M) {}

type watcherData struct {
type metaWatcher struct {
watcher kubernetes.Watcher
started bool // true if watcher has started, false otherwise

Expand All @@ -97,8 +97,8 @@ type watcherData struct {
}

type Watchers struct {
watchersMap map[string]*watcherData
lock sync.RWMutex
metaWatchersMap map[string]*metaWatcher
lock sync.RWMutex
}

const selector = "kubernetes"
Expand All @@ -123,7 +123,7 @@ const (

func NewWatchers() *Watchers {
watchers := &Watchers{
watchersMap: make(map[string]*watcherData),
metaWatchersMap: make(map[string]*metaWatcher),
}
return watchers
}
Expand Down Expand Up @@ -256,24 +256,29 @@ func createWatcher(
options kubernetes.WatchOptions,
client k8sclient.Interface,
resourceWatchers *Watchers,
namespace string,
nodeScope bool) (bool, error) {
namespace string) (bool, error) {

// We need to check the node scope to decide on whether a watcher should be updated or not
nodeScope := false
if options.Node != "" {
nodeScope = true
}

resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()

watcher, ok := resourceWatchers.watchersMap[resourceName]
// if it does not exist, create the watcher
resourceMetaWatchers, ok := resourceWatchers.metaWatchersMap[resourceName]
// if it does not exist, create the resourceMetaWatchers
if !ok {
// check if we need to add namespace to the watcher options
// check if we need to add namespace to the resourceMetaWatchers options
if isNamespaced(resourceName) {
options.Namespace = namespace
}
watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
if err != nil {
return false, err
}
resourceWatchers.watchersMap[resourceName] = &watcherData{
resourceWatchers.metaWatchersMap[resourceName] = &metaWatcher{
watcher: watcher,
started: false,
metadataObjects: make(map[string]bool),
Expand All @@ -283,21 +288,22 @@ func createWatcher(
nodeScope: nodeScope,
}
return true, nil
} else if watcher.nodeScope != nodeScope && watcher.nodeScope {
// It might happen that the watcher already exists, but is only being used to monitor the resources
// of a single node. In that case, we need to check if we are trying to create a new watcher that will track
// the resources of multiple nodes. If it is the case, then we need to update the watcher.
// check if we need to add namespace to the watcher options

} else if resourceMetaWatchers.nodeScope != nodeScope && resourceMetaWatchers.nodeScope {
// It might happen that the resourceMetaWatchers already exists, but is only being used to monitor the resources
// of a single node. In that case, we need to check if we are trying to create a new resourceMetaWatchers that will track
// the resources of multiple nodes. If it is the case, then we need to update the resourceMetaWatchers.
// check if we need to add namespace to the resourceMetaWatchers options
if isNamespaced(resourceName) {
options.Namespace = namespace
}
restartWatcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
if err != nil {
return false, err
}
watcher.restartWatcher = restartWatcher
watcher.nodeScope = nodeScope
// update the handler of the restart resourceMetaWatchers to match the current resourceMetaWatchers handler
restartWatcher.AddEventHandler(resourceMetaWatchers.watcher.GetEventHandler())
resourceMetaWatchers.restartWatcher = restartWatcher
resourceMetaWatchers.nodeScope = nodeScope
}
return false, nil
}
Expand All @@ -308,7 +314,7 @@ func addToMetricsetsUsing(resourceName string, metricsetUsing string, resourceWa
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()

data, ok := resourceWatchers.watchersMap[resourceName]
data, ok := resourceWatchers.metaWatchersMap[resourceName]
if ok {
contains := false
for _, which := range data.metricsetsUsing {
Expand All @@ -328,7 +334,7 @@ func addToMetricsetsUsing(resourceName string, metricsetUsing string, resourceWa
// It returns true if element was removed and new size of array.
// The cache should be locked when called.
func removeFromMetricsetsUsing(resourceName string, notUsingName string, resourceWatchers *Watchers) (bool, int) {
data, ok := resourceWatchers.watchersMap[resourceName]
data, ok := resourceWatchers.metaWatchersMap[resourceName]
removed := false
if ok {
newIndex := 0
Expand Down Expand Up @@ -368,7 +374,7 @@ func createAllWatchers(

// Create a watcher for the given resource.
// If it fails, we return an error, so we can stop the extra watchers from creating.
created, err := createWatcher(resourceName, res, *options, client, resourceWatchers, config.Namespace, nodeScope)
created, err := createWatcher(resourceName, res, *options, client, resourceWatchers, config.Namespace)
if err != nil {
return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err)
} else if created {
Expand All @@ -382,7 +388,7 @@ func createAllWatchers(
for _, extra := range extraWatchers {
extraRes := getResource(extra)
if extraRes != nil {
created, err = createWatcher(extra, extraRes, *options, client, resourceWatchers, config.Namespace, false)
created, err = createWatcher(extra, extraRes, *options, client, resourceWatchers, config.Namespace)
if err != nil {
log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err)
} else {
Expand All @@ -407,18 +413,18 @@ func createMetadataGen(client k8sclient.Interface, commonConfig *conf.C, addReso
resourceWatchers.lock.RLock()
defer resourceWatchers.lock.RUnlock()

resourceWatcher := resourceWatchers.watchersMap[resourceName]
resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName]
// This should not be possible since the watchers should have been created before
if resourceWatcher == nil {
if resourceMetaWatcher == nil {
return nil, fmt.Errorf("could not create the metadata generator, as the watcher for %s does not exist", resourceName)
}

var metaGen *metadata.Resource

namespaceWatcher := resourceWatchers.watchersMap[NamespaceResource]
if namespaceWatcher != nil {
namespaceMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource]
if namespaceMetaWatcher != nil {
n := metadata.NewNamespaceMetadataGenerator(addResourceMetadata.Namespace,
(*namespaceWatcher).watcher.Store(), client)
(*namespaceMetaWatcher).watcher.Store(), client)
metaGen = metadata.NewNamespaceAwareResourceMetadataGenerator(commonConfig, client, n)
} else {
metaGen = metadata.NewResourceMetadataGenerator(commonConfig, client)
Expand All @@ -435,41 +441,41 @@ func createMetadataGenSpecific(client k8sclient.Interface, commonConfig *conf.C,
defer resourceWatchers.lock.RUnlock()

// The watcher for the resource needs to exist
resWatcher := resourceWatchers.watchersMap[resourceName]
if resWatcher == nil {
resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName]
if resourceMetaWatcher == nil {
return nil, fmt.Errorf("could not create the metadata generator, as the watcher for %s does not exist", resourceName)
}

var metaGen metadata.MetaGen
if resourceName == PodResource {
var nodeWatcher kubernetes.Watcher
if watcher := resourceWatchers.watchersMap[NodeResource]; watcher != nil {
nodeWatcher = (*watcher).watcher
if nodeMetaWatcher := resourceWatchers.metaWatchersMap[NodeResource]; nodeMetaWatcher != nil {
nodeWatcher = (*nodeMetaWatcher).watcher
}
var namespaceWatcher kubernetes.Watcher
if watcher := resourceWatchers.watchersMap[NamespaceResource]; watcher != nil {
namespaceWatcher = (*watcher).watcher
if namespaceMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource]; namespaceMetaWatcher != nil {
namespaceWatcher = (*namespaceMetaWatcher).watcher
}
var replicaSetWatcher kubernetes.Watcher
if watcher := resourceWatchers.watchersMap[ReplicaSetResource]; watcher != nil {
replicaSetWatcher = (*watcher).watcher
if replicasetMetaWatcher := resourceWatchers.metaWatchersMap[ReplicaSetResource]; replicasetMetaWatcher != nil {
replicaSetWatcher = (*replicasetMetaWatcher).watcher
}
var jobWatcher kubernetes.Watcher
if watcher := resourceWatchers.watchersMap[JobResource]; watcher != nil {
jobWatcher = (*watcher).watcher
if jobMetaWatcher := resourceWatchers.metaWatchersMap[JobResource]; jobMetaWatcher != nil {
jobWatcher = (*jobMetaWatcher).watcher
}

metaGen = metadata.GetPodMetaGen(commonConfig, (*resWatcher).watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher,
metaGen = metadata.GetPodMetaGen(commonConfig, (*resourceMetaWatcher).watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher,
jobWatcher, addResourceMetadata)
return metaGen, nil
} else if resourceName == ServiceResource {
namespaceWatcher := resourceWatchers.watchersMap[NamespaceResource]
if namespaceWatcher == nil {
namespaceMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource]
if namespaceMetaWatcher == nil {
return nil, fmt.Errorf("could not create the metadata generator, as the watcher for namespace does not exist")
}
namespaceMeta := metadata.NewNamespaceMetadataGenerator(addResourceMetadata.Namespace,
(*namespaceWatcher).watcher.Store(), client)
metaGen = metadata.NewServiceMetadataGenerator(commonConfig, (*resWatcher).watcher.Store(),
(*namespaceMetaWatcher).watcher.Store(), client)
metaGen = metadata.NewServiceMetadataGenerator(commonConfig, (*resourceMetaWatcher).watcher.Store(),
namespaceMeta, client)
return metaGen, nil
}
Expand Down Expand Up @@ -858,17 +864,17 @@ func buildMetadataEnricher(
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()

watcher := resourceWatchers.watchersMap[resourceName]
if watcher != nil {
watcher.enrichers[metricsetName] = enricher
resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName]
if resourceMetaWatcher != nil {
resourceMetaWatcher.enrichers[metricsetName] = enricher

// Check if this shared watcher has already detected resources from a previous enricher.
// In that case, for each resource, call the updateFunc of the current enricher to
// update its metadata. This is needed in cases where the watcher has already been
// notified for new/updated resources while the enricher for current metricset has not
// built yet (example is pod, state_pod metricsets).
for key, _ := range watcher.metadataObjects {
obj, exists, err := watcher.watcher.Store().GetByKey(key)
for key, _ := range resourceMetaWatcher.metadataObjects {
obj, exists, err := resourceMetaWatcher.watcher.Store().GetByKey(key)
if err != nil {
log.Errorf("Error trying to get the object from the store: %s", err)
} else {
Expand All @@ -885,7 +891,7 @@ func buildMetadataEnricher(
// AddEventHandler sets add, update and delete methods of watcher.
// Those methods are triggered when an event is detected for a
// resource creation, update or deletion.
watcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
resourceMetaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()
Expand All @@ -899,9 +905,9 @@ func buildMetadataEnricher(
if namespace != "" {
id = namespace + "/" + id
}
watcher.metadataObjects[id] = true
resourceMetaWatcher.metadataObjects[id] = true

for _, enricher := range watcher.enrichers {
for _, enricher := range resourceMetaWatcher.enrichers {
enricher.Lock()
newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource))
// add the new metadata to the watcher received metadata
Expand All @@ -922,9 +928,9 @@ func buildMetadataEnricher(
if namespace != "" {
id = namespace + "/" + id
}
watcher.metadataObjects[id] = true
resourceMetaWatcher.metadataObjects[id] = true

for _, enricher := range watcher.enrichers {
for _, enricher := range resourceMetaWatcher.enrichers {
enricher.Lock()
updatedMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource))
for id, metadata := range updatedMetadataEvents {
Expand All @@ -944,9 +950,9 @@ func buildMetadataEnricher(
if namespace != "" {
id = namespace + "/" + id
}
delete(watcher.metadataObjects, id)
delete(resourceMetaWatcher.metadataObjects, id)

for _, enricher := range watcher.enrichers {
for _, enricher := range resourceMetaWatcher.enrichers {
enricher.Lock()
ids := enricher.deleteFunc(obj.(kubernetes.Resource))
// update this watcher events by removing all the metadata[id]
Expand All @@ -973,32 +979,32 @@ func (e *enricher) Start(resourceWatchers *Watchers) {
// first.
extras := getExtraWatchers(e.resourceName, e.config.AddResourceMetadata)
for _, extra := range extras {
extraWatcher := resourceWatchers.watchersMap[extra]
if extraWatcher != nil && !extraWatcher.started {
if err := extraWatcher.watcher.Start(); err != nil {
extraWatcherMeta := resourceWatchers.metaWatchersMap[extra]
if extraWatcherMeta != nil && !extraWatcherMeta.started {
if err := extraWatcherMeta.watcher.Start(); err != nil {
e.log.Warnf("Error starting %s watcher: %s", extra, err)
} else {
extraWatcher.started = true
extraWatcherMeta.started = true
}
}
}

// Start the main watcher if not already started or if a restart is needed
resourceWatcher := resourceWatchers.watchersMap[e.resourceName]
if resourceWatcher != nil {
if !resourceWatcher.started {
if err := resourceWatcher.watcher.Start(); err != nil {
resourceMetaWatcher := resourceWatchers.metaWatchersMap[e.resourceName]
if resourceMetaWatcher != nil {
if !resourceMetaWatcher.started {
if err := resourceMetaWatcher.watcher.Start(); err != nil {
e.log.Warnf("Error starting %s watcher: %s", e.resourceName, err)
} else {
resourceWatcher.started = true
resourceMetaWatcher.started = true
}
} else if resourceWatcher.restartWatcher != nil {
resourceWatcher.watcher.Stop()
if err := resourceWatcher.restartWatcher.Start(); err != nil {
} else if resourceMetaWatcher.restartWatcher != nil {
resourceMetaWatcher.watcher.Stop()
if err := resourceMetaWatcher.restartWatcher.Start(); err != nil {
e.log.Warnf("Error restarting %s watcher: %s", e.resourceName, err)
} else {
resourceWatcher.watcher = resourceWatcher.restartWatcher
resourceWatcher.restartWatcher = nil
resourceMetaWatcher.watcher = resourceMetaWatcher.restartWatcher
resourceMetaWatcher.restartWatcher = nil
}
}

Expand All @@ -1011,23 +1017,23 @@ func (e *enricher) Stop(resourceWatchers *Watchers) {
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()

resourceWatcher := resourceWatchers.watchersMap[e.resourceName]
if resourceWatcher != nil && resourceWatcher.started {
resourceMetaWatcher := resourceWatchers.metaWatchersMap[e.resourceName]
if resourceMetaWatcher != nil && resourceMetaWatcher.started {
_, size := removeFromMetricsetsUsing(e.resourceName, e.metricsetName, resourceWatchers)
if size == 0 {
resourceWatcher.watcher.Stop()
resourceWatcher.started = false
resourceMetaWatcher.watcher.Stop()
resourceMetaWatcher.started = false
}
}

extras := getExtraWatchers(e.resourceName, e.config.AddResourceMetadata)
for _, extra := range extras {
extraWatcher := resourceWatchers.watchersMap[extra]
if extraWatcher != nil && extraWatcher.started {
extraMetaWatcher := resourceWatchers.metaWatchersMap[extra]
if extraMetaWatcher != nil && extraMetaWatcher.started {
_, size := removeFromMetricsetsUsing(extra, e.metricsetName, resourceWatchers)
if size == 0 {
extraWatcher.watcher.Stop()
extraWatcher.started = false
extraMetaWatcher.watcher.Stop()
extraMetaWatcher.started = false
}
}
}
Expand Down
Loading

0 comments on commit 57135c9

Please sign in to comment.