From 643344950bd70489e407748bb3b4b7329cf1f551 Mon Sep 17 00:00:00 2001 From: Zhiheng Date: Sat, 20 Jan 2018 03:58:28 +0800 Subject: [PATCH 1/2] Using k8s.io/client-go instead of github.com/ericchiang/k8s Now beat can detect host by itself when outside cluster It also fixes 2 bugs: * query pod meta won't refrese deleted map timeout * delete event for k8s autodiscovery will be delayed since watcher delay the event push --- .../providers/kubernetes/kubernetes.go | 100 +++--- libbeat/common/kubernetes/eventhandler.go | 92 ++++++ libbeat/common/kubernetes/types.go | 53 +-- libbeat/common/kubernetes/util.go | 131 +++++--- libbeat/common/kubernetes/watcher.go | 304 +++++------------- .../add_kubernetes_metadata/cache.go | 69 ++++ .../add_kubernetes_metadata/kubernetes.go | 111 +++---- 7 files changed, 443 insertions(+), 417 deletions(-) create mode 100644 libbeat/common/kubernetes/eventhandler.go create mode 100644 libbeat/processors/add_kubernetes_metadata/cache.go diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 646638402cfa..dcc2a511d478 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -15,15 +15,12 @@ func init() { // Provider implements autodiscover provider for docker containers type Provider struct { - config *Config - bus bus.Bus - watcher kubernetes.Watcher - metagen kubernetes.MetaGenerator - templates *template.Mapper - stop chan interface{} - startListener bus.Listener - stopListener bus.Listener - updateListener bus.Listener + config *Config + bus bus.Bus + watcher kubernetes.Watcher + metagen kubernetes.MetaGenerator + templates *template.Mapper + stop chan interface{} } // AutodiscoverBuilder builds and returns an autodiscover provider @@ -39,71 +36,54 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, return nil, err } - client, err := kubernetes.GetKubernetesClient(config.InCluster, config.KubeConfig) + client, err := kubernetes.GetKubernetesClientset(config.InCluster, config.KubeConfig) if err != nil { return nil, err } metagen := kubernetes.NewMetaGenerator(config.IncludeAnnotations, config.IncludeLabels, config.ExcludeLabels) - config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client) - watcher := kubernetes.NewWatcher(client.CoreV1(), config.SyncPeriod, config.CleanupTimeout, config.Host) + config.Host = kubernetes.DiscoverKubernetesNode(config.Host, config.InCluster, client) + watcher, err := kubernetes.NewWatcher(client, config.SyncPeriod, config.Host, config.Namespace, &kubernetes.Pod{}) + if err != nil { + logp.Err("kubernetes: Couldn't create watcher for %t", &kubernetes.Pod{}) + return nil, err + } + + p := &Provider{ + config: config, + bus: bus, + templates: mapper, + metagen: metagen, + watcher: watcher, + stop: make(chan interface{}), + } - start := watcher.ListenStart() - stop := watcher.ListenStop() - update := watcher.ListenUpdate() + watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ + AddFunc: func(obj kubernetes.Resource) { + p.emit(obj.(*kubernetes.Pod), "start") + }, + UpdateFunc: func(old, new kubernetes.Resource) { + p.emit(old.(*kubernetes.Pod), "stop") + p.emit(new.(*kubernetes.Pod), "start") + }, + DeleteFunc: func(obj kubernetes.Resource) { + p.emit(obj.(*kubernetes.Pod), "stop") + }, + }) if err := watcher.Start(); err != nil { return nil, err } - return &Provider{ - config: config, - bus: bus, - templates: mapper, - 
metagen: metagen, - watcher: watcher, - stop: make(chan interface{}), - startListener: start, - stopListener: stop, - updateListener: update, - }, nil + return p, nil } -// Start the autodiscover provider. Start and stop listeners work the -// conventional way. Update listener triggers a stop and then a start -// to simulate an update. -func (p *Provider) Start() { - go func() { - for { - select { - case <-p.stop: - p.startListener.Stop() - p.stopListener.Stop() - return - - case event := <-p.startListener.Events(): - p.emit(event, "start") - - case event := <-p.stopListener.Events(): - p.emit(event, "stop") - - case event := <-p.updateListener.Events(): - //On updates, first send a stop signal followed by a start signal to simulate a restart - p.emit(event, "stop") - p.emit(event, "start") - } - } - }() -} - -func (p *Provider) emit(event bus.Event, flag string) { - pod, ok := event["pod"].(*kubernetes.Pod) - if !ok { - logp.Err("Couldn't get a pod from watcher event") - return - } +// Start for Runner interface. +// Provider was actually started in AutodiscoverBuilder. +func (p *Provider) Start() {} +func (p *Provider) emit(pod *kubernetes.Pod, flag string) { host := pod.Status.PodIP // Emit pod container IDs @@ -171,7 +151,7 @@ func (p *Provider) publish(event bus.Event) { // Stop signals the stop channel to force the watch loop routine to stop. func (p *Provider) Stop() { - close(p.stop) + p.watcher.Stop() } // String returns a description of kubernetes autodiscover provider. diff --git a/libbeat/common/kubernetes/eventhandler.go b/libbeat/common/kubernetes/eventhandler.go new file mode 100644 index 000000000000..5f878c7a8016 --- /dev/null +++ b/libbeat/common/kubernetes/eventhandler.go @@ -0,0 +1,92 @@ +package kubernetes + +// ResourceEventHandler can handle notifications for events that happen to a +// resource. The events are informational only, so you can't return an +// error. +// * OnAdd is called when an object is added. +// * OnUpdate is called when an object is modified. Note that oldObj is the +// last known state of the object-- it is possible that several changes +// were combined together, so you can't use this to see every single +// change. OnUpdate is also called when a re-list happens, and it will +// get called even if nothing changed. This is useful for periodically +// evaluating or syncing something. +// * OnDelete will get the final state of the item if it is known, otherwise +// it will get an object of type DeletedFinalStateUnknown. This can +// happen if the watch is closed and misses the delete event and we don't +// notice the deletion until the subsequent re-list. +type ResourceEventHandler interface { + OnAdd(obj Resource) + OnUpdate(oldObj, newObj Resource) + OnDelete(obj Resource) +} + +// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or +// as few of the notification functions as you want while still implementing +// ResourceEventHandler. +type ResourceEventHandlerFuncs struct { + AddFunc func(obj Resource) + UpdateFunc func(oldObj, newObj Resource) + DeleteFunc func(obj Resource) +} + +// OnAdd calls AddFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnAdd(obj Resource) { + if r.AddFunc != nil { + r.AddFunc(obj) + } +} + +// OnUpdate calls UpdateFunc if it's not nil. +func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj Resource) { + if r.UpdateFunc != nil { + r.UpdateFunc(oldObj, newObj) + } +} + +// OnDelete calls DeleteFunc if it's not nil. 
+func (r ResourceEventHandlerFuncs) OnDelete(obj Resource) { + if r.DeleteFunc != nil { + r.DeleteFunc(obj) + } +} + +// FilteringResourceEventHandler applies the provided filter to all events coming +// in, ensuring the appropriate nested handler method is invoked. An object +// that starts passing the filter after an update is considered an add, and an +// object that stops passing the filter after an update is considered a delete. +type FilteringResourceEventHandler struct { + FilterFunc func(obj Resource) bool + Handler ResourceEventHandler +} + +// OnAdd calls the nested handler only if the filter succeeds +func (r FilteringResourceEventHandler) OnAdd(obj Resource) { + if !r.FilterFunc(obj) { + return + } + r.Handler.OnAdd(obj) +} + +// OnUpdate ensures the proper handler is called depending on whether the filter matches +func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj Resource) { + newer := r.FilterFunc(newObj) + older := r.FilterFunc(oldObj) + switch { + case newer && older: + r.Handler.OnUpdate(oldObj, newObj) + case newer && !older: + r.Handler.OnAdd(newObj) + case !newer && older: + r.Handler.OnDelete(oldObj) + default: + // do nothing + } +} + +// OnDelete calls the nested handler only if the filter succeeds +func (r FilteringResourceEventHandler) OnDelete(obj Resource) { + if !r.FilterFunc(obj) { + return + } + r.Handler.OnDelete(obj) +} diff --git a/libbeat/common/kubernetes/types.go b/libbeat/common/kubernetes/types.go index 134c13d4826b..a81a287e5963 100644 --- a/libbeat/common/kubernetes/types.go +++ b/libbeat/common/kubernetes/types.go @@ -1,14 +1,14 @@ package kubernetes import ( - "encoding/json" "strings" - - "github.com/elastic/beats/libbeat/logp" - - corev1 "github.com/ericchiang/k8s/api/v1" + "time" ) +type Resource interface { + GetMetadata() *ObjectMeta +} + type ObjectMeta struct { Annotations map[string]string `json:"annotations"` CreationTimestamp string `json:"creationTimestamp"` @@ -108,6 +108,10 @@ type Pod struct { Status PodStatus `json:"status"` } +func (p *Pod) GetMetadata() *ObjectMeta { + return &p.Metadata +} + // GetContainerID parses the container ID to get the actual ID string func (s *PodContainerStatus) GetContainerID() string { cID := s.ContainerID @@ -120,20 +124,29 @@ func (s *PodContainerStatus) GetContainerID() string { return "" } -// GetPod converts Pod to our own type -func GetPod(pod *corev1.Pod) *Pod { - bytes, err := json.Marshal(pod) - if err != nil { - logp.Warn("Unable to marshal %v", pod.String()) - return nil - } - - po := &Pod{} - err = json.Unmarshal(bytes, po) - if err != nil { - logp.Warn("Unable to marshal %v", pod.String()) - return nil - } +type Event struct { + APIVersion string `json:"apiVersion"` + Count int64 `json:"count"` + FirstTimestamp *time.Time `json:"firstTimestamp"` + InvolvedObject struct { + APIVersion string `json:"apiVersion"` + Kind string `json:"kind"` + Name string `json:"name"` + ResourceVersion string `json:"resourceVersion"` + UID string `json:"uid"` + } `json:"involvedObject"` + Kind string `json:"kind"` + LastTimestamp *time.Time `json:"lastTimestamp"` + Message string `json:"message"` + Metadata ObjectMeta `json:"metadata"` + Reason string `json:"reason"` + Source struct { + Component string `json:"component"` + Host string `json:"host"` + } `json:"source"` + Type string `json:"type"` +} - return po +func (e *Event) GetMetadata() *ObjectMeta { + return &e.Metadata } diff --git a/libbeat/common/kubernetes/util.go b/libbeat/common/kubernetes/util.go index 
88c1c37048af..764bb7725b00 100644 --- a/libbeat/common/kubernetes/util.go +++ b/libbeat/common/kubernetes/util.go @@ -1,68 +1,123 @@ package kubernetes import ( - "context" - "fmt" "io/ioutil" "os" + "strings" - "github.com/ericchiang/k8s" - "github.com/ghodss/yaml" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" "github.com/elastic/beats/libbeat/logp" ) -// GetKubernetesClient returns a kubernetes client. If inCluster is true, it returns an +const defaultNode = "localhost" + +// GetKubernetesClientset returns a kubernetes clientset. If inCluster is true, it returns an // in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed, -// it parses the config file to get the config required to build a client. -func GetKubernetesClient(inCluster bool, kubeConfig string) (client *k8s.Client, err error) { +// it parses the config file to get the config required to build a clientset. +func GetKubernetesClientset(inCluster bool, kubeConfig string) (*kubernetes.Clientset, error) { + var config *rest.Config + var err error if inCluster == true { - client, err = k8s.NewInClusterClient() + config, err = rest.InClusterConfig() if err != nil { - return nil, fmt.Errorf("Unable to get in cluster configuration: %v", err) + return nil, err } } else { - data, err := ioutil.ReadFile(kubeConfig) + config, err = clientcmd.BuildConfigFromFlags("", kubeConfig) if err != nil { - return nil, fmt.Errorf("read kubeconfig: %v", err) + return nil, err } + } + config.ContentType = "application/vnd.kubernetes.protobuf" + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + return clientset, nil +} - // Unmarshal YAML into a Kubernetes config object. - var config k8s.Config - if err = yaml.Unmarshal(data, &config); err != nil { - return nil, fmt.Errorf("unmarshal kubeconfig: %v", err) +// DiscoverKubernetesNode figures out the Kubernetes node to use. +// If host is provided in the config use it directly. +// If beat is deployed in k8s cluster, use hostname of pod which is pod name to query pod meta for node name. +// If beat is deployed outside k8s cluster, use machine-id to match against k8s nodes for node name. 
+func DiscoverKubernetesNode(host string, inCluster bool, clientset *kubernetes.Clientset) (node string) { + if host != "" { + logp.Info("kubernetes: Using node %s provided in the config", host) + return host + } + + if inCluster { + ns := inClusterNamespace() + podName, err := os.Hostname() + if err != nil { + logp.Err("kubernetes: Couldn't get hostname as beat pod name in cluster with error: ", err.Error()) + return defaultNode } - client, err = k8s.NewClient(&config) + logp.Info("kubernetes: Using pod name %s and namespace %s to discover kubernetes node", podName, ns) + pod, err := clientset.CoreV1().Pods(inClusterNamespace()).Get(podName, v1.GetOptions{}) if err != nil { - return nil, err + logp.Err("kubernetes: Querying for pod failed with error: ", err.Error()) + return defaultNode } + logp.Info("kubernetes: Using node %s discovered by in cluster pod node query", pod.Spec.NodeSelector) + return pod.Spec.NodeName } - return client, nil + mid := machineID() + if mid == "" { + logp.Err("kubernetes: Couldn't collect info from any of the files in /etc/machine-id /var/lib/dbus/machine-id") + return defaultNode + } + + nodes, err := clientset.CoreV1().Nodes().List(v1.ListOptions{}) + if err != nil { + logp.Err("kubernetes: Querying for nodes failed with error: ", err.Error()) + return defaultNode + } + for _, n := range nodes.Items { + if n.Status.NodeInfo.MachineID == mid { + logp.Info("kubernetes: Using node %s discovered by machine-id matching", n.GetName()) + return n.GetName() + } + } + + logp.Warn("kubernetes: Couldn't discover node, using localhost as default") + return defaultNode } -// DiscoverKubernetesNode figures out the Kubernetes host to use. If host is provided in the config -// use it directly. Else use hostname of the pod which is the Pod ID to query the Pod and get the Node -// name from the specification. Else, return localhost as a default. -func DiscoverKubernetesNode(host string, client *k8s.Client) string { - ctx := context.Background() - if host == "" { - podName := os.Getenv("HOSTNAME") - logp.Info("Using pod name %s and namespace %s", podName, client.Namespace) - if podName == "localhost" { - host = "localhost" - } else { - pod, err := client.CoreV1().GetPod(ctx, podName, client.Namespace) - if err != nil { - logp.Err("Querying for pod failed with error: ", err.Error()) - logp.Info("Unable to find pod, setting host to localhost") - host = "localhost" - } else { - host = pod.Spec.GetNodeName() - } +// inClusterNamespace gets namespace from serviceaccount when beat is in cluster. +// code borrowed from client-go with some changes. +func inClusterNamespace() string { + // This way assumes you've set the POD_NAMESPACE environment variable using the downward API. + // This check has to be done first for backwards compatibility with the way InClusterConfig was originally set up + if ns := os.Getenv("POD_NAMESPACE"); ns != "" { + return ns + } + // Fall back to the namespace associated with the service account token, if available + if data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + if ns := strings.TrimSpace(string(data)); len(ns) > 0 { + return ns } } - return host + return "default" +} + +// machineID borrowed from cadvisor. 
+func machineID() string { + for _, file := range []string{ + "/etc/machine-id", + "/var/lib/dbus/machine-id", + } { + id, err := ioutil.ReadFile(file) + if err == nil { + return strings.TrimSpace(string(id)) + } + } + return "" } diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 24af7d240015..b669378d6e8c 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -2,17 +2,23 @@ package kubernetes import ( "context" - "errors" - "sync" + "encoding/json" + "fmt" "time" - "github.com/elastic/beats/libbeat/common/bus" - "github.com/elastic/beats/libbeat/logp" - - "github.com/ericchiang/k8s" - corev1 "github.com/ericchiang/k8s/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/informers/internalinterfaces" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) +func filterByNode(node string) internalinterfaces.TweakListOptionsFunc { + return func(opts *metav1.ListOptions) { + opts.FieldSelector = "spec.nodeName=" + node + } +} + // Watcher reads Kubernetes events and keeps a list of known pods type Watcher interface { // Start watching Kubernetes API for new containers @@ -21,241 +27,89 @@ type Watcher interface { // Stop watching Kubernetes API for new containers Stop() - // ListenStart returns a bus listener to receive pod started events, with a `pod` key holding it - ListenStart() bus.Listener - - // ListenUpdate returns a bus listener to receive pod updated events, with a `pod` key holding it - ListenUpdate() bus.Listener - - // ListenStop returns a bus listener to receive pod stopped events, with a `pod` key holding it - ListenStop() bus.Listener + AddEventHandler(ResourceEventHandler) } -type podWatcher struct { - sync.RWMutex - client Client - syncPeriod time.Duration - cleanupTimeout time.Duration - nodeFilter k8s.Option - lastResourceVersion string - ctx context.Context - stop context.CancelFunc - bus bus.Bus - pods map[string]*Pod // pod id -> Pod - deleted map[string]time.Time // deleted annotations key -> last access time -} - -// Client for Kubernetes interface -type Client interface { - ListPods(ctx context.Context, namespace string, options ...k8s.Option) (*corev1.PodList, error) - WatchPods(ctx context.Context, namespace string, options ...k8s.Option) (*k8s.CoreV1PodWatcher, error) +type watcher struct { + factory informers.SharedInformerFactory + informer cache.SharedIndexInformer + syncPeriod time.Duration + objToResource func(obj interface{}) Resource + stop chan struct{} } // NewWatcher initializes the watcher client to provide a local state of -// pods from the cluster (filtered to the given host) -func NewWatcher(client Client, syncPeriod, cleanupTimeout time.Duration, host string) Watcher { - ctx, cancel := context.WithCancel(context.Background()) - return &podWatcher{ - client: client, - cleanupTimeout: cleanupTimeout, - syncPeriod: syncPeriod, - nodeFilter: k8s.QueryParam("fieldSelector", "spec.nodeName="+host), - lastResourceVersion: "0", - ctx: ctx, - stop: cancel, - pods: make(map[string]*Pod), - deleted: make(map[string]time.Time), - bus: bus.New("kubernetes"), - } -} - -func (p *podWatcher) syncPods() error { - logp.Info("kubernetes: %s", "Performing a pod sync") - pods, err := p.client.ListPods( - p.ctx, - "", - p.nodeFilter, - k8s.ResourceVersion(p.lastResourceVersion)) - - if err != nil { - return err +// pod from the cluster (filtered to the given node) +func NewWatcher(clientset 
kubernetes.Interface, syncPeriod time.Duration, node, namespace string, r Resource) (Watcher, error) { + var tf internalinterfaces.TweakListOptionsFunc + if node != "" { + tf = filterByNode(node) } - - p.Lock() - for _, apiPod := range pods.Items { - pod := GetPod(apiPod) - p.pods[pod.Metadata.UID] = pod + f := informers.NewFilteredSharedInformerFactory( + clientset, + 0, + namespace, + tf, + ) + w := &watcher{ + factory: f, + syncPeriod: syncPeriod, + stop: make(chan struct{}), } - p.Unlock() - - // Emit all start events (avoid blocking if the bus get's blocked) - go func() { - for _, pod := range p.pods { - p.bus.Publish(bus.Event{ - "start": true, - "pod": pod, - }) + switch r.(type) { + case *Pod: + w.informer = f.Core().V1().Pods().Informer() + w.objToResource = func(obj interface{}) Resource { + bytes, _ := json.Marshal(obj) + r := &Pod{} + json.Unmarshal(bytes, r) + return r } - }() - - // Store last version - p.lastResourceVersion = pods.Metadata.GetResourceVersion() + case *Event: + w.informer = f.Events().V1beta1().Events().Informer() + w.objToResource = func(obj interface{}) Resource { + bytes, _ := json.Marshal(obj) + r := &Event{} + json.Unmarshal(bytes, r) + return r + } + default: + return nil, fmt.Errorf("unsupported resource type for watching %T", r) + } - logp.Info("kubernetes: %s", "Pod sync done") - return nil + return w, nil } -// Start watching pods -func (p *podWatcher) Start() error { +func (w *watcher) AddEventHandler(h ResourceEventHandler) { + w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + h.OnAdd(w.objToResource(obj)) + }, + UpdateFunc: func(old, new interface{}) { + h.OnUpdate(w.objToResource(old), w.objToResource(new)) + }, + DeleteFunc: func(obj interface{}) { + h.OnDelete(w.objToResource(obj)) + }, + }) +} +// Start watching resources +func (w *watcher) Start() error { + w.factory.Start(w.stop) // Make sure that events don't flow into the annotator before informer is fully set up // Sync initial state: - synced := make(chan struct{}) - go func() { - p.syncPods() - close(synced) - }() - - select { - case <-time.After(p.syncPeriod): - p.Stop() - return errors.New("Timeout while doing initial Kubernetes pods sync") - case <-synced: - // Watch for new changes - go p.watch() - go p.cleanupWorker() - return nil - } -} - -func (p *podWatcher) watch() { - for { - logp.Info("kubernetes: %s", "Watching API for pod events") - watcher, err := p.client.WatchPods(p.ctx, "", p.nodeFilter) - if err != nil { - //watch pod failures should be logged and gracefully failed over as metadata retrieval - //should never stop. 
- logp.Err("kubernetes: Watching API error %v", err) - time.Sleep(time.Second) - continue - } - - for { - _, apiPod, err := watcher.Next() - if err != nil { - logp.Err("kubernetes: Watching API error %v", err) - watcher.Close() - break - } - - pod := GetPod(apiPod) - if pod.Metadata.DeletionTimestamp != "" { - // Pod deleted - p.Lock() - p.deleted[pod.Metadata.UID] = time.Now() - p.Unlock() - - } else { - if p.Pod(pod.Metadata.UID) != nil { - // Pod updated - p.Lock() - p.pods[pod.Metadata.UID] = pod - // un-delete if it's flagged (in case of update or recreation) - delete(p.deleted, pod.Metadata.UID) - p.Unlock() - - p.bus.Publish(bus.Event{ - "update": true, - "pod": pod, - }) - - } else { - // Pod added - p.Lock() - p.pods[pod.Metadata.UID] = pod - // un-delete if it's flagged (in case of update or recreation) - delete(p.deleted, pod.Metadata.UID) - p.Unlock() - - p.bus.Publish(bus.Event{ - "start": true, - "pod": pod, - }) - } - } - } - } -} - -// Check annotations flagged as deleted for their last access time, fully delete -// the ones older than p.cleanupTimeout -func (p *podWatcher) cleanupWorker() { - for { - // Wait a full period - time.Sleep(p.cleanupTimeout) - - select { - case <-p.ctx.Done(): - return - default: - // Check entries for timeout - var toDelete []string - timeout := time.Now().Add(-p.cleanupTimeout) - p.RLock() - for key, lastSeen := range p.deleted { - if lastSeen.Before(timeout) { - toDelete = append(toDelete, key) - } - } - p.RUnlock() - - // Delete timed out entries: - p.Lock() - for _, key := range toDelete { - p.bus.Publish(bus.Event{ - "stop": true, - "pod": p.Pod(key), - }) - - delete(p.deleted, key) - delete(p.pods, key) - } - p.Unlock() + ctx, cancl := context.WithTimeout(context.Background(), w.syncPeriod) + defer cancl() + for t, finished := range w.factory.WaitForCacheSync(ctx.Done()) { + if !finished { + return fmt.Errorf("kubernetes: Timeout while doing initial Kubernetes sync for %s", t) } } -} - -func (p *podWatcher) Pod(uid string) *Pod { - p.RLock() - pod := p.pods[uid] - _, deleted := p.deleted[uid] - p.RUnlock() - - // Update deleted last access - if deleted { - p.Lock() - p.deleted[uid] = time.Now() - p.Unlock() - } - - return pod -} - -// ListenStart returns a bus listener to receive pod started events, with a `pod` key holding it -func (p *podWatcher) ListenStart() bus.Listener { - return p.bus.Subscribe("start") -} -// ListenStop returns a bus listener to receive pod stopped events, with a `pod` key holding it -func (p *podWatcher) ListenStop() bus.Listener { - return p.bus.Subscribe("stop") -} - -// ListenUpdate returns a bus listener to receive updated pod events, with a `pod` key holding it -func (p *podWatcher) ListenUpdate() bus.Listener { - return p.bus.Subscribe("update") + return nil } -func (p *podWatcher) Stop() { - p.stop() +func (w *watcher) Stop() { + close(w.stop) } diff --git a/libbeat/processors/add_kubernetes_metadata/cache.go b/libbeat/processors/add_kubernetes_metadata/cache.go new file mode 100644 index 000000000000..e1bf9229cbae --- /dev/null +++ b/libbeat/processors/add_kubernetes_metadata/cache.go @@ -0,0 +1,69 @@ +package add_kubernetes_metadata + +import ( + "sync" + "time" + + "github.com/elastic/beats/libbeat/common" +) + +type cache struct { + sync.RWMutex + timeout time.Duration + deleted map[string]time.Time // key -> when should this obj be deleted + metadata map[string]common.MapStr +} + +func newCache(cleanupTimeout time.Duration) *cache { + c := &cache{ + timeout: cleanupTimeout, + deleted: 
make(map[string]time.Time), + metadata: make(map[string]common.MapStr), + } + go c.cleanup() + return c +} + +func (c *cache) get(key string) common.MapStr { + c.Lock() + defer c.Unlock() + // add lifecycle if key was queried + if t, ok := c.deleted[key]; ok { + c.deleted[key] = t.Add(c.timeout) + } + return c.metadata[key] +} + +func (c *cache) delete(key string) { + c.Lock() + defer c.Unlock() + c.deleted[key] = time.Now().Add(c.timeout) +} + +func (c *cache) set(key string, data common.MapStr) { + c.Lock() + defer c.Unlock() + delete(c.deleted, key) + c.metadata[key] = data +} + +func (c *cache) cleanup() { + ticker := time.Tick(timeout) + space := []string{} + for now := range ticker { + shouldDelete := space[:0] + c.RLock() + for k, t := range c.deleted { + if now.After(t) { + shouldDelete = append(shouldDelete, k) + } + } + c.RUnlock() + c.Lock() + for _, k := range shouldDelete { + delete(c.deleted, k) + delete(c.metadata, k) + } + c.Unlock() + } +} diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index bf7e49c9fa85..ec9b6f9658ce 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -3,12 +3,10 @@ package add_kubernetes_metadata import ( "errors" "fmt" - "sync" "time" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/bus" "github.com/elastic/beats/libbeat/common/kubernetes" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" @@ -18,19 +16,11 @@ const ( timeout = time.Second * 5 ) -var ( - fatalError = errors.New("Unable to create kubernetes processor") -) - type kubernetesAnnotator struct { - sync.RWMutex - watcher kubernetes.Watcher - startListener bus.Listener - stopListener bus.Listener - updateListener bus.Listener - indexers *Indexers - matchers *Matchers - metadata map[string]common.MapStr + watcher kubernetes.Watcher + indexers *Indexers + matchers *Matchers + cache *cache } func init() { @@ -84,41 +74,47 @@ func newKubernetesAnnotator(cfg *common.Config) (processors.Processor, error) { return nil, fmt.Errorf("Can not initialize kubernetes plugin with zero matcher plugins") } - client, err := kubernetes.GetKubernetesClient(config.InCluster, config.KubeConfig) + client, err := kubernetes.GetKubernetesClientset(config.InCluster, config.KubeConfig) if err != nil { return nil, err } - config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client) + config.Host = kubernetes.DiscoverKubernetesNode(config.Host, config.InCluster, client) logp.Debug("kubernetes", "Using host ", config.Host) logp.Debug("kubernetes", "Initializing watcher") - if client != nil { - watcher := kubernetes.NewWatcher(client.CoreV1(), config.SyncPeriod, config.CleanupTimeout, config.Host) - start := watcher.ListenStart() - stop := watcher.ListenStop() - update := watcher.ListenUpdate() - - processor := &kubernetesAnnotator{ - watcher: watcher, - indexers: indexers, - matchers: matchers, - metadata: make(map[string]common.MapStr, 0), - startListener: start, - stopListener: stop, - updateListener: update, - } - // Start worker - go processor.worker() + watcher, err := kubernetes.NewWatcher(client, config.SyncPeriod, config.Host, config.Namespace, &kubernetes.Pod{}) + if err != nil { + logp.Err("kubernetes: Couldn't create watcher for %t", &kubernetes.Pod{}) + return nil, err + } + + processor 
:= &kubernetesAnnotator{ + watcher: watcher, + indexers: indexers, + matchers: matchers, + cache: newCache(config.CleanupTimeout), + } - if err := watcher.Start(); err != nil { - return nil, err - } - return processor, nil + watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ + AddFunc: func(obj kubernetes.Resource) { + processor.addPod(obj.(*kubernetes.Pod)) + }, + UpdateFunc: func(old, new kubernetes.Resource) { + processor.removePod(old.(*kubernetes.Pod)) + processor.addPod(new.(*kubernetes.Pod)) + }, + DeleteFunc: func(obj kubernetes.Resource) { + processor.removePod(obj.(*kubernetes.Pod)) + }, + }) + + if err := watcher.Start(); err != nil { + return nil, err } - return nil, fatalError + return processor, nil } func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { @@ -127,9 +123,7 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { return event, nil } - k.RLock() - metadata := k.metadata[index] - k.RUnlock() + metadata := k.cache.get(index) if metadata == nil { return event, nil } @@ -148,48 +142,17 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -// worker watches pod events and keeps a map of metadata -func (k *kubernetesAnnotator) worker() { - for { - select { - case event := <-k.startListener.Events(): - processEvent(k.addPod, event) - - case event := <-k.stopListener.Events(): - processEvent(k.removePod, event) - - case event := <-k.updateListener.Events(): - processEvent(k.removePod, event) - processEvent(k.addPod, event) - } - } -} - -// Run pod actions while handling errors -func processEvent(f func(pod *kubernetes.Pod), event bus.Event) { - pod, ok := event["pod"].(*kubernetes.Pod) - if !ok { - logp.Err("Couldn't get a pod from watcher event") - return - } - f(pod) -} - func (k *kubernetesAnnotator) addPod(pod *kubernetes.Pod) { metadata := k.indexers.GetMetadata(pod) - k.Lock() - defer k.Unlock() for _, m := range metadata { - k.metadata[m.Index] = m.Data + k.cache.set(m.Index, m.Data) } } func (k *kubernetesAnnotator) removePod(pod *kubernetes.Pod) { indexes := k.indexers.GetIndexes(pod) - k.Lock() - defer k.Unlock() for _, idx := range indexes { - delete(k.metadata, idx) + k.cache.delete(idx) } } From fd8c82513c9b2ffe6448e970c0194f01b3108ffc Mon Sep 17 00:00:00 2001 From: Zhiheng Date: Sat, 20 Jan 2018 04:00:28 +0800 Subject: [PATCH 2/2] Metricbeat k8s event module refactor to use libbeat --- metricbeat/module/kubernetes/event/event.go | 81 ++++++------ metricbeat/module/kubernetes/event/types.go | 46 ------- metricbeat/module/kubernetes/event/watcher.go | 118 ------------------ 3 files changed, 39 insertions(+), 206 deletions(-) delete mode 100644 metricbeat/module/kubernetes/event/types.go delete mode 100644 metricbeat/module/kubernetes/event/watcher.go diff --git a/metricbeat/module/kubernetes/event/event.go b/metricbeat/module/kubernetes/event/event.go index 50d64e391ee5..7a6da32477ec 100644 --- a/metricbeat/module/kubernetes/event/event.go +++ b/metricbeat/module/kubernetes/event/event.go @@ -2,14 +2,13 @@ package event import ( "fmt" - "io/ioutil" + "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/common/kubernetes" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" - - "github.com/ericchiang/k8s" - "github.com/ghodss/yaml" ) // init registers the MetricSet with the central registry. 
@@ -25,7 +24,7 @@ func init() { // MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling. type MetricSet struct { mb.BaseMetricSet - watcher *Watcher + watcher kubernetes.Watcher } // New create a new instance of the MetricSet @@ -41,30 +40,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, fmt.Errorf("fail to unpack the kubernetes event configuration: %s", err) } - var client *k8s.Client - if config.InCluster == true { - client, err = k8s.NewInClusterClient() - if err != nil { - return nil, fmt.Errorf("Unable to get in cluster configuration") - } - } else { - data, err := ioutil.ReadFile(config.KubeConfig) - if err != nil { - return nil, fmt.Errorf("read kubeconfig: %v", err) - } - - // Unmarshal YAML into a Kubernetes config object. - var config k8s.Config - if err = yaml.Unmarshal(data, &config); err != nil { - return nil, fmt.Errorf("unmarshal kubeconfig: %v", err) - } - client, err = k8s.NewClient(&config) - if err != nil { - return nil, err - } + client, err := kubernetes.GetKubernetesClientset(config.InCluster, config.KubeConfig) + if err != nil { + return nil, err } - watcher := NewWatcher(client, config.SyncPeriod, config.Namespace) + watcher, err := kubernetes.NewWatcher(client, config.SyncPeriod, "", config.Namespace, &kubernetes.Event{}) + if err != nil { + logp.Err("kubernetes: Couldn't create watcher for %t", &kubernetes.Event{}) + return nil, err + } return &MetricSet{ BaseMetricSet: base, @@ -74,24 +59,36 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Run method provides the Kubernetes event watcher with a reporter with which events can be reported. func (m *MetricSet) Run(reporter mb.PushReporter) { - // Start event watcher - m.watcher.Run() - - for { - select { - case <-reporter.Done(): - m.watcher.Stop() - return - case msg := <-m.watcher.eventQueue: - // Ignore events that are deleted - if msg.Metadata.DeletionTimestamp == nil { - reporter.Event(generateMapStrFromEvent(msg)) - } - } + now := time.Now() + handler := kubernetes.ResourceEventHandlerFuncs{ + AddFunc: func(obj kubernetes.Resource) { + reporter.Event(generateMapStrFromEvent(obj.(*kubernetes.Event))) + }, + UpdateFunc: func(_, new kubernetes.Resource) { + reporter.Event(generateMapStrFromEvent(new.(*kubernetes.Event))) + }, + // ignore events that are deleted + DeleteFunc: nil, } + m.watcher.AddEventHandler(kubernetes.FilteringResourceEventHandler{ + // skip events happened before watch + FilterFunc: func(obj kubernetes.Resource) bool { + eve := obj.(*kubernetes.Event) + if eve.LastTimestamp.Before(now) { + return false + } + return true + }, + Handler: handler, + }) + // start event watcher + m.watcher.Start() + <-reporter.Done() + m.watcher.Stop() + return } -func generateMapStrFromEvent(eve *Event) common.MapStr { +func generateMapStrFromEvent(eve *kubernetes.Event) common.MapStr { eventMeta := common.MapStr{ "timestamp": common.MapStr{ "created": eve.Metadata.CreationTimestamp, diff --git a/metricbeat/module/kubernetes/event/types.go b/metricbeat/module/kubernetes/event/types.go deleted file mode 100644 index 2abb51bf3d84..000000000000 --- a/metricbeat/module/kubernetes/event/types.go +++ /dev/null @@ -1,46 +0,0 @@ -package event - -import "time" - -type ObjectMeta struct { - Annotations map[string]string `json:"annotations"` - CreationTimestamp *time.Time `json:"creationTimestamp"` - DeletionTimestamp *time.Time `json:"deletionTimestamp"` - GenerateName string `json:"generateName"` - Labels map[string]string `json:"labels"` - Name 
string `json:"name"` - Namespace string `json:"namespace"` - OwnerReferences []struct { - APIVersion string `json:"apiVersion"` - Controller bool `json:"controller"` - Kind string `json:"kind"` - Name string `json:"name"` - UID string `json:"uid"` - } `json:"ownerReferences"` - ResourceVersion string `json:"resourceVersion"` - SelfLink string `json:"selfLink"` - UID string `json:"uid"` -} - -type Event struct { - APIVersion string `json:"apiVersion"` - Count int64 `json:"count"` - FirstTimestamp *time.Time `json:"firstTimestamp"` - InvolvedObject struct { - APIVersion string `json:"apiVersion"` - Kind string `json:"kind"` - Name string `json:"name"` - ResourceVersion string `json:"resourceVersion"` - UID string `json:"uid"` - } `json:"involvedObject"` - Kind string `json:"kind"` - LastTimestamp *time.Time `json:"lastTimestamp"` - Message string `json:"message"` - Metadata ObjectMeta `json:"metadata"` - Reason string `json:"reason"` - Source struct { - Component string `json:"component"` - Host string `json:"host"` - } `json:"source"` - Type string `json:"type"` -} diff --git a/metricbeat/module/kubernetes/event/watcher.go b/metricbeat/module/kubernetes/event/watcher.go deleted file mode 100644 index 3ebe7d7df1ad..000000000000 --- a/metricbeat/module/kubernetes/event/watcher.go +++ /dev/null @@ -1,118 +0,0 @@ -package event - -import ( - "context" - "encoding/json" - "time" - - "github.com/elastic/beats/libbeat/logp" - - "github.com/ericchiang/k8s" - corev1 "github.com/ericchiang/k8s/api/v1" -) - -// Watcher is a controller that synchronizes Pods. -type Watcher struct { - kubeClient *k8s.Client - namespace string - syncPeriod time.Duration - eventQueue chan *Event - lastResourceVersion string - ctx context.Context - stop context.CancelFunc -} - -// NewWatcher initializes the watcher client to provide a local state of -// pods from the cluster (filtered to the given host) -func NewWatcher(kubeClient *k8s.Client, syncPeriod time.Duration, namespace string) *Watcher { - ctx, cancel := context.WithCancel(context.Background()) - return &Watcher{ - kubeClient: kubeClient, - namespace: namespace, - syncPeriod: syncPeriod, - eventQueue: make(chan *Event, 10), - lastResourceVersion: "0", - ctx: ctx, - stop: cancel, - } -} - -// watchEvents watches on the Kubernetes API server and puts them onto a channel. -// watchEvents only starts from the most recent event. -func (w *Watcher) watchEvents() { - for { - //To avoid writing old events, list events to get last resource version - events, err := w.kubeClient.CoreV1().ListEvents( - w.ctx, - w.namespace, - ) - - if err != nil { - //if listing fails try again after sometime - logp.Err("kubernetes: List API error %v", err) - // Sleep for a second to prevent API server from being bombarded - // API server could be down - time.Sleep(time.Second) - continue - } - - w.lastResourceVersion = events.Metadata.GetResourceVersion() - - logp.Info("kubernetes: %s", "Watching API for events") - watcher, err := w.kubeClient.CoreV1().WatchEvents( - w.ctx, - w.namespace, - k8s.ResourceVersion(w.lastResourceVersion), - ) - if err != nil { - //watch events failures should be logged and gracefully failed over as metadata retrieval - //should never stop. 
- logp.Err("kubernetes: Watching API eror %v", err) - // Sleep for a second to prevent API server from being bombarded - // API server could be down - time.Sleep(time.Second) - continue - } - - for { - _, eve, err := watcher.Next() - if err != nil { - logp.Err("kubernetes: Watching API error %v", err) - break - } - - event := w.getEventMeta(eve) - if event != nil { - w.eventQueue <- event - } - - } - } -} - -func (w *Watcher) Run() { - // Start watching on events - go w.watchEvents() -} - -func (w *Watcher) getEventMeta(pod *corev1.Event) *Event { - bytes, err := json.Marshal(pod) - if err != nil { - logp.Warn("Unable to marshal %v", pod.String()) - return nil - } - - eve := &Event{} - err = json.Unmarshal(bytes, eve) - if err != nil { - logp.Warn("Unable to marshal %v", pod.String()) - return nil - } - - return eve -} - -func (w *Watcher) Stop() { - w.stop() - close(w.eventQueue) -}
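For context on how the new shared watcher is meant to be consumed, a minimal sketch follows. It mirrors the wiring used by the autodiscover provider and the add_kubernetes_metadata processor after this patch, but it is illustrative only and not code from the patch: the in-cluster flag, the 10-second sync period, the empty namespace, and the log messages are example choices, and GetMetadata().Name assumes the existing ObjectMeta field. Only GetKubernetesClientset, DiscoverKubernetesNode, NewWatcher, AddEventHandler, Start, and Stop come from the code above.

package main

import (
	"time"

	"github.com/elastic/beats/libbeat/common/kubernetes"
	"github.com/elastic/beats/libbeat/logp"
)

func main() {
	// Build a client-go clientset. true = in-cluster config, so no kubeconfig path is needed.
	client, err := kubernetes.GetKubernetesClientset(true, "")
	if err != nil {
		logp.Err("kubernetes: could not build clientset: %v", err)
		return
	}

	// Resolve the node this process runs on: pod lookup when in cluster,
	// machine-id matching against node objects when outside the cluster.
	node := kubernetes.DiscoverKubernetesNode("", true, client)

	// Watch Pod resources on that node across all namespaces ("").
	// The 10-second sync period is an arbitrary example value.
	watcher, err := kubernetes.NewWatcher(client, 10*time.Second, node, "", &kubernetes.Pod{})
	if err != nil {
		logp.Err("kubernetes: could not create watcher: %v", err)
		return
	}

	// React to informer callbacks instead of polling bus listeners, as the
	// autodiscover provider and add_kubernetes_metadata processor now do.
	watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
		AddFunc: func(obj kubernetes.Resource) {
			logp.Info("pod started: %s", obj.GetMetadata().Name)
		},
		UpdateFunc: func(oldObj, newObj kubernetes.Resource) {
			logp.Info("pod updated: %s", newObj.GetMetadata().Name)
		},
		DeleteFunc: func(obj kubernetes.Resource) {
			logp.Info("pod stopped: %s", obj.GetMetadata().Name)
		},
	})

	// Start blocks until the informer cache has synced (or the sync period elapses).
	if err := watcher.Start(); err != nil {
		logp.Err("kubernetes: initial sync failed: %v", err)
		return
	}

	// Handle events for a while, then shut the informer down.
	time.Sleep(time.Minute)
	watcher.Stop()
}

The design point this illustrates: the client-go shared informer handles list/watch, resource-version tracking, and re-lists internally, which is why the old per-watcher pods/deleted maps, the cleanup worker, and the bus listeners could be removed, and why delete expiry now lives in the add_kubernetes_metadata cache with its own timeout.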