From 86108ce6c83d58ecb1f20afb0c616ff4316a1489 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 16 Apr 2024 09:06:18 -0500 Subject: [PATCH 1/3] using cache to retrieve object meta lists Signed-off-by: Daniel Rammer --- .../nodes/task/k8s/plugin_collector.go | 67 ++++++++++++++++--- .../nodes/task/k8s/plugin_collector_test.go | 1 + .../nodes/task/k8s/plugin_manager.go | 13 ++-- 3 files changed, 66 insertions(+), 15 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go index e747c65785..32522f45ad 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go @@ -2,14 +2,15 @@ package k8s import ( "context" + "fmt" "runtime/pprof" "strings" "sync" "time" - "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/tools/cache" + cache "sigs.k8s.io/controller-runtime/pkg/cache" "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/logger" @@ -34,7 +35,9 @@ type ResourceLevelMonitor struct { Levels *labeled.Gauge // This informer will be used to get a list of the underlying objects that we want a tally of - sharedInformer cache.SharedIndexInformer + //sharedInformer cache.SharedIndexInformer + + cache cache.Cache // The kind here will be used to differentiate all the metrics, we'll leave out group and version for now gvk schema.GroupVersionKind @@ -49,7 +52,7 @@ type ResourceLevelMonitor struct { // K8s resource created by a plugin will have (as yet, Flyte doesn't have any plugins that create cluster level resources and // it probably won't for a long time). We can't assume that all the operators and CRDs that Flyte will ever work with will have // the exact same set of labels or annotations or owner references. The only thing we can really count on is namespace. 
-func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interface{}) map[string]int { +/*func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interface{}) map[string]int { // Map of namespace to counts counts := map[string]int{} @@ -63,6 +66,22 @@ func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interfac counts[metadata.GetNamespace()]++ } + return counts +}*/ +func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []metav1.ObjectMeta) map[string]int { + // Map of namespace to counts + counts := map[string]int{} + + // Collect the object counts by namespace + for _, objectMeta := range objects { + /*metadata, err := meta.Accessor(v) + if err != nil { + logger.Errorf(ctx, "Error converting obj %v to an Accessor %s\n", v, err) + continue + }*/ + counts[objectMeta.GetNamespace()]++ + } + return counts } @@ -70,10 +89,37 @@ func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interfac func (r *ResourceLevelMonitor) collect(ctx context.Context) { // Emit gauges at the namespace layer - since these are just K8s resources, we cannot be guaranteed to have the necessary // information to derive project/domain - objects := r.sharedInformer.GetStore().List() - counts := r.countList(ctx, objects) + //objects := r.sharedInformer.GetStore().List() + + /*list := v1.List{ + TypeMeta: metav1.TypeMeta{ + Kind: r.gvk.Kind, + APIVersion: r.gvk.GroupVersion().String(), + }, + }*/ + list := metav1.PartialObjectMetadataList{ + TypeMeta: metav1.TypeMeta{ + Kind: r.gvk.Kind, // TODO t.gvk.GroupKind().String()? + APIVersion: r.gvk.Version, // TODO is this right? + }, + } + if err := r.cache.List(ctx, &list); err != nil { + // TODO @hamersaw - handle + } + + //objects := make([]client.Object, 0) + /*for _, item := range list.Items { + objects = append(objects, item) + }*/ + + objectMetas := make([]metav1.ObjectMeta, 0) + for _, item := range list.Items { + objectMetas = append(objectMetas, item.ObjectMeta) + } + counts := r.countList(ctx, objectMetas) for ns, count := range counts { + fmt.Printf("HAMERSAW - %s.%s ns:%s, count:%d\n", r.gvk.Kind, r.gvk.Version, ns, count) withNamespaceCtx := contextutils.WithNamespace(ctx, ns) r.Levels.Set(withNamespaceCtx, float64(count)) } @@ -89,7 +135,8 @@ func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) { go func() { defer ticker.Stop() pprof.SetGoroutineLabels(collectorCtx) - r.sharedInformer.HasSynced() + //r.sharedInformer.HasSynced() + r.cache.WaitForCacheSync(collectorCtx) logger.Infof(ctx, "K8s resource collector %s has synced", r.gvk.Kind) for { select { @@ -125,7 +172,8 @@ type ResourceMonitorIndex struct { stopwatches map[promutils.Scope]*labeled.StopWatch } -func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, si cache.SharedIndexInformer, +//func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, si cache.SharedIndexInformer, +func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, cache cache.Cache, gvk schema.GroupVersionKind) *ResourceLevelMonitor { logger.Infof(ctx, "Attempting to create K8s gauge emitter for kind %s/%s", gvk.Version, gvk.Kind) @@ -157,7 +205,8 @@ func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Conte Scope: scope, CollectorTimer: r.stopwatches[scope], Levels: r.gauges[scope], - sharedInformer: si, + //sharedInformer: si, + cache: cache, gvk: gvk, } 
r.monitors[gvk] = rm diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go index 2dcec03e16..43700fdeaa 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go @@ -12,6 +12,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/flyteorg/flyte/flytestdlib/promutils" ) diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index f32651caa4..3b09257679 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -3,7 +3,7 @@ package k8s import ( "context" "fmt" - "reflect" + //"reflect" "time" "golang.org/x/time/rate" @@ -16,7 +16,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/cache" + //"k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -641,11 +641,12 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry } // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on - pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch) + /*pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch) if err != nil { return nil, err } - rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk) + rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk)*/ + rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, kubeClient.GetCache(), gvk) // Start the poller and gauge emitter rm.RunCollectorOnce(ctx) @@ -668,7 +669,7 @@ func getPluginGvk(resourceToWatch runtime.Object) (schema.GroupVersionKind, erro return kinds[0], nil } -func getPluginSharedInformer(ctx context.Context, kubeClient pluginsCore.KubeClient, resourceToWatch client.Object) (cache.SharedIndexInformer, error) { +/*func getPluginSharedInformer(ctx context.Context, kubeClient pluginsCore.KubeClient, resourceToWatch client.Object) (cache.SharedIndexInformer, error) { i, err := kubeClient.GetCache().GetInformer(ctx, resourceToWatch) if err != nil { return nil, errors.Wrapf(errors.PluginInitializationFailed, err, "Error getting informer for %s", reflect.TypeOf(i)) @@ -680,4 +681,4 @@ func getPluginSharedInformer(ctx context.Context, kubeClient pluginsCore.KubeCli } return si, nil -} +}*/ From bab6249d37f88b8e26920bf0ed336295afce2e7c Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 18 Apr 2024 09:13:01 -0500 Subject: [PATCH 2/3] cleaned up PR Signed-off-by: Daniel Rammer --- .../nodes/task/k8s/plugin_collector.go | 77 +++---------------- .../nodes/task/k8s/plugin_manager.go | 22 +----- 2 files changed, 12 insertions(+), 87 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go index 32522f45ad..fd2bb49401 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go +++ 
b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go @@ -2,7 +2,6 @@ package k8s import ( "context" - "fmt" "runtime/pprof" "strings" "sync" @@ -34,9 +33,7 @@ type ResourceLevelMonitor struct { // to monitor current levels. Levels *labeled.Gauge - // This informer will be used to get a list of the underlying objects that we want a tally of - //sharedInformer cache.SharedIndexInformer - + // cache is used to retrieve the object lists cache cache.Cache // The kind here will be used to differentiate all the metrics, we'll leave out group and version for now @@ -48,79 +45,30 @@ type ResourceLevelMonitor struct { once sync.Once } -// The reason that we use namespace as the one and only thing to cut by is because it's the feature that we are sure that any -// K8s resource created by a plugin will have (as yet, Flyte doesn't have any plugins that create cluster level resources and -// it probably won't for a long time). We can't assume that all the operators and CRDs that Flyte will ever work with will have -// the exact same set of labels or annotations or owner references. The only thing we can really count on is namespace. -/*func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interface{}) map[string]int { - // Map of namespace to counts - counts := map[string]int{} - - // Collect the object counts by namespace - for _, v := range objects { - metadata, err := meta.Accessor(v) - if err != nil { - logger.Errorf(ctx, "Error converting obj %v to an Accessor %s\n", v, err) - continue - } - counts[metadata.GetNamespace()]++ - } - - return counts -}*/ -func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []metav1.ObjectMeta) map[string]int { - // Map of namespace to counts - counts := map[string]int{} - - // Collect the object counts by namespace - for _, objectMeta := range objects { - /*metadata, err := meta.Accessor(v) - if err != nil { - logger.Errorf(ctx, "Error converting obj %v to an Accessor %s\n", v, err) - continue - }*/ - counts[objectMeta.GetNamespace()]++ - } - - return counts -} - // The context here is expected to already have a value for the KindKey func (r *ResourceLevelMonitor) collect(ctx context.Context) { // Emit gauges at the namespace layer - since these are just K8s resources, we cannot be guaranteed to have the necessary // information to derive project/domain - //objects := r.sharedInformer.GetStore().List() - - /*list := v1.List{ - TypeMeta: metav1.TypeMeta{ - Kind: r.gvk.Kind, - APIVersion: r.gvk.GroupVersion().String(), - }, - }*/ list := metav1.PartialObjectMetadataList{ TypeMeta: metav1.TypeMeta{ - Kind: r.gvk.Kind, // TODO t.gvk.GroupKind().String()? - APIVersion: r.gvk.Version, // TODO is this right? 
+ Kind: r.gvk.Kind, + APIVersion: r.gvk.Version, }, } if err := r.cache.List(ctx, &list); err != nil { - // TODO @hamersaw - handle + logger.Warnf(ctx, "failed to list objects for %s.%s: %v", r.gvk.Kind, r.gvk.Version, err) + return } - //objects := make([]client.Object, 0) - /*for _, item := range list.Items { - objects = append(objects, item) - }*/ - - objectMetas := make([]metav1.ObjectMeta, 0) + // aggregate the object counts by namespace + namespaceCounts := map[string]int{} for _, item := range list.Items { - objectMetas = append(objectMetas, item.ObjectMeta) + namespaceCounts[item.GetNamespace()]++ } - counts := r.countList(ctx, objectMetas) - for ns, count := range counts { - fmt.Printf("HAMERSAW - %s.%s ns:%s, count:%d\n", r.gvk.Kind, r.gvk.Version, ns, count) - withNamespaceCtx := contextutils.WithNamespace(ctx, ns) + // emit namespace object count metrics + for namespace, count := range namespaceCounts { + withNamespaceCtx := contextutils.WithNamespace(ctx, namespace) r.Levels.Set(withNamespaceCtx, float64(count)) } } @@ -135,7 +83,6 @@ func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) { go func() { defer ticker.Stop() pprof.SetGoroutineLabels(collectorCtx) - //r.sharedInformer.HasSynced() r.cache.WaitForCacheSync(collectorCtx) logger.Infof(ctx, "K8s resource collector %s has synced", r.gvk.Kind) for { @@ -172,7 +119,6 @@ type ResourceMonitorIndex struct { stopwatches map[promutils.Scope]*labeled.StopWatch } -//func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, si cache.SharedIndexInformer, func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, cache cache.Cache, gvk schema.GroupVersionKind) *ResourceLevelMonitor { @@ -205,7 +151,6 @@ func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Conte Scope: scope, CollectorTimer: r.stopwatches[scope], Levels: r.gauges[scope], - //sharedInformer: si, cache: cache, gvk: gvk, } diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index 3b09257679..6d1ac32e21 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -3,7 +3,6 @@ package k8s import ( "context" "fmt" - //"reflect" "time" "golang.org/x/time/rate" @@ -16,7 +15,6 @@ import ( "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - //"k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -641,12 +639,8 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry } // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on - /*pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch) - if err != nil { - return nil, err - } - rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk)*/ rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, kubeClient.GetCache(), gvk) + // Start the poller and gauge emitter rm.RunCollectorOnce(ctx) @@ -668,17 +662,3 @@ func getPluginGvk(resourceToWatch runtime.Object) (schema.GroupVersionKind, erro } return kinds[0], nil } - -/*func getPluginSharedInformer(ctx context.Context, kubeClient pluginsCore.KubeClient, 
resourceToWatch client.Object) (cache.SharedIndexInformer, error) { - i, err := kubeClient.GetCache().GetInformer(ctx, resourceToWatch) - if err != nil { - return nil, errors.Wrapf(errors.PluginInitializationFailed, err, "Error getting informer for %s", reflect.TypeOf(i)) - } - - si, casted := i.(cache.SharedIndexInformer) - if !casted { - return nil, errors.Errorf(errors.PluginInitializationFailed, "wrong type. Actual: %v", reflect.TypeOf(i)) - } - - return si, nil -}*/ From a8bbaa9882a3937a933fc516da9c8d0438fcd0bf Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 18 Apr 2024 09:40:40 -0500 Subject: [PATCH 3/3] added unit tests and fixed linter Signed-off-by: Daniel Rammer --- .../nodes/task/k8s/plugin_collector.go | 2 +- .../nodes/task/k8s/plugin_collector_test.go | 58 ++++++++----------- .../nodes/task/k8s/plugin_manager_test.go | 5 +- 3 files changed, 26 insertions(+), 39 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go index fd2bb49401..caf7c81721 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go @@ -9,7 +9,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - cache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/logger" diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go index 43700fdeaa..46a3ad1ddb 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go @@ -11,7 +11,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/flyteorg/flyte/flytestdlib/promutils" @@ -38,34 +38,29 @@ var pods = []interface{}{ }, } -func TestNewResourceLevelMonitor(t *testing.T) { - x := v1.Pod{} - x.GetObjectMeta() - lm := ResourceLevelMonitor{} - res := lm.countList(context.Background(), pods) - assert.Equal(t, 2, res["ns-a"]) - assert.Equal(t, 1, res["ns-b"]) +type MyFakeCache struct { + cache.Cache } -type MyFakeInformer struct { - cache.SharedIndexInformer - store cache.Store -} - -func (m MyFakeInformer) GetStore() cache.Store { - return m.store -} +func (m MyFakeCache) List(_ context.Context, list client.ObjectList, _ ...client.ListOption) error { + objectMetadataList, ok := list.(*metav1.PartialObjectMetadataList) + if !ok { + return fmt.Errorf("unexpected type %T", list) + } -func (m MyFakeInformer) HasSynced() bool { - return true -} + objectMetadataList.Items = make([]metav1.PartialObjectMetadata, 0) + for _, pod := range pods { + objectMetadataList.Items = append(objectMetadataList.Items, metav1.PartialObjectMetadata{ + TypeMeta: objectMetadataList.TypeMeta, + ObjectMeta: pod.(*v1.Pod).ObjectMeta, + }) + } -type MyFakeStore struct { - cache.Store + return nil } -func (m MyFakeStore) List() []interface{} { - return pods +func (m MyFakeCache) WaitForCacheSync(_ context.Context) bool { + return true } func TestResourceLevelMonitor_collect(t *testing.T) { @@ -74,12 +69,10 @@ func 
TestResourceLevelMonitor_collect(t *testing.T) { kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{}) assert.NoError(t, err) - myInformer := MyFakeInformer{ - store: MyFakeStore{}, - } + myCache := MyFakeCache{} index := NewResourceMonitorIndex() - rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0]) + rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0]) rm.collect(ctx) var expected = ` @@ -99,14 +92,11 @@ func TestResourceLevelMonitorSingletonness(t *testing.T) { kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{}) assert.NoError(t, err) - myInformer := MyFakeInformer{ - store: MyFakeStore{}, - } + myCache := MyFakeCache{} index := NewResourceMonitorIndex() - rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0]) - fmt.Println(rm) - //rm2 := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0]) + rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0]) + rm2 := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0]) - //assert.Equal(t, rm, rm2) + assert.Equal(t, rm, rm2) } diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 2c7fda0f6e..3541ed3037 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -991,10 +991,7 @@ func TestResourceManagerConstruction(t *testing.T) { gvk, err := getPluginGvk(&v1.Pod{}) assert.NoError(t, err) assert.Equal(t, gvk.Kind, "Pod") - si, err := getPluginSharedInformer(ctx, fakeKubeClient, &v1.Pod{}) - assert.NotNil(t, si) - assert.NoError(t, err) - rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, si, gvk) + rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, fakeKubeClient.GetCache(), gvk) assert.NotNil(t, rm) }
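Taken together, the three patches replace the shared-informer store walk with a metadata-only list against the controller-runtime cache. The end-state pattern can be sketched standalone; in this minimal sketch, countByNamespace is an illustrative helper name rather than a function from the patches, and the cache is assumed to be started and synced:

package k8smetrics // illustrative package name

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// countByNamespace lists only object metadata for the given GVK from the
// cache and tallies the items by namespace, mirroring what
// ResourceLevelMonitor.collect does after this series.
func countByNamespace(ctx context.Context, c cache.Cache, gvk schema.GroupVersionKind) (map[string]int, error) {
	list := metav1.PartialObjectMetadataList{}
	// SetGroupVersionKind fills TypeMeta with the full group/version string,
	// so the lookup also works for grouped kinds, not just core ones like Pod.
	list.SetGroupVersionKind(gvk)
	if err := c.List(ctx, &list); err != nil {
		return nil, fmt.Errorf("listing %s: %w", gvk.Kind, err)
	}
	counts := make(map[string]int, len(list.Items))
	for _, item := range list.Items {
		counts[item.GetNamespace()]++
	}
	return counts, nil
}

Listing into a PartialObjectMetadataList means the cache serves names, namespaces, and labels without materializing full object specs, which keeps the periodic collector cheap.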
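The first patch leaves a TODO asking whether r.gvk.Version is the right value for APIVersion. The distinction it raises matters once a plugin watches a grouped kind: APIVersion is the group/version pair, and the bare Version drops the group. A small standalone illustration (the PyTorchJob GVK here is only an example):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	core := schema.GroupVersionKind{Version: "v1", Kind: "Pod"}
	crd := schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "PyTorchJob"}

	// For core kinds the two spellings coincide...
	fmt.Println(core.Version)                 // "v1"
	fmt.Println(core.GroupVersion().String()) // "v1"

	// ...but for grouped kinds only GroupVersion().String() keeps the group.
	fmt.Println(crd.Version)                  // "v1"
	fmt.Println(crd.GroupVersion().String())  // "kubeflow.org/v1"
}

Because the tests exercise v1.Pod, where the two values agree, the difference does not surface in this series; setting the full group/version (for example via SetGroupVersionKind, as in the sketch above) is the safer spelling if the collector is ever pointed at a CRD.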
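On the testing side, note the shape of the fake in the third patch: MyFakeCache embeds the cache.Cache interface and overrides only List and WaitForCacheSync, the two methods the collector actually calls. Calling any other Cache method on the zero value would panic on the nil embedded interface, which is an acceptable trade-off for a test double this small and avoids maintaining a full mock.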