Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Discuss&POC] refactor k8s client to k8s.io/client-go #6117

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 40 additions & 60 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ func init() {

// Provider implements autodiscover provider for docker containers
type Provider struct {
config *Config
bus bus.Bus
watcher kubernetes.Watcher
metagen kubernetes.MetaGenerator
templates *template.Mapper
stop chan interface{}
startListener bus.Listener
stopListener bus.Listener
updateListener bus.Listener
config *Config
bus bus.Bus
watcher kubernetes.Watcher
metagen kubernetes.MetaGenerator
templates *template.Mapper
stop chan interface{}
}

// AutodiscoverBuilder builds and returns an autodiscover provider
Expand All @@ -39,71 +36,54 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider,
return nil, err
}

client, err := kubernetes.GetKubernetesClient(config.InCluster, config.KubeConfig)
client, err := kubernetes.GetKubernetesClientset(config.InCluster, config.KubeConfig)
if err != nil {
return nil, err
}

metagen := kubernetes.NewMetaGenerator(config.IncludeAnnotations, config.IncludeLabels, config.ExcludeLabels)

config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client)
watcher := kubernetes.NewWatcher(client.CoreV1(), config.SyncPeriod, config.CleanupTimeout, config.Host)
config.Host = kubernetes.DiscoverKubernetesNode(config.Host, config.InCluster, client)
watcher, err := kubernetes.NewWatcher(client, config.SyncPeriod, config.Host, config.Namespace, &kubernetes.Pod{})
if err != nil {
logp.Err("kubernetes: Couldn't create watcher for %t", &kubernetes.Pod{})
return nil, err
}

p := &Provider{
config: config,
bus: bus,
templates: mapper,
metagen: metagen,
watcher: watcher,
stop: make(chan interface{}),
}

start := watcher.ListenStart()
stop := watcher.ListenStop()
update := watcher.ListenUpdate()
watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj kubernetes.Resource) {
p.emit(obj.(*kubernetes.Pod), "start")
},
UpdateFunc: func(old, new kubernetes.Resource) {
p.emit(old.(*kubernetes.Pod), "stop")
p.emit(new.(*kubernetes.Pod), "start")
},
DeleteFunc: func(obj kubernetes.Resource) {
p.emit(obj.(*kubernetes.Pod), "stop")
},
})

if err := watcher.Start(); err != nil {
return nil, err
}

return &Provider{
config: config,
bus: bus,
templates: mapper,
metagen: metagen,
watcher: watcher,
stop: make(chan interface{}),
startListener: start,
stopListener: stop,
updateListener: update,
}, nil
return p, nil
}

// Start the autodiscover provider. Start and stop listeners work the
// conventional way. Update listener triggers a stop and then a start
// to simulate an update.
func (p *Provider) Start() {
go func() {
for {
select {
case <-p.stop:
p.startListener.Stop()
p.stopListener.Stop()
return

case event := <-p.startListener.Events():
p.emit(event, "start")

case event := <-p.stopListener.Events():
p.emit(event, "stop")

case event := <-p.updateListener.Events():
//On updates, first send a stop signal followed by a start signal to simulate a restart
p.emit(event, "stop")
p.emit(event, "start")
}
}
}()
}

func (p *Provider) emit(event bus.Event, flag string) {
pod, ok := event["pod"].(*kubernetes.Pod)
if !ok {
logp.Err("Couldn't get a pod from watcher event")
return
}
// Start for Runner interface.
// Provider was actually started in AutodiscoverBuilder.
func (p *Provider) Start() {}

func (p *Provider) emit(pod *kubernetes.Pod, flag string) {
host := pod.Status.PodIP

// Emit pod container IDs
Expand Down Expand Up @@ -171,7 +151,7 @@ func (p *Provider) publish(event bus.Event) {

// Stop signals the stop channel to force the watch loop routine to stop.
func (p *Provider) Stop() {
close(p.stop)
p.watcher.Stop()
}

// String returns a description of kubernetes autodiscover provider.
Expand Down
92 changes: 92 additions & 0 deletions libbeat/common/kubernetes/eventhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package kubernetes

// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
OnAdd(obj Resource)
OnUpdate(oldObj, newObj Resource)
OnDelete(obj Resource)
}

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj Resource)
UpdateFunc func(oldObj, newObj Resource)
DeleteFunc func(obj Resource)
}

// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj Resource) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
}

// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj Resource) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}

// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj Resource) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}

// FilteringResourceEventHandler applies the provided filter to all events coming
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
// object that stops passing the filter after an update is considered a delete.
type FilteringResourceEventHandler struct {
FilterFunc func(obj Resource) bool
Handler ResourceEventHandler
}

// OnAdd calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnAdd(obj Resource) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnAdd(obj)
}

// OnUpdate ensures the proper handler is called depending on whether the filter matches
func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj Resource) {
newer := r.FilterFunc(newObj)
older := r.FilterFunc(oldObj)
switch {
case newer && older:
r.Handler.OnUpdate(oldObj, newObj)
case newer && !older:
r.Handler.OnAdd(newObj)
case !newer && older:
r.Handler.OnDelete(oldObj)
default:
// do nothing
}
}

// OnDelete calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnDelete(obj Resource) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnDelete(obj)
}
53 changes: 33 additions & 20 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package kubernetes

import (
"encoding/json"
"strings"

"github.com/elastic/beats/libbeat/logp"

corev1 "github.com/ericchiang/k8s/api/v1"
"time"
)

type Resource interface {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type Resource should have comment or be unexported

GetMetadata() *ObjectMeta
}

type ObjectMeta struct {
Annotations map[string]string `json:"annotations"`
CreationTimestamp string `json:"creationTimestamp"`
Expand Down Expand Up @@ -108,6 +108,10 @@ type Pod struct {
Status PodStatus `json:"status"`
}

func (p *Pod) GetMetadata() *ObjectMeta {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Pod.GetMetadata should have comment or be unexported

return &p.Metadata
}

// GetContainerID parses the container ID to get the actual ID string
func (s *PodContainerStatus) GetContainerID() string {
cID := s.ContainerID
Expand All @@ -120,20 +124,29 @@ func (s *PodContainerStatus) GetContainerID() string {
return ""
}

// GetPod converts Pod to our own type
func GetPod(pod *corev1.Pod) *Pod {
bytes, err := json.Marshal(pod)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}

po := &Pod{}
err = json.Unmarshal(bytes, po)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}
type Event struct {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type Event should have comment or be unexported

APIVersion string `json:"apiVersion"`
Count int64 `json:"count"`
FirstTimestamp *time.Time `json:"firstTimestamp"`
InvolvedObject struct {
APIVersion string `json:"apiVersion"`
Kind string `json:"kind"`
Name string `json:"name"`
ResourceVersion string `json:"resourceVersion"`
UID string `json:"uid"`
} `json:"involvedObject"`
Kind string `json:"kind"`
LastTimestamp *time.Time `json:"lastTimestamp"`
Message string `json:"message"`
Metadata ObjectMeta `json:"metadata"`
Reason string `json:"reason"`
Source struct {
Component string `json:"component"`
Host string `json:"host"`
} `json:"source"`
Type string `json:"type"`
}

return po
func (e *Event) GetMetadata() *ObjectMeta {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Event.GetMetadata should have comment or be unexported

return &e.Metadata
}
Loading