diff --git a/cluster-autoscaler/utils/drain/drain_test.go b/cluster-autoscaler/utils/drain/drain_test.go index f6e5de7a7638..f7edcc83bc39 100644 --- a/cluster-autoscaler/utils/drain/drain_test.go +++ b/cluster-autoscaler/utils/drain/drain_test.go @@ -25,6 +25,7 @@ import ( apiv1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" v1appslister "k8s.io/client-go/listers/apps/v1" @@ -644,7 +645,18 @@ func TestDrain(t *testing.T) { ssLister, err := kube_util.NewTestStatefulSetLister([]*appsv1.StatefulSet{&statefulset}) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister) + stopCh := make(<-chan struct{}) + allResources := []runtime.Object{&ds, &job, &statefulset} + for _, rc := range test.rcs { + allResources = append(allResources, rc) + } + for _, replicaset := range test.replicaSets { + allResources = append(allResources, replicaset) + } + + genericListerFactory := kube_util.NewTestGenericListerFactory(stopCh, allResources...) + + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister, genericListerFactory) pods, daemonSetPods, blockingPod, err := GetPodsForDeletionOnNodeDrain(test.pods, test.pdbs, true, true, registry, 0, testTime) diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index 2bfb3467f129..cce2149c1209 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -62,7 +62,6 @@ type ListerRegistry interface { // GenericListerFactory is a factory for creating // listers for a new GVRs identified during runtime - type GenericListerFactory interface { GetLister(gvr schema.GroupVersionResource, namespace string) dynamiclister.Lister } @@ -89,63 +88,7 @@ type listerRegistryImpl struct { // GetLister returns the lister for a particular GVR func (g *genericListerFactoryImpl) GetLister(gvr schema.GroupVersionResource, namespace string) dynamiclister.Lister { - key := fmt.Sprintf("%s_%s_%s_%s", gvr.Group, gvr.Version, gvr.Resource, namespace) - if g.listersMap[key] != nil { - return g.listersMap[key] - } - - var lister func(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) - var watcher func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) - - if namespace == apiv1.NamespaceAll { - lister = g.dynamicClient.Resource(gvr).List - watcher = g.dynamicClient.Resource(gvr).Watch - } else { - // For lister limited to a particular namespace - lister = g.dynamicClient.Resource(gvr).Namespace(namespace).List - watcher = g.dynamicClient.Resource(gvr).Namespace(namespace).Watch - } - - // NewNamespaceKeyedIndexerAndReflector can be - // used for both namespace and cluster scoped resources - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(&cache.ListWatch{ - ListFunc: func(options v1.ListOptions) (runtime.Object, error) { - return lister(context.Background(), options) - }, - WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { - return watcher(context.Background(), options) - }, - }, unstructured.Unstructured{}, time.Hour) - l := dynamiclister.New(store, gvr) - - // Run reflector in the background so that we get new updates from the api-server - go reflector.Run(g.stopCh) - - // Wait for reflector to sync the cache for the first time - // TODO: check if there's a better way to do this (listing all the nodes seems wasteful) - // Note: Based on the docs WaitForNamedCacheSync seems to be used to check if an informer has synced - // but the function is generic enough so we can use - // it for reflectors as well - synced := cache.WaitForNamedCacheSync(fmt.Sprintf("generic-%s-lister", gvr.Resource), g.stopCh, func() bool { - no, err := l.List(labels.Everything()) - if err != nil { - klog.Error("err", err) - } - return len(no) > 0 - }) - if !synced { - // don't return an error but don't add - // this lister to listers map - // so that another attempt is made - // to create the lister and sync cache - klog.Error("couldn't sync cache") - } else { - // make the lister available in the listers map through GetDynamicListerMap (maps are passed by reference) - // for the next time something requests a lister for the same GVR and namespace - g.listersMap[key] = l - } - - return l + return NewGenericLister(g.dynamicClient, g.listersMap, g.stopCh, gvr, namespace) } // NewListerRegistry returns a registry providing various listers to list pods or nodes matching conditions @@ -464,12 +407,73 @@ func NewConfigMapListerForNamespace(kubeClient client.Interface, stopchannel <-c return lister } -// NewGenericListerFactory returns the factory to -// create lister for a particular GVR -func NewGenericListerFactory(dClient *dynamic.DynamicClient, stopchannel <-chan struct{}) GenericListerFactory { +// NewGenericListerFactory initializes a new generic lister factory +func NewGenericListerFactory(dynamicClient *dynamic.DynamicClient, stopCh <-chan struct{}) GenericListerFactory { return &genericListerFactoryImpl{ - stopCh: stopchannel, + dynamicClient: dynamicClient, + stopCh: stopCh, listersMap: make(map[string]dynamiclister.Lister), - dynamicClient: dClient, } } + +// NewGenericLister is a helper which returns a generic lister given the right gvr and namespace +// This is meant to be a more generic version of GetLister() +func NewGenericLister(dClient dynamic.Interface, listersMap map[string]dynamiclister.Lister, stopCh <-chan struct{}, gvr schema.GroupVersionResource, namespace string) dynamiclister.Lister { + key := fmt.Sprintf("%s_%s_%s_%s", gvr.Group, gvr.Version, gvr.Resource, namespace) + if listersMap[key] != nil { + return listersMap[key] + } + + var lister func(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) + var watcher func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) + + if namespace == apiv1.NamespaceAll { + lister = dClient.Resource(gvr).List + watcher = dClient.Resource(gvr).Watch + } else { + // For lister limited to a particular namespace + lister = dClient.Resource(gvr).Namespace(namespace).List + watcher = dClient.Resource(gvr).Namespace(namespace).Watch + } + + // NewNamespaceKeyedIndexerAndReflector can be + // used for both namespace and cluster scoped resources + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(&cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + return lister(context.Background(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + return watcher(context.Background(), options) + }, + }, unstructured.Unstructured{}, time.Hour) + l := dynamiclister.New(store, gvr) + + // Run reflector in the background so that we get new updates from the api-server + go reflector.Run(stopCh) + + // Wait for reflector to sync the cache for the first time + // TODO: check if there's a better way to do this (listing all the nodes seems wasteful) + // Note: Based on the docs WaitForNamedCacheSync seems to be used to check if an informer has synced + // but the function is generic enough so we can use + // it for reflectors as well + synced := cache.WaitForNamedCacheSync(fmt.Sprintf("generic-%s-lister", gvr.Resource), stopCh, func() bool { + no, err := l.List(labels.Everything()) + if err != nil { + klog.Error("err", err) + } + return len(no) > 0 + }) + if !synced { + // don't return an error but don't add + // this lister to listers map + // so that another attempt is made + // to create the lister and sync cache + klog.Error("couldn't sync cache") + } else { + // make the lister available in the listers map through GetDynamicListerMap (maps are passed by reference) + // for the next time something requests a lister for the same GVR and namespace + listersMap[key] = l + } + + return l +} diff --git a/cluster-autoscaler/utils/kubernetes/testlisters.go b/cluster-autoscaler/utils/kubernetes/testlisters.go index 571298484a6c..44c5d878bca2 100644 --- a/cluster-autoscaler/utils/kubernetes/testlisters.go +++ b/cluster-autoscaler/utils/kubernetes/testlisters.go @@ -23,6 +23,10 @@ import ( batchv1 "k8s.io/api/batch/v1" apiv1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + dynamiclister "k8s.io/client-go/dynamic/dynamiclister" + fakedynamic "k8s.io/client-go/dynamic/fake" v1appslister "k8s.io/client-go/listers/apps/v1" v1batchlister "k8s.io/client-go/listers/batch/v1" v1lister "k8s.io/client-go/listers/core/v1" @@ -34,6 +38,13 @@ type TestPodLister struct { pods []*apiv1.Pod } +// TestGenericListerFactory is used in tests involving generic listers +type TestGenericListerFactory struct { + dynamicClient *fakedynamic.FakeDynamicClient + stopCh <-chan struct{} + listersMap map[string]dynamiclister.Lister +} + // List returns all pods in test lister. func (lister TestPodLister) List() ([]*apiv1.Pod, error) { return lister.pods, nil @@ -160,3 +171,26 @@ func NewTestConfigMapLister(cms []*apiv1.ConfigMap) (v1lister.ConfigMapLister, e } return v1lister.NewConfigMapLister(store), nil } + +// NewTestGenericListerFactory returns a fake generic lister factory for generic listers +// for use in tests +func NewTestGenericListerFactory(stopCh <-chan struct{}, objects ...runtime.Object) GenericListerFactory { + scheme := runtime.NewScheme() + for _, obj := range objects { + + scheme.AddKnownTypeWithName(obj.GetObjectKind().GroupVersionKind(), obj) + } + + fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme, objects...) + + return &TestGenericListerFactory{ + dynamicClient: fakeDynamicClient, + stopCh: stopCh, + listersMap: map[string]dynamiclister.Lister{}, + } +} + +// GetLister returns the lister for a particular GVR +func (g *TestGenericListerFactory) GetLister(gvr schema.GroupVersionResource, namespace string) dynamiclister.Lister { + return NewGenericLister(g.dynamicClient, g.listersMap, g.stopCh, gvr, namespace) +}