From 5799039972a5adf2ed66b07b72fb6643967c7f56 Mon Sep 17 00:00:00 2001 From: Roberto Bruggemann Date: Fri, 19 Jan 2018 16:44:19 +0000 Subject: [PATCH] Check if k8s resources are supported in `runReflectorUntil` `isResourceSupported` checks whether a kubernetes resource is supported by the api server. This ensures that, if the probe is unable to communicate with the api server, the call is retried until a true/false response. If `isResourceSupported` returns false, `ListAndWatch` is not called and `runReflectorUntil` just exits. --- probe/kubernetes/client.go | 103 +++++++++++++++++++++++-------------- 1 file changed, 64 insertions(+), 39 deletions(-) diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index 306e5f537b..f9ffc7cd81 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -24,6 +24,8 @@ import ( clientcmdapi "k8s.io/client-go/tools/clientcmd/api" ) +const errResourceNotFound = "the server could not find the requested resource" + // Client keeps track of running kubernetes pods and services type Client interface { Stop() @@ -61,23 +63,6 @@ type client struct { podWatches []func(Event, Pod) } -// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop. -// Errors are logged and retried with exponential backoff. -func runReflectorUntil(r *cache.Reflector, stopCh <-chan struct{}, msg string) { - listAndWatch := func() (bool, error) { - select { - case <-stopCh: - return true, nil - default: - err := r.ListAndWatch(stopCh) - return false, err - } - } - bo := backoff.New(listAndWatch, fmt.Sprintf("Kubernetes reflector (%s)", msg)) - bo.SetMaxBackoff(5 * time.Minute) - go bo.Start() -} - // ClientConfig establishes the configuration for the kubernetes client type ClientConfig struct { Interval time.Duration @@ -134,7 +119,6 @@ func NewClient(config ClientConfig) (Client, error) { if err != nil { return nil, err } - } log.Infof("kubernetes: targeting api server %s", restConfig.Host) @@ -151,34 +135,50 @@ func NewClient(config ClientConfig) (Client, error) { podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc) result.podStore = result.setupStore(c.CoreV1Client.RESTClient(), "pods", &apiv1.Pod{}, podStore) - result.serviceStore = result.setupStore(c.CoreV1Client.RESTClient(), "services", &apiv1.Service{}, nil) result.nodeStore = result.setupStore(c.CoreV1Client.RESTClient(), "nodes", &apiv1.Node{}, nil) result.namespaceStore = result.setupStore(c.CoreV1Client.RESTClient(), "namespaces", &apiv1.Namespace{}, nil) + result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil) + result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil) + result.jobStore = result.setupStore(c.BatchV1Client.RESTClient(), "jobs", &apibatchv1.Job{}, nil) + result.cronJobStore = result.setupStore(c.BatchV2alpha1Client.RESTClient(), "cronjobs", &apibatchv2alpha1.CronJob{}, nil) + result.statefulSetStore = result.setupStore(c.AppsV1beta1Client.RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil) - // We list deployments here to check if this version of kubernetes is >= 1.2. - // We would use NegotiateVersion, but Kubernetes 1.1 "supports" - // extensions/v1beta1, but not deployments or daemonsets. - if _, err := c.Extensions().Deployments(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil { - log.Infof("Deployments and DaemonSets are not supported by this Kubernetes version: %v", err) - } else { - result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil) - result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil) + return result, nil +} + +func (c *client) isResourceSupported(resource string) (bool, error) { + var group string + switch resource { + case "deployments", "daemonsets": + group = "extensions/v1beta1" + case "cronjobs": + group = "batch/v2alpha1" + case "statefulsets": + group = "apps/v1beta1" + case "pods", "services", "nodes", "namespaces": + group = "v1" + case "jobs": + group = "batch/v1" + default: + return false, fmt.Errorf("Support check not available for resource %v", resource) } - // CronJobs and StatefulSets were introduced later. Easiest to use the same technique. - if _, err := c.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil { - log.Infof("CronJobs are not supported by this Kubernetes version: %v", err) - } else { - result.jobStore = result.setupStore(c.BatchV1Client.RESTClient(), "jobs", &apibatchv1.Job{}, nil) - result.cronJobStore = result.setupStore(c.BatchV2alpha1Client.RESTClient(), "cronjobs", &apibatchv2alpha1.CronJob{}, nil) + + resourceList, err := c.client.Discovery().ServerResourcesForGroupVersion(group) + if err != nil { + if err.Error() == errResourceNotFound { + return false, nil + } + return false, err } - if _, err := c.Apps().StatefulSets(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil { - log.Infof("StatefulSets are not supported by this Kubernetes version: %v", err) - } else { - result.statefulSetStore = result.setupStore(c.AppsV1beta1Client.RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil) + + for _, v := range resourceList.APIResources { + if v.Name == resource { + return true, nil + } } - return result, nil + return false, nil } func (c *client) setupStore(kclient cache.Getter, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store { @@ -187,10 +187,35 @@ func (c *client) setupStore(kclient cache.Getter, resource string, itemType inte if store == nil { store = cache.NewStore(cache.MetaNamespaceKeyFunc) } - runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), c.quit, resource) + c.runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), resource) return store } +// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop. +// Errors are logged and retried with exponential backoff. +func (c *client) runReflectorUntil(r *cache.Reflector, resource string) { + listAndWatch := func() (bool, error) { + select { + case <-c.quit: + return true, nil + default: + ok, err := c.isResourceSupported(resource) + if err != nil { + return false, err + } + if !ok { + log.Infof("%v are not supported by this Kubernetes version", resource) + return true, nil + } + err = r.ListAndWatch(c.quit) + return false, err + } + } + bo := backoff.New(listAndWatch, fmt.Sprintf("Kubernetes reflector (%s)", resource)) + bo.SetMaxBackoff(5 * time.Minute) + go bo.Start() +} + func (c *client) WatchPods(f func(Event, Pod)) { c.podWatchesMutex.Lock() defer c.podWatchesMutex.Unlock()