Skip to content

Commit

Permalink
Check if k8s resources are supported in runReflectorUntil
Browse files Browse the repository at this point in the history
`isResourceSupported` checks whether a kubernetes resource is supported by the api server.
This ensures that, if the probe is unable to communicate with the api server, the call is retried until a true/false response.

If `isResourceSupported` returns false, `ListAndWatch` is not called and `runReflectorUntil` just exits.
  • Loading branch information
Roberto Bruggemann committed Jan 22, 2018
1 parent ea0b549 commit fd31b15
Showing 1 changed file with 50 additions and 41 deletions.
91 changes: 50 additions & 41 deletions probe/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
log "github.com/Sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
apiv1 "k8s.io/client-go/pkg/api/v1"
apiappsv1beta1 "k8s.io/client-go/pkg/apis/apps/v1beta1"
Expand Down Expand Up @@ -61,23 +63,6 @@ type client struct {
podWatches []func(Event, Pod)
}

// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop.
// Errors are logged and retried with exponential backoff.
func runReflectorUntil(r *cache.Reflector, stopCh <-chan struct{}, msg string) {
listAndWatch := func() (bool, error) {
select {
case <-stopCh:
return true, nil
default:
err := r.ListAndWatch(stopCh)
return false, err
}
}
bo := backoff.New(listAndWatch, fmt.Sprintf("Kubernetes reflector (%s)", msg))
bo.SetMaxBackoff(5 * time.Minute)
go bo.Start()
}

// ClientConfig establishes the configuration for the kubernetes client
type ClientConfig struct {
Interval time.Duration
Expand Down Expand Up @@ -134,7 +119,6 @@ func NewClient(config ClientConfig) (Client, error) {
if err != nil {
return nil, err
}

}
log.Infof("kubernetes: targeting api server %s", restConfig.Host)

Expand All @@ -151,46 +135,71 @@ func NewClient(config ClientConfig) (Client, error) {

podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.podStore = result.setupStore(c.CoreV1Client.RESTClient(), "pods", &apiv1.Pod{}, podStore)

result.serviceStore = result.setupStore(c.CoreV1Client.RESTClient(), "services", &apiv1.Service{}, nil)
result.nodeStore = result.setupStore(c.CoreV1Client.RESTClient(), "nodes", &apiv1.Node{}, nil)
result.namespaceStore = result.setupStore(c.CoreV1Client.RESTClient(), "namespaces", &apiv1.Namespace{}, nil)
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
result.jobStore = result.setupStore(c.BatchV1Client.RESTClient(), "jobs", &apibatchv1.Job{}, nil)
result.cronJobStore = result.setupStore(c.BatchV2alpha1Client.RESTClient(), "cronjobs", &apibatchv2alpha1.CronJob{}, nil)
result.statefulSetStore = result.setupStore(c.AppsV1beta1Client.RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)

// We list deployments here to check if this version of kubernetes is >= 1.2.
// We would use NegotiateVersion, but Kubernetes 1.1 "supports"
// extensions/v1beta1, but not deployments or daemonsets.
if _, err := c.Extensions().Deployments(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("Deployments and DaemonSets are not supported by this Kubernetes version: %v", err)
} else {
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
}
// CronJobs and StatefulSets were introduced later. Easiest to use the same technique.
if _, err := c.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("CronJobs are not supported by this Kubernetes version: %v", err)
} else {
result.jobStore = result.setupStore(c.BatchV1Client.RESTClient(), "jobs", &apibatchv1.Job{}, nil)
result.cronJobStore = result.setupStore(c.BatchV2alpha1Client.RESTClient(), "cronjobs", &apibatchv2alpha1.CronJob{}, nil)
return result, nil
}

func (c *client) isResourceSupported(groupVersion schema.GroupVersion, resource string) (bool, error) {
resourceList, err := c.client.Discovery().ServerResourcesForGroupVersion(groupVersion.String())
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
if _, err := c.Apps().StatefulSets(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("StatefulSets are not supported by this Kubernetes version: %v", err)
} else {
result.statefulSetStore = result.setupStore(c.AppsV1beta1Client.RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)

for _, v := range resourceList.APIResources {
if v.Name == resource {
return true, nil
}
}

return result, nil
return false, nil
}

func (c *client) setupStore(kclient cache.Getter, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store {
func (c *client) setupStore(kclient rest.Interface, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store {
lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
store := nonDefaultStore
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
}
runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), c.quit, resource)
c.runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), kclient.APIVersion(), resource)
return store
}

// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop, after checking that the resource is supported by kubernetes.
// Errors are logged and retried with exponential backoff.
func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.GroupVersion, resource string) {
listAndWatch := func() (bool, error) {
select {
case <-c.quit:
return true, nil
default:
ok, err := c.isResourceSupported(groupVersion, resource)
if err != nil {
return false, err
}
if !ok {
log.Infof("%v are not supported by this Kubernetes version", resource)
return true, nil
}
err = r.ListAndWatch(c.quit)
return false, err
}
}
bo := backoff.New(listAndWatch, fmt.Sprintf("Kubernetes reflector (%s)", resource))
bo.SetMaxBackoff(5 * time.Minute)
go bo.Start()
}

func (c *client) WatchPods(f func(Event, Pod)) {
c.podWatchesMutex.Lock()
defer c.podWatchesMutex.Unlock()
Expand Down

0 comments on commit fd31b15

Please sign in to comment.