Skip to content

Commit

Permalink
Make k8s meta processor initialisation asynchronous (#16373) (#16419)
Browse files Browse the repository at this point in the history
(cherry picked from commit 4891c04)
  • Loading branch information
ChrsMark authored Feb 20, 2020
1 parent 43d5db4 commit d62349f
Showing 1 changed file with 91 additions and 66 deletions.
157 changes: 91 additions & 66 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,24 @@ package add_kubernetes_metadata
import (
"fmt"
"os"
"sync"
"time"

"github.com/elastic/beats/libbeat/common/kubernetes/metadata"

k8sclient "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
jsprocessor "github.com/elastic/beats/libbeat/processors/script/javascript/module/processor"
)

const (
timeout = time.Second * 5
timeout = time.Second * 5
selector = "kubernetes"
checkNodeReadyAttempts = 10
)

type kubernetesAnnotator struct {
Expand All @@ -47,10 +49,9 @@ type kubernetesAnnotator struct {
matchers *Matchers
cache *cache
kubernetesAvailable bool
initOnce sync.Once
}

const selector = "kubernetes"

func init() {
processors.RegisterPlugin("add_kubernetes_metadata", New)
jsprocessor.RegisterPlugin("AddKubernetesMetadata", New)
Expand All @@ -64,14 +65,29 @@ func init() {
Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher)
}

func isKubernetesAvailable(client k8sclient.Interface) bool {
func isKubernetesAvailable(client k8sclient.Interface) (bool, error) {
server, err := client.Discovery().ServerVersion()
if err != nil {
logp.Info("%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err)
return false
return false, err
}
logp.Info("%v: kubernetes env detected, with version: %v", "add_kubernetes_metadata", server)
return true
return true, nil
}

func isKubernetesAvailableWithRetry(client k8sclient.Interface) bool {
connectionAttempts := 1
for {
kubernetesAvailable, err := isKubernetesAvailable(client)
if kubernetesAvailable {
return true
}
if connectionAttempts > checkNodeReadyAttempts {
logp.Info("%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err)
return false
}
time.Sleep(3 * time.Second)
connectionAttempts += 1
}
}

// New constructs a new add_kubernetes_metadata processor.
Expand Down Expand Up @@ -108,73 +124,82 @@ func New(cfg *common.Config) (processors.Processor, error) {
kubernetesAvailable: false,
}

client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
if err != nil {
if kubernetes.IsInCluster(config.KubeConfig) {
log.Debugf("Could not create kubernetes client using in_cluster config: %+v", err)
} else if config.KubeConfig == "" {
log.Debugf("Could not create kubernetes client using config: %v: %+v", os.Getenv("KUBECONFIG"), err)
} else {
log.Debugf("Could not create kubernetes client using config: %v: %+v", config.KubeConfig, err)
}
return processor, nil
}
// complete processor's initialisation asynchronously so as to re-try on failing k8s client initialisations in case
// the k8s node is not yet ready.
go processor.init(config, cfg)

if !isKubernetesAvailable(client) {
return processor, nil
}
return processor, nil
}

matchers := NewMatchers(config.Matchers)
func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Config) {
k.initOnce.Do(func() {
client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
if err != nil {
if kubernetes.IsInCluster(config.KubeConfig) {
k.log.Debugf("Could not create kubernetes client using in_cluster config: %+v", err)
} else if config.KubeConfig == "" {
k.log.Debugf("Could not create kubernetes client using config: %v: %+v", os.Getenv("KUBECONFIG"), err)
} else {
k.log.Debugf("Could not create kubernetes client using config: %v: %+v", config.KubeConfig, err)
}
return
}

if matchers.Empty() {
log.Debugf("Could not initialize kubernetes plugin with zero matcher plugins")
return processor, nil
}
if !isKubernetesAvailableWithRetry(client) {
return
}

processor.matchers = matchers
matchers := NewMatchers(config.Matchers)

config.Host = kubernetes.DiscoverKubernetesNode(log, config.Host, kubernetes.IsInCluster(config.KubeConfig), client)
if matchers.Empty() {
k.log.Debugf("Could not initialize kubernetes plugin with zero matcher plugins")
return
}

log.Debug("Initializing a new Kubernetes watcher using host: %s", config.Host)
k.matchers = matchers

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Host,
Namespace: config.Namespace,
}, nil)
if err != nil {
log.Errorf("Couldn't create kubernetes watcher for %T", &kubernetes.Pod{})
return nil, err
}
config.Host = kubernetes.DiscoverKubernetesNode(k.log, config.Host, kubernetes.IsInCluster(config.KubeConfig), client)

metaGen := metadata.NewPodMetadataGenerator(cfg, watcher.Store(), nil, nil)
processor.indexers = NewIndexers(config.Indexers, metaGen)
processor.watcher = watcher
processor.kubernetesAvailable = true

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
log.Debugf("Adding kubernetes pod: %s/%s", pod.GetNamespace(), pod.GetName())
processor.addPod(pod)
},
UpdateFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
log.Debugf("Updating kubernetes pod: %s/%s", pod.GetNamespace(), pod.GetName())
processor.updatePod(pod)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
log.Debugf("Removing pod: %s/%s", pod.GetNamespace(), pod.GetName())
processor.removePod(pod)
},
})
k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Host)

if err := watcher.Start(); err != nil {
return nil, err
}
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Host,
Namespace: config.Namespace,
}, nil)
if err != nil {
k.log.Errorf("Couldn't create kubernetes watcher for %T", &kubernetes.Pod{})
return
}

return processor, nil
metaGen := metadata.NewPodMetadataGenerator(cfg, watcher.Store(), nil, nil)
k.indexers = NewIndexers(config.Indexers, metaGen)
k.watcher = watcher
k.kubernetesAvailable = true

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
k.log.Debugf("Adding kubernetes pod: %s/%s", pod.GetNamespace(), pod.GetName())
k.addPod(pod)
},
UpdateFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
k.log.Debugf("Updating kubernetes pod: %s/%s", pod.GetNamespace(), pod.GetName())
k.updatePod(pod)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
k.log.Debugf("Removing pod: %s/%s", pod.GetNamespace(), pod.GetName())
k.removePod(pod)
},
})

if err := watcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start watcher: %v", err)
return
}
})
}

func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
Expand Down

0 comments on commit d62349f

Please sign in to comment.