fix(propeller): Replace SharedIndexInformer with Informer #5129

Closed
50 changes: 30 additions & 20 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go
@@ -7,10 +7,11 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
@@ -34,7 +35,10 @@ type ResourceLevelMonitor struct {
Levels *labeled.Gauge

// This informer will be used to get a list of the underlying objects that we want a tally of
sharedInformer cache.SharedIndexInformer
informer cache.Informer

// This kubeClient will be used to get a list of the underlying objects that we want a tally of
kubeClient core.KubeClient

// The kind here will be used to differentiate all the metrics, we'll leave out group and version for now
gvk schema.GroupVersionKind
@@ -49,34 +53,39 @@ type ResourceLevelMonitor struct {
// K8s resource created by a plugin will have (as yet, Flyte doesn't have any plugins that create cluster level resources and
// it probably won't for a long time). We can't assume that all the operators and CRDs that Flyte will ever work with will have
// the exact same set of labels or annotations or owner references. The only thing we can really count on is namespace.
func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interface{}) map[string]int {
func (r *ResourceLevelMonitor) countList(pods *corev1.PodList) map[string]int {
// Map of namespace to counts
counts := map[string]int{}

// Collect the object counts by namespace
for _, v := range objects {
metadata, err := meta.Accessor(v)
if err != nil {
logger.Errorf(ctx, "Error converting obj %v to an Accessor %s\n", v, err)
continue
}
counts[metadata.GetNamespace()]++
for _, pod := range pods.Items {
counts[pod.Namespace]++
}

return counts
}

func (r *ResourceLevelMonitor) setLevels(ctx context.Context, counts map[string]int) {
for ns, count := range counts {
withNamespaceCtx := contextutils.WithNamespace(ctx, ns)
r.Levels.Set(withNamespaceCtx, float64(count))
}
}

// The context here is expected to already have a value for the KindKey
func (r *ResourceLevelMonitor) collect(ctx context.Context) {
// Emit gauges at the namespace layer - since these are just K8s resources, we cannot be guaranteed to have the necessary
// information to derive project/domain
objects := r.sharedInformer.GetStore().List()
counts := r.countList(ctx, objects)

for ns, count := range counts {
withNamespaceCtx := contextutils.WithNamespace(ctx, ns)
r.Levels.Set(withNamespaceCtx, float64(count))
pods := &corev1.PodList{}
if err := r.kubeClient.GetClient().List(ctx, pods); err != nil {
Contributor:

I am uncertain about the change here. Originally, sharedInformer fetched the resources it listens to, but now the code only fetches pods. Also, this appears to be metrics-collection code from four years ago; I am not sure it is still relevant. cc @wild-endeavor

Contributor:

Agreed, it is not safe to update this to a PodList because it will not capture the other K8s resource types created by other Flyte plugins (e.g., Spark, Ray, TF, PyTorch).

logger.Errorf(ctx, "Error listing objects %s\n", err)
return
}

counts := r.countList(pods)

r.setLevels(ctx, counts)
}

func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
@@ -89,7 +98,7 @@ func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
go func() {
defer ticker.Stop()
pprof.SetGoroutineLabels(collectorCtx)
r.sharedInformer.HasSynced()
r.informer.HasSynced()
logger.Infof(ctx, "K8s resource collector %s has synced", r.gvk.Kind)
for {
select {
@@ -125,8 +134,8 @@ type ResourceMonitorIndex struct {
stopwatches map[promutils.Scope]*labeled.StopWatch
}

func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, si cache.SharedIndexInformer,
gvk schema.GroupVersionKind) *ResourceLevelMonitor {
func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, informer cache.Informer,
gvk schema.GroupVersionKind, kubeClient core.KubeClient) *ResourceLevelMonitor {

logger.Infof(ctx, "Attempting to create K8s gauge emitter for kind %s/%s", gvk.Version, gvk.Kind)

@@ -157,8 +166,9 @@ func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Conte
Scope: scope,
CollectorTimer: r.stopwatches[scope],
Levels: r.gauges[scope],
sharedInformer: si,
informer: informer,
gvk: gvk,
kubeClient: kubeClient,
}
r.monitors[gvk] = rm

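
An aside on the review thread above: if the tally needed to stay generic across the resource kinds that different plugins watch (Spark, Ray, TF, PyTorch, and so on) rather than hard-coding corev1.PodList, one option with controller-runtime is to list into an unstructured.UnstructuredList keyed by the plugin's GVK. The sketch below is illustrative only; countByNamespace is a hypothetical helper, not code from this PR.

// Sketch: GVK-generic per-namespace counting through an unstructured list.
// Hypothetical helper, not part of this PR; assumes a controller-runtime client.
import (
	"context"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func countByNamespace(ctx context.Context, c client.Client, gvk schema.GroupVersionKind) (map[string]int, error) {
	list := &unstructured.UnstructuredList{}
	// controller-runtime resolves list types through the "<Kind>List" naming convention.
	list.SetGroupVersionKind(gvk.GroupVersion().WithKind(gvk.Kind + "List"))
	if err := c.List(ctx, list); err != nil {
		return nil, err
	}
	counts := map[string]int{}
	for _, item := range list.Items {
		counts[item.GetNamespace()]++
	}
	return counts, nil
}

The resulting map has the same shape countList returns, so it could feed setLevels unchanged.
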
flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go
@@ -8,40 +8,44 @@ import (

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"

"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors/mocks"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

var pods = []interface{}{
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "a",
Namespace: "ns-a",
// Test fixture: a corev1.PodList with two pods in ns-a and one in ns-b
var pods = &corev1.PodList{
Items: []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "a",
Namespace: "ns-a",
},
},
},
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "b",
Namespace: "ns-a",
{
ObjectMeta: metav1.ObjectMeta{
Name: "b",
Namespace: "ns-a",
},
},
},
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "c",
Namespace: "ns-b",
{
ObjectMeta: metav1.ObjectMeta{
Name: "c",
Namespace: "ns-b",
},
},
},
}

func TestNewResourceLevelMonitor(t *testing.T) {
x := v1.Pod{}
x := corev1.Pod{}
x.GetObjectMeta()
lm := ResourceLevelMonitor{}
res := lm.countList(context.Background(), pods)
res := lm.countList(pods)
assert.Equal(t, 2, res["ns-a"])
assert.Equal(t, 1, res["ns-b"])
}
@@ -63,23 +67,24 @@ type MyFakeStore struct {
cache.Store
}

func (m MyFakeStore) List() []interface{} {
return pods
}

func TestResourceLevelMonitor_collect(t *testing.T) {
func TestResourceLevelMonitor_setLevels(t *testing.T) {
ctx := context.Background()
scope := promutils.NewScope("testscope")
fakeKubeClient := mocks.NewFakeKubeClient()

kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{})
kinds, _, err := scheme.Scheme.ObjectKinds(&corev1.Pod{})
assert.NoError(t, err)
myInformer := MyFakeInformer{
store: MyFakeStore{},
}

index := NewResourceMonitorIndex()
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0])
rm.collect(ctx)
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0], fakeKubeClient)
counts := map[string]int{
"ns-a": 2,
"ns-b": 1,
}
rm.setLevels(ctx, counts)

var expected = `
# HELP testscope:k8s_resources Current levels of K8s objects as seen from their informer caches
@@ -95,15 +100,16 @@ func TestResourceLevelMonitor_collect(t *testing.T) {
func TestResourceLevelMonitorSingletonness(t *testing.T) {
ctx := context.Background()
scope := promutils.NewScope("testscope")
fakeKubeClient := mocks.NewFakeKubeClient()

kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{})
kinds, _, err := scheme.Scheme.ObjectKinds(&corev1.Pod{})
assert.NoError(t, err)
myInformer := MyFakeInformer{
store: MyFakeStore{},
}

index := NewResourceMonitorIndex()
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0])
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0], fakeKubeClient)
fmt.Println(rm)
//rm2 := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0])

16 changes: 5 additions & 11 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
@@ -16,8 +16,8 @@ import (
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -641,11 +641,11 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
}

// Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on
pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch)
pluginInformer, err := getPluginInformer(ctx, kubeClient, entry.ResourceToWatch)
if err != nil {
return nil, err
}
rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk)
rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk, kubeClient)
// Start the poller and gauge emitter
rm.RunCollectorOnce(ctx)

@@ -668,16 +668,10 @@ func getPluginGvk(resourceToWatch runtime.Object) (schema.GroupVersionKind, erro
return kinds[0], nil
}

func getPluginSharedInformer(ctx context.Context, kubeClient pluginsCore.KubeClient, resourceToWatch client.Object) (cache.SharedIndexInformer, error) {
func getPluginInformer(ctx context.Context, kubeClient pluginsCore.KubeClient, resourceToWatch client.Object) (cache.Informer, error) {
i, err := kubeClient.GetCache().GetInformer(ctx, resourceToWatch)
if err != nil {
return nil, errors.Wrapf(errors.PluginInitializationFailed, err, "Error getting informer for %s", reflect.TypeOf(i))
}

si, casted := i.(cache.SharedIndexInformer)
if !casted {
return nil, errors.Errorf(errors.PluginInitializationFailed, "wrong type. Actual: %v", reflect.TypeOf(i))
}

return si, nil
return i, nil
}
flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go
@@ -991,10 +991,10 @@ func TestResourceManagerConstruction(t *testing.T) {
gvk, err := getPluginGvk(&v1.Pod{})
assert.NoError(t, err)
assert.Equal(t, gvk.Kind, "Pod")
si, err := getPluginSharedInformer(ctx, fakeKubeClient, &v1.Pod{})
si, err := getPluginInformer(ctx, fakeKubeClient, &v1.Pod{})
assert.NotNil(t, si)
assert.NoError(t, err)
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, si, gvk)
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, si, gvk, fakeKubeClient)
assert.NotNil(t, rm)
}
