Skip to content

Commit

Permalink
feat: add fake dynamic listers
Browse files Browse the repository at this point in the history
- in drain test for starters
- the test is still failing because the scheme is empty
- and type meta for the objects is empty
- because of which scheme can't be set in the code either
Signed-off-by: vadasambar <[email protected]>
  • Loading branch information
vadasambar committed Jan 19, 2023
1 parent 8f7d8dc commit f8438b1
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 64 deletions.
14 changes: 13 additions & 1 deletion cluster-autoscaler/utils/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
v1appslister "k8s.io/client-go/listers/apps/v1"
Expand Down Expand Up @@ -644,7 +645,18 @@ func TestDrain(t *testing.T) {
ssLister, err := kube_util.NewTestStatefulSetLister([]*appsv1.StatefulSet{&statefulset})
assert.NoError(t, err)

registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister)
stopCh := make(<-chan struct{})
allResources := []runtime.Object{&ds, &job, &statefulset}
for _, rc := range test.rcs {
allResources = append(allResources, rc)
}
for _, replicaset := range test.replicaSets {
allResources = append(allResources, replicaset)
}

genericListerFactory := kube_util.NewTestGenericListerFactory(stopCh, allResources...)

registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister, genericListerFactory)

pods, daemonSetPods, blockingPod, err := GetPodsForDeletionOnNodeDrain(test.pods, test.pdbs, true, true, registry, 0, testTime)

Expand Down
130 changes: 67 additions & 63 deletions cluster-autoscaler/utils/kubernetes/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ type ListerRegistry interface {

// GenericListerFactory is a factory for creating
// listers for a new GVRs identified during runtime

type GenericListerFactory interface {
GetLister(gvr schema.GroupVersionResource, namespace string) dynamiclister.Lister
}
Expand All @@ -89,63 +88,7 @@ type listerRegistryImpl struct {

// GetLister returns the lister for a particular GVR
func (g *genericListerFactoryImpl) GetLister(gvr schema.GroupVersionResource, namespace string) dynamiclister.Lister {
key := fmt.Sprintf("%s_%s_%s_%s", gvr.Group, gvr.Version, gvr.Resource, namespace)
if g.listersMap[key] != nil {
return g.listersMap[key]
}

var lister func(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
var watcher func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)

if namespace == apiv1.NamespaceAll {
lister = g.dynamicClient.Resource(gvr).List
watcher = g.dynamicClient.Resource(gvr).Watch
} else {
// For lister limited to a particular namespace
lister = g.dynamicClient.Resource(gvr).Namespace(namespace).List
watcher = g.dynamicClient.Resource(gvr).Namespace(namespace).Watch
}

// NewNamespaceKeyedIndexerAndReflector can be
// used for both namespace and cluster scoped resources
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return lister(context.Background(), options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return watcher(context.Background(), options)
},
}, unstructured.Unstructured{}, time.Hour)
l := dynamiclister.New(store, gvr)

// Run reflector in the background so that we get new updates from the api-server
go reflector.Run(g.stopCh)

// Wait for reflector to sync the cache for the first time
// TODO: check if there's a better way to do this (listing all the nodes seems wasteful)
// Note: Based on the docs WaitForNamedCacheSync seems to be used to check if an informer has synced
// but the function is generic enough so we can use
// it for reflectors as well
synced := cache.WaitForNamedCacheSync(fmt.Sprintf("generic-%s-lister", gvr.Resource), g.stopCh, func() bool {
no, err := l.List(labels.Everything())
if err != nil {
klog.Error("err", err)
}
return len(no) > 0
})
if !synced {
// don't return an error but don't add
// this lister to listers map
// so that another attempt is made
// to create the lister and sync cache
klog.Error("couldn't sync cache")
} else {
// make the lister available in the listers map through GetDynamicListerMap (maps are passed by reference)
// for the next time something requests a lister for the same GVR and namespace
g.listersMap[key] = l
}

return l
return NewGenericLister(g.dynamicClient, g.listersMap, g.stopCh, gvr, namespace)
}

// NewListerRegistry returns a registry providing various listers to list pods or nodes matching conditions
Expand Down Expand Up @@ -464,12 +407,73 @@ func NewConfigMapListerForNamespace(kubeClient client.Interface, stopchannel <-c
return lister
}

// NewGenericListerFactory returns the factory to
// create lister for a particular GVR
func NewGenericListerFactory(dClient *dynamic.DynamicClient, stopchannel <-chan struct{}) GenericListerFactory {
// NewGenericListerFactory initializes a new generic lister factory
func NewGenericListerFactory(dynamicClient *dynamic.DynamicClient, stopCh <-chan struct{}) GenericListerFactory {
return &genericListerFactoryImpl{
stopCh: stopchannel,
dynamicClient: dynamicClient,
stopCh: stopCh,
listersMap: make(map[string]dynamiclister.Lister),
dynamicClient: dClient,
}
}

// NewGenericLister is a helper which returns a generic lister given the right gvr and namespace
// This is meant to be a more generic version of GetLister()
func NewGenericLister(dClient dynamic.Interface, listersMap map[string]dynamiclister.Lister, stopCh <-chan struct{}, gvr schema.GroupVersionResource, namespace string) dynamiclister.Lister {
key := fmt.Sprintf("%s_%s_%s_%s", gvr.Group, gvr.Version, gvr.Resource, namespace)
if listersMap[key] != nil {
return listersMap[key]
}

var lister func(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
var watcher func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)

if namespace == apiv1.NamespaceAll {
lister = dClient.Resource(gvr).List
watcher = dClient.Resource(gvr).Watch
} else {
// For lister limited to a particular namespace
lister = dClient.Resource(gvr).Namespace(namespace).List
watcher = dClient.Resource(gvr).Namespace(namespace).Watch
}

// NewNamespaceKeyedIndexerAndReflector can be
// used for both namespace and cluster scoped resources
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return lister(context.Background(), options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return watcher(context.Background(), options)
},
}, unstructured.Unstructured{}, time.Hour)
l := dynamiclister.New(store, gvr)

// Run reflector in the background so that we get new updates from the api-server
go reflector.Run(stopCh)

// Wait for reflector to sync the cache for the first time
// TODO: check if there's a better way to do this (listing all the nodes seems wasteful)
// Note: Based on the docs WaitForNamedCacheSync seems to be used to check if an informer has synced
// but the function is generic enough so we can use
// it for reflectors as well
synced := cache.WaitForNamedCacheSync(fmt.Sprintf("generic-%s-lister", gvr.Resource), stopCh, func() bool {
no, err := l.List(labels.Everything())
if err != nil {
klog.Error("err", err)
}
return len(no) > 0
})
if !synced {
// don't return an error but don't add
// this lister to listers map
// so that another attempt is made
// to create the lister and sync cache
klog.Error("couldn't sync cache")
} else {
// make the lister available in the listers map through GetDynamicListerMap (maps are passed by reference)
// for the next time something requests a lister for the same GVR and namespace
listersMap[key] = l
}

return l
}
34 changes: 34 additions & 0 deletions cluster-autoscaler/utils/kubernetes/testlisters.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
dynamiclister "k8s.io/client-go/dynamic/dynamiclister"
fakedynamic "k8s.io/client-go/dynamic/fake"
v1appslister "k8s.io/client-go/listers/apps/v1"
v1batchlister "k8s.io/client-go/listers/batch/v1"
v1lister "k8s.io/client-go/listers/core/v1"
Expand All @@ -34,6 +38,13 @@ type TestPodLister struct {
pods []*apiv1.Pod
}

// TestGenericListerFactory is used in tests involving generic listers
type TestGenericListerFactory struct {
dynamicClient *fakedynamic.FakeDynamicClient
stopCh <-chan struct{}
listersMap map[string]dynamiclister.Lister
}

// List returns all pods in test lister.
func (lister TestPodLister) List() ([]*apiv1.Pod, error) {
return lister.pods, nil
Expand Down Expand Up @@ -160,3 +171,26 @@ func NewTestConfigMapLister(cms []*apiv1.ConfigMap) (v1lister.ConfigMapLister, e
}
return v1lister.NewConfigMapLister(store), nil
}

// NewTestGenericListerFactory returns a fake generic lister factory for generic listers
// for use in tests
func NewTestGenericListerFactory(stopCh <-chan struct{}, objects ...runtime.Object) GenericListerFactory {
scheme := runtime.NewScheme()
for _, obj := range objects {

scheme.AddKnownTypeWithName(obj.GetObjectKind().GroupVersionKind(), obj)
}

fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme, objects...)

return &TestGenericListerFactory{
dynamicClient: fakeDynamicClient,
stopCh: stopCh,
listersMap: map[string]dynamiclister.Lister{},
}
}

// GetLister returns the lister for a particular GVR
func (g *TestGenericListerFactory) GetLister(gvr schema.GroupVersionResource, namespace string) dynamiclister.Lister {
return NewGenericLister(g.dynamicClient, g.listersMap, g.stopCh, gvr, namespace)
}

0 comments on commit f8438b1

Please sign in to comment.