Skip to content

Commit

Permalink
Check if k8s resources are supported in runReflectorUntil
Browse files Browse the repository at this point in the history
`isResourceSupported` checks whether a kubernetes resource is supported by the api server.
This ensures that, if the probe is unable to communicate with the api server, the call is retried until a true/false response.

If `isResourceSupported` returns false, `ListAndWatch` is not called and `runReflectorUntil` just exits.
  • Loading branch information
Roberto Bruggemann committed Jan 19, 2018
1 parent ea0b549 commit 5799039
Showing 1 changed file with 64 additions and 39 deletions.
103 changes: 64 additions & 39 deletions probe/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

const errResourceNotFound = "the server could not find the requested resource"

// Client keeps track of running kubernetes pods and services
type Client interface {
Stop()
Expand Down Expand Up @@ -61,23 +63,6 @@ type client struct {
podWatches []func(Event, Pod)
}

// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop.
// Errors are logged and retried with exponential backoff.
func runReflectorUntil(r *cache.Reflector, stopCh <-chan struct{}, msg string) {
listAndWatch := func() (bool, error) {
select {
case <-stopCh:
return true, nil
default:
err := r.ListAndWatch(stopCh)
return false, err
}
}
bo := backoff.New(listAndWatch, fmt.Sprintf("Kubernetes reflector (%s)", msg))
bo.SetMaxBackoff(5 * time.Minute)
go bo.Start()
}

// ClientConfig establishes the configuration for the kubernetes client
type ClientConfig struct {
Interval time.Duration
Expand Down Expand Up @@ -134,7 +119,6 @@ func NewClient(config ClientConfig) (Client, error) {
if err != nil {
return nil, err
}

}
log.Infof("kubernetes: targeting api server %s", restConfig.Host)

Expand All @@ -151,34 +135,50 @@ func NewClient(config ClientConfig) (Client, error) {

podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.podStore = result.setupStore(c.CoreV1Client.RESTClient(), "pods", &apiv1.Pod{}, podStore)

result.serviceStore = result.setupStore(c.CoreV1Client.RESTClient(), "services", &apiv1.Service{}, nil)
result.nodeStore = result.setupStore(c.CoreV1Client.RESTClient(), "nodes", &apiv1.Node{}, nil)
result.namespaceStore = result.setupStore(c.CoreV1Client.RESTClient(), "namespaces", &apiv1.Namespace{}, nil)
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
result.jobStore = result.setupStore(c.BatchV1Client.RESTClient(), "jobs", &apibatchv1.Job{}, nil)
result.cronJobStore = result.setupStore(c.BatchV2alpha1Client.RESTClient(), "cronjobs", &apibatchv2alpha1.CronJob{}, nil)
result.statefulSetStore = result.setupStore(c.AppsV1beta1Client.RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)

// We list deployments here to check if this version of kubernetes is >= 1.2.
// We would use NegotiateVersion, but Kubernetes 1.1 "supports"
// extensions/v1beta1, but not deployments or daemonsets.
if _, err := c.Extensions().Deployments(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("Deployments and DaemonSets are not supported by this Kubernetes version: %v", err)
} else {
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
return result, nil
}

func (c *client) isResourceSupported(resource string) (bool, error) {
var group string
switch resource {
case "deployments", "daemonsets":
group = "extensions/v1beta1"
case "cronjobs":
group = "batch/v2alpha1"
case "statefulsets":
group = "apps/v1beta1"
case "pods", "services", "nodes", "namespaces":
group = "v1"
case "jobs":
group = "batch/v1"
default:
return false, fmt.Errorf("Support check not available for resource %v", resource)
}
// CronJobs and StatefulSets were introduced later. Easiest to use the same technique.
if _, err := c.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("CronJobs are not supported by this Kubernetes version: %v", err)
} else {
result.jobStore = result.setupStore(c.BatchV1Client.RESTClient(), "jobs", &apibatchv1.Job{}, nil)
result.cronJobStore = result.setupStore(c.BatchV2alpha1Client.RESTClient(), "cronjobs", &apibatchv2alpha1.CronJob{}, nil)

resourceList, err := c.client.Discovery().ServerResourcesForGroupVersion(group)
if err != nil {
if err.Error() == errResourceNotFound {
return false, nil
}
return false, err
}
if _, err := c.Apps().StatefulSets(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("StatefulSets are not supported by this Kubernetes version: %v", err)
} else {
result.statefulSetStore = result.setupStore(c.AppsV1beta1Client.RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)

for _, v := range resourceList.APIResources {
if v.Name == resource {
return true, nil
}
}

return result, nil
return false, nil
}

func (c *client) setupStore(kclient cache.Getter, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store {
Expand All @@ -187,10 +187,35 @@ func (c *client) setupStore(kclient cache.Getter, resource string, itemType inte
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
}
runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), c.quit, resource)
c.runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), resource)
return store
}

// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop.
// Errors are logged and retried with exponential backoff.
func (c *client) runReflectorUntil(r *cache.Reflector, resource string) {
listAndWatch := func() (bool, error) {
select {
case <-c.quit:
return true, nil
default:
ok, err := c.isResourceSupported(resource)
if err != nil {
return false, err
}
if !ok {
log.Infof("%v are not supported by this Kubernetes version", resource)
return true, nil
}
err = r.ListAndWatch(c.quit)
return false, err
}
}
bo := backoff.New(listAndWatch, fmt.Sprintf("Kubernetes reflector (%s)", resource))
bo.SetMaxBackoff(5 * time.Minute)
go bo.Start()
}

func (c *client) WatchPods(f func(Event, Pod)) {
c.podWatchesMutex.Lock()
defer c.podWatchesMutex.Unlock()
Expand Down

0 comments on commit 5799039

Please sign in to comment.