diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go index e747c65785..caf7c81721 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go @@ -7,9 +7,9 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/logger" @@ -33,8 +33,8 @@ type ResourceLevelMonitor struct { // to monitor current levels. Levels *labeled.Gauge - // This informer will be used to get a list of the underlying objects that we want a tally of - sharedInformer cache.SharedIndexInformer + // cache is used to retrieve the object lists + cache cache.Cache // The kind here will be used to differentiate all the metrics, we'll leave out group and version for now gvk schema.GroupVersionKind @@ -45,36 +45,30 @@ type ResourceLevelMonitor struct { once sync.Once } -// The reason that we use namespace as the one and only thing to cut by is because it's the feature that we are sure that any -// K8s resource created by a plugin will have (as yet, Flyte doesn't have any plugins that create cluster level resources and -// it probably won't for a long time). We can't assume that all the operators and CRDs that Flyte will ever work with will have -// the exact same set of labels or annotations or owner references. The only thing we can really count on is namespace. -func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interface{}) map[string]int { - // Map of namespace to counts - counts := map[string]int{} - - // Collect the object counts by namespace - for _, v := range objects { - metadata, err := meta.Accessor(v) - if err != nil { - logger.Errorf(ctx, "Error converting obj %v to an Accessor %s\n", v, err) - continue - } - counts[metadata.GetNamespace()]++ - } - - return counts -} - // The context here is expected to already have a value for the KindKey func (r *ResourceLevelMonitor) collect(ctx context.Context) { // Emit gauges at the namespace layer - since these are just K8s resources, we cannot be guaranteed to have the necessary // information to derive project/domain - objects := r.sharedInformer.GetStore().List() - counts := r.countList(ctx, objects) + list := metav1.PartialObjectMetadataList{ + TypeMeta: metav1.TypeMeta{ + Kind: r.gvk.Kind, + APIVersion: r.gvk.Version, + }, + } + if err := r.cache.List(ctx, &list); err != nil { + logger.Warnf(ctx, "failed to list objects for %s.%s: %v", r.gvk.Kind, r.gvk.Version, err) + return + } + + // aggregate the object counts by namespace + namespaceCounts := map[string]int{} + for _, item := range list.Items { + namespaceCounts[item.GetNamespace()]++ + } - for ns, count := range counts { - withNamespaceCtx := contextutils.WithNamespace(ctx, ns) + // emit namespace object count metrics + for namespace, count := range namespaceCounts { + withNamespaceCtx := contextutils.WithNamespace(ctx, namespace) r.Levels.Set(withNamespaceCtx, float64(count)) } } @@ -89,7 +83,7 @@ func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) { go func() { defer ticker.Stop() pprof.SetGoroutineLabels(collectorCtx) - r.sharedInformer.HasSynced() + r.cache.WaitForCacheSync(collectorCtx) logger.Infof(ctx, "K8s resource collector %s has synced", r.gvk.Kind) for { select { @@ -125,7 +119,7 @@ type ResourceMonitorIndex struct { stopwatches map[promutils.Scope]*labeled.StopWatch } -func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, si cache.SharedIndexInformer, +func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, cache cache.Cache, gvk schema.GroupVersionKind) *ResourceLevelMonitor { logger.Infof(ctx, "Attempting to create K8s gauge emitter for kind %s/%s", gvk.Version, gvk.Kind) @@ -157,7 +151,7 @@ func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Conte Scope: scope, CollectorTimer: r.stopwatches[scope], Levels: r.gauges[scope], - sharedInformer: si, + cache: cache, gvk: gvk, } r.monitors[gvk] = rm diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go index 2dcec03e16..46a3ad1ddb 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go @@ -11,7 +11,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/flyteorg/flyte/flytestdlib/promutils" ) @@ -37,34 +38,29 @@ var pods = []interface{}{ }, } -func TestNewResourceLevelMonitor(t *testing.T) { - x := v1.Pod{} - x.GetObjectMeta() - lm := ResourceLevelMonitor{} - res := lm.countList(context.Background(), pods) - assert.Equal(t, 2, res["ns-a"]) - assert.Equal(t, 1, res["ns-b"]) +type MyFakeCache struct { + cache.Cache } -type MyFakeInformer struct { - cache.SharedIndexInformer - store cache.Store -} - -func (m MyFakeInformer) GetStore() cache.Store { - return m.store -} +func (m MyFakeCache) List(_ context.Context, list client.ObjectList, _ ...client.ListOption) error { + objectMetadataList, ok := list.(*metav1.PartialObjectMetadataList) + if !ok { + return fmt.Errorf("unexpected type %T", list) + } -func (m MyFakeInformer) HasSynced() bool { - return true -} + objectMetadataList.Items = make([]metav1.PartialObjectMetadata, 0) + for _, pod := range pods { + objectMetadataList.Items = append(objectMetadataList.Items, metav1.PartialObjectMetadata{ + TypeMeta: objectMetadataList.TypeMeta, + ObjectMeta: pod.(*v1.Pod).ObjectMeta, + }) + } -type MyFakeStore struct { - cache.Store + return nil } -func (m MyFakeStore) List() []interface{} { - return pods +func (m MyFakeCache) WaitForCacheSync(_ context.Context) bool { + return true } func TestResourceLevelMonitor_collect(t *testing.T) { @@ -73,12 +69,10 @@ func TestResourceLevelMonitor_collect(t *testing.T) { kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{}) assert.NoError(t, err) - myInformer := MyFakeInformer{ - store: MyFakeStore{}, - } + myCache := MyFakeCache{} index := NewResourceMonitorIndex() - rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0]) + rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0]) rm.collect(ctx) var expected = ` @@ -98,14 +92,11 @@ func TestResourceLevelMonitorSingletonness(t *testing.T) { kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{}) assert.NoError(t, err) - myInformer := MyFakeInformer{ - store: MyFakeStore{}, - } + myCache := MyFakeCache{} index := NewResourceMonitorIndex() - rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0]) - fmt.Println(rm) - //rm2 := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0]) + rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0]) + rm2 := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0]) - //assert.Equal(t, rm, rm2) + assert.Equal(t, rm, rm2) } diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index f32651caa4..6d1ac32e21 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -3,7 +3,6 @@ package k8s import ( "context" "fmt" - "reflect" "time" "golang.org/x/time/rate" @@ -16,7 +15,6 @@ import ( "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -641,11 +639,8 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry } // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on - pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch) - if err != nil { - return nil, err - } - rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk) + rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, kubeClient.GetCache(), gvk) + // Start the poller and gauge emitter rm.RunCollectorOnce(ctx) @@ -667,17 +662,3 @@ func getPluginGvk(resourceToWatch runtime.Object) (schema.GroupVersionKind, erro } return kinds[0], nil } - -func getPluginSharedInformer(ctx context.Context, kubeClient pluginsCore.KubeClient, resourceToWatch client.Object) (cache.SharedIndexInformer, error) { - i, err := kubeClient.GetCache().GetInformer(ctx, resourceToWatch) - if err != nil { - return nil, errors.Wrapf(errors.PluginInitializationFailed, err, "Error getting informer for %s", reflect.TypeOf(i)) - } - - si, casted := i.(cache.SharedIndexInformer) - if !casted { - return nil, errors.Errorf(errors.PluginInitializationFailed, "wrong type. Actual: %v", reflect.TypeOf(i)) - } - - return si, nil -} diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 2c7fda0f6e..3541ed3037 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -991,10 +991,7 @@ func TestResourceManagerConstruction(t *testing.T) { gvk, err := getPluginGvk(&v1.Pod{}) assert.NoError(t, err) assert.Equal(t, gvk.Kind, "Pod") - si, err := getPluginSharedInformer(ctx, fakeKubeClient, &v1.Pod{}) - assert.NotNil(t, si) - assert.NoError(t, err) - rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, si, gvk) + rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, fakeKubeClient.GetCache(), gvk) assert.NotNil(t, rm) }