Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kubernetes watcher refactor #6159

Merged
merged 3 commits into from
Jan 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ SoundCloud Ltd. (http://soundcloud.com/).

--------------------------------------------------------------------
Dependency: github.com/ericchiang/k8s
Revision: 5803ed75e31fc1998b5f781ac08e22ff985c3f8f
Version: v1.0.0
Revision: 5912993f00cb7c971aaa54529a06bd3eecd6c3d4
License type (autodetected): Apache-2.0
./vendor/github.com/ericchiang/k8s/LICENSE:
--------------------------------------------------------------------
Expand Down
97 changes: 41 additions & 56 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kubernetes

import (
"time"

"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -15,15 +17,11 @@ func init() {

// Provider implements autodiscover provider for docker containers
type Provider struct {
config *Config
bus bus.Bus
watcher kubernetes.Watcher
metagen kubernetes.MetaGenerator
templates *template.Mapper
stop chan interface{}
startListener bus.Listener
stopListener bus.Listener
updateListener bus.Listener
config *Config
bus bus.Bus
watcher kubernetes.Watcher
metagen kubernetes.MetaGenerator
templates *template.Mapper
}

// AutodiscoverBuilder builds and returns an autodiscover provider
Expand All @@ -47,63 +45,50 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider,
metagen := kubernetes.NewMetaGenerator(config.IncludeAnnotations, config.IncludeLabels, config.ExcludeLabels)

config.Host = kubernetes.DiscoverKubernetesNode(config.Host, config.InCluster, client)
watcher := kubernetes.NewWatcher(client.CoreV1(), config.SyncPeriod, config.CleanupTimeout, config.Host)

start := watcher.ListenStart()
stop := watcher.ListenStop()
update := watcher.ListenUpdate()
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Host,
Namespace: config.Namespace,
})
if err != nil {
logp.Err("kubernetes: Couldn't create watcher for %t", &kubernetes.Pod{})
return nil, err
}

p := &Provider{
config: config,
bus: bus,
templates: mapper,
metagen: metagen,
watcher: watcher,
}

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj kubernetes.Resource) {
p.emit(obj.(*kubernetes.Pod), "start")
},
UpdateFunc: func(obj kubernetes.Resource) {
p.emit(obj.(*kubernetes.Pod), "stop")
p.emit(obj.(*kubernetes.Pod), "start")
},
DeleteFunc: func(obj kubernetes.Resource) {
time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
},
})

if err := watcher.Start(); err != nil {
return nil, err
}

return &Provider{
config: config,
bus: bus,
templates: mapper,
metagen: metagen,
watcher: watcher,
stop: make(chan interface{}),
startListener: start,
stopListener: stop,
updateListener: update,
}, nil
return p, nil
}

// Start the autodiscover provider. Start and stop listeners work the
// conventional way. Update listener triggers a stop and then a start
// to simulate an update.
// Start for Runner interface.
func (p *Provider) Start() {
go func() {
for {
select {
case <-p.stop:
p.startListener.Stop()
p.stopListener.Stop()
return

case event := <-p.startListener.Events():
p.emit(event, "start")

case event := <-p.stopListener.Events():
p.emit(event, "stop")

case event := <-p.updateListener.Events():
//On updates, first send a stop signal followed by a start signal to simulate a restart
p.emit(event, "stop")
p.emit(event, "start")
}
}
}()
}

func (p *Provider) emit(event bus.Event, flag string) {
pod, ok := event["pod"].(*kubernetes.Pod)
if !ok {
logp.Err("Couldn't get a pod from watcher event")
return
}

func (p *Provider) emit(pod *kubernetes.Pod, flag string) {
host := pod.Status.PodIP

// Emit pod container IDs
Expand Down Expand Up @@ -171,7 +156,7 @@ func (p *Provider) publish(event bus.Event) {

// Stop signals the stop channel to force the watch loop routine to stop.
func (p *Provider) Stop() {
close(p.stop)
p.watcher.Stop()
}

// String returns a description of kubernetes autodiscover provider.
Expand Down
84 changes: 84 additions & 0 deletions libbeat/common/kubernetes/eventhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package kubernetes

// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
OnAdd(obj Resource)
OnUpdate(obj Resource)
OnDelete(obj Resource)
}

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj Resource)
UpdateFunc func(obj Resource)
DeleteFunc func(obj Resource)
}

// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj Resource) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
}

// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(obj Resource) {
if r.UpdateFunc != nil {
r.UpdateFunc(obj)
}
}

// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj Resource) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}

// FilteringResourceEventHandler applies the provided filter to all events coming
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
// object that stops passing the filter after an update is considered a delete.
type FilteringResourceEventHandler struct {
FilterFunc func(obj Resource) bool
Handler ResourceEventHandler
}

// OnAdd calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnAdd(obj Resource) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnAdd(obj)
}

// OnUpdate ensures the proper handler is called depending on whether the filter matches
func (r FilteringResourceEventHandler) OnUpdate(obj Resource) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnUpdate(obj)
}

// OnDelete calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnDelete(obj Resource) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnDelete(obj)
}
69 changes: 51 additions & 18 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,29 @@ package kubernetes
import (
"encoding/json"
"strings"
"time"

"github.com/elastic/beats/libbeat/logp"

corev1 "github.com/ericchiang/k8s/api/v1"
"github.com/ericchiang/k8s"
"github.com/ericchiang/k8s/apis/core/v1"
)

func init() {
k8s.Register("", "v1", "events", true, &v1.Event{})
k8s.RegisterList("", "v1", "events", true, &v1.EventList{})
}

// Resource is kind of kubernetes resource like pod, event, etc...
// It has a GetMetadata method for getting ObjectMeta which containing useful info like labels
type Resource interface {
GetMetadata() *ObjectMeta
}

func resourceConverter(k8sObj k8s.Resource, r Resource) Resource {
bytes, _ := json.Marshal(k8sObj)
json.Unmarshal(bytes, r)
return r
}

type ObjectMeta struct {
Annotations map[string]string `json:"annotations"`
CreationTimestamp string `json:"creationTimestamp"`
Expand Down Expand Up @@ -108,6 +125,11 @@ type Pod struct {
Status PodStatus `json:"status"`
}

// GetMetadata implements Resource
func (p *Pod) GetMetadata() *ObjectMeta {
return &p.Metadata
}

// GetContainerID parses the container ID to get the actual ID string
func (s *PodContainerStatus) GetContainerID() string {
cID := s.ContainerID
Expand All @@ -120,20 +142,31 @@ func (s *PodContainerStatus) GetContainerID() string {
return ""
}

// GetPod converts Pod to our own type
func GetPod(pod *corev1.Pod) *Pod {
bytes, err := json.Marshal(pod)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}

po := &Pod{}
err = json.Unmarshal(bytes, po)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}
// Event is kubernetes event
type Event struct {
APIVersion string `json:"apiVersion"`
Count int64 `json:"count"`
FirstTimestamp *time.Time `json:"firstTimestamp"`
InvolvedObject struct {
APIVersion string `json:"apiVersion"`
Kind string `json:"kind"`
Name string `json:"name"`
ResourceVersion string `json:"resourceVersion"`
UID string `json:"uid"`
} `json:"involvedObject"`
Kind string `json:"kind"`
LastTimestamp *time.Time `json:"lastTimestamp"`
Message string `json:"message"`
Metadata ObjectMeta `json:"metadata"`
Reason string `json:"reason"`
Source struct {
Component string `json:"component"`
Host string `json:"host"`
} `json:"source"`
Type string `json:"type"`
}

return po
// GetMetadata implements Resource
func (e *Event) GetMetadata() *ObjectMeta {
return &e.Metadata
}
7 changes: 5 additions & 2 deletions libbeat/common/kubernetes/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/ericchiang/k8s"
"github.com/ericchiang/k8s/apis/core/v1"
"github.com/ghodss/yaml"

"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -66,7 +67,8 @@ func DiscoverKubernetesNode(host string, inCluster bool, client *k8s.Client) (no
return defaultNode
}
logp.Info("kubernetes: Using pod name %s and namespace %s to discover kubernetes node", podName, ns)
pod, err := client.CoreV1().GetPod(context.TODO(), podName, ns)
pod := v1.Pod{}
err = client.Get(context.TODO(), ns, podName, &pod)
if err != nil {
logp.Err("kubernetes: Querying for pod failed with error: ", err.Error())
return defaultNode
Expand All @@ -81,7 +83,8 @@ func DiscoverKubernetesNode(host string, inCluster bool, client *k8s.Client) (no
return defaultNode
}

nodes, err := client.CoreV1().ListNodes(context.TODO())
nodes := v1.NodeList{}
err := client.List(context.TODO(), k8s.AllNamespaces, &nodes)
if err != nil {
logp.Err("kubernetes: Querying for nodes failed with error: ", err.Error())
return defaultNode
Expand Down
Loading