Skip to content

Commit

Permalink
*: filter informers when preconditions are met
Browse files Browse the repository at this point in the history
When we can detect at startup time that all of the objects we're about
to look at have the labels we're expecting, we can filter our informer
factories upfront.

Signed-off-by: Steve Kuznetsov <[email protected]>
  • Loading branch information
stevekuznetsov committed Sep 6, 2023
1 parent e026bf2 commit 795cd4f
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.3
golang.org/x/net v0.10.0
golang.org/x/sync v0.2.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.54.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -208,7 +209,6 @@ require (
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
Expand Down
78 changes: 39 additions & 39 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
errorwrap "github.com/pkg/errors"
Expand Down Expand Up @@ -187,6 +186,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

canFilter, err := labeller.Validate(ctx, logger, metadataClient)
if err != nil {
return nil, err
}

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
Expand Down Expand Up @@ -363,7 +367,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}

// Wire k8s sharedIndexInformers
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod())
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod(), func() []informers.SharedInformerOption {
if !canFilter {
return nil
}
return []informers.SharedInformerOption{informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
})}
}()...)
sharedIndexInformers := []cache.SharedIndexInformer{}

// Wire Roles
Expand All @@ -372,6 +383,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
if canFilter {
return nil
}
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: gvr.String(),
})
Expand All @@ -392,8 +406,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil
}

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("roles"), roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.Filter(rolesgvk),
rbacv1applyconfigurations.Role,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
Expand All @@ -407,8 +422,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("rolebindings"), roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.Filter(rolebindingsgvk),
rbacv1applyconfigurations.RoleBinding,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
Expand All @@ -422,10 +438,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("serviceaccounts"), serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
return labeller.HasOLMOwnerRef(object) || labeller.HasOLMLabel(object)
},
serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, labeller.Filter(serviceaccountsgvk),
corev1applyconfigurations.ServiceAccount,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
Expand All @@ -439,8 +454,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterServiceLister(metav1.NamespaceAll, serviceInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("services"), serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.Filter(servicesgvk),
corev1applyconfigurations.Service,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
Expand All @@ -463,11 +479,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("pods"), csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
return ok
},
podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, labeller.Filter(podsgvk),
corev1applyconfigurations.Pod,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
Expand Down Expand Up @@ -500,19 +514,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
jobInformer := k8sInformerFactory.Batch().V1().Jobs()
sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())

if err := labelObjects(batchv1.SchemeGroupVersion.WithResource("jobs"), jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
for _, ownerRef := range object.GetOwnerReferences() {
if ownerRef.APIVersion == corev1.SchemeGroupVersion.String() && ownerRef.Kind == "ConfigMap" {
cm, err := configMapInformer.Lister().ConfigMaps(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
return false
}
return labeller.HasOLMOwnerRef(cm)
}
}
return false
},
jobsgvk := batchv1.SchemeGroupVersion.WithResource("jobs")
if err := labelObjects(jobsgvk, jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
ctx, op.logger, labeller.JobFilter(func(namespace, name string) (metav1.Object, error) {
return configMapInformer.Lister().ConfigMaps(namespace).Get(name)
}),
batchv1applyconfigurations.Job,
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
Expand Down Expand Up @@ -585,15 +591,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

if err := labelObjects(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), crdInformer, labeller.ObjectPatchLabeler(
ctx, op.logger, func(object metav1.Object) bool {
for key := range object.GetAnnotations() {
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
return true
}
}
return false
},
customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler(
ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
)); err != nil {
return nil, err
Expand Down
112 changes: 112 additions & 0 deletions pkg/controller/operators/labeller/filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package labeller

import (
"context"
"fmt"
"strings"
"sync"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
)

func Filter(gvr schema.GroupVersionResource) func(metav1.Object) bool {
if f, ok := filters[gvr]; ok {
return f
}
return func(object metav1.Object) bool {
return false
}
}

func JobFilter(getConfigMap func(namespace, name string) (metav1.Object, error)) func(object metav1.Object) bool {
return func(object metav1.Object) bool {
for _, ownerRef := range object.GetOwnerReferences() {
if ownerRef.APIVersion == corev1.SchemeGroupVersion.String() && ownerRef.Kind == "ConfigMap" {
cm, err := getConfigMap(object.GetNamespace(), ownerRef.Name)
if err != nil {
return false
}
return HasOLMOwnerRef(cm)
}
}
return false
}
}

var filters = map[schema.GroupVersionResource]func(metav1.Object) bool{
corev1.SchemeGroupVersion.WithResource("services"): HasOLMOwnerRef,
corev1.SchemeGroupVersion.WithResource("pods"): func(object metav1.Object) bool {
_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
return ok
},
corev1.SchemeGroupVersion.WithResource("serviceaccounts"): func(object metav1.Object) bool {
return HasOLMOwnerRef(object) || HasOLMLabel(object)
},
appsv1.SchemeGroupVersion.WithResource("deployments"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("roles"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("rolebindings"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("clusterroles"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): HasOLMOwnerRef,
apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"): func(object metav1.Object) bool {
for key := range object.GetAnnotations() {
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
return true
}
}
return false
},
}

func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadata.Interface) (bool, error) {
okLock := sync.Mutex{}
var ok bool
g, ctx := errgroup.WithContext(ctx)
allFilters := map[schema.GroupVersionResource]func(metav1.Object) bool{}
for gvr, filter := range filters {
allFilters[gvr] = filter
}
allFilters[batchv1.SchemeGroupVersion.WithResource("jobs")] = JobFilter(func(namespace, name string) (metav1.Object, error) {
return metadataClient.Resource(corev1.SchemeGroupVersion.WithResource("configmaps")).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
})
for gvr, filter := range allFilters {
gvr, filter := gvr, filter
g.Go(func() error {
list, err := metadataClient.Resource(gvr).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list %s: %w", gvr.String(), err)
}
var count int
for _, item := range list.Items {
if filter(&item) && !hasLabel(&item) {
count++
}
}
if count > 0 {
logger.WithFields(logrus.Fields{
"gvr": gvr.String(),
"nonconforming": count,
}).Info("found nonconforming items")
}
okLock.Lock()
ok = ok && count == 0
okLock.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return false, err
}
return ok, nil
}
44 changes: 36 additions & 8 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

canFilter, err := labeller.Validate(ctx, config.logger, config.metadataClient)
if err != nil {
return nil, err
}

op := &Operator{
Operator: queueOperator,
clock: config.clock,
Expand Down Expand Up @@ -315,7 +320,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
}

// Wire Deployments
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), informers.WithNamespace(namespace))
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), func() []informers.SharedInformerOption {
opts := []informers.SharedInformerOption{
informers.WithNamespace(namespace),
}
if canFilter {
opts = append(opts, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
}))
}
return opts
}()...)
depInformer := k8sInformerFactory.Apps().V1().Deployments()
informersByNamespace[namespace].DeploymentInformer = depInformer
op.lister.AppsV1().RegisterDeploymentLister(namespace, depInformer.Lister())
Expand Down Expand Up @@ -435,6 +450,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
}

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
if canFilter {
return nil
}
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: gvr.String(),
})
Expand All @@ -455,8 +473,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil
}

if err := labelObjects(appsv1.SchemeGroupVersion.WithResource("deployments"), informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
deploymentsgvk := appsv1.SchemeGroupVersion.WithResource("deployments")
if err := labelObjects(deploymentsgvk, informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
ctx, op.logger, labeller.Filter(deploymentsgvk),
appsv1applyconfigurations.Deployment,
func(namespace string, ctx context.Context, cfg *appsv1applyconfigurations.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (*appsv1.Deployment, error) {
return op.opClient.KubernetesInterface().AppsV1().Deployments(namespace).Apply(ctx, cfg, opts)
Expand Down Expand Up @@ -502,7 +521,14 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

k8sInformerFactory := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod())
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), func() []informers.SharedInformerOption {
if !canFilter {
return nil
}
return []informers.SharedInformerOption{informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
})}
}()...)
clusterRoleInformer := k8sInformerFactory.Rbac().V1().ClusterRoles()
informersByNamespace[metav1.NamespaceAll].ClusterRoleInformer = clusterRoleInformer
op.lister.RbacV1().RegisterClusterRoleLister(clusterRoleInformer.Lister())
Expand All @@ -519,8 +545,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterroles"), clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
clusterrolesgvk := rbacv1.SchemeGroupVersion.WithResource("clusterroles")
if err := labelObjects(clusterrolesgvk, clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
ctx, op.logger, labeller.Filter(clusterrolesgvk),
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleApplyConfiguration {
return rbacv1applyconfigurations.ClusterRole(name)
},
Expand All @@ -547,8 +574,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"), clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
clusterrolebindingssgvk := rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings")
if err := labelObjects(clusterrolebindingssgvk, clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
ctx, op.logger, labeller.Filter(clusterrolebindingssgvk),
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration {
return rbacv1applyconfigurations.ClusterRoleBinding(name)
},
Expand Down

0 comments on commit 795cd4f

Please sign in to comment.