diff --git a/go.mod b/go.mod
index 07552c3ba6f..ced11fb6ec0 100644
--- a/go.mod
+++ b/go.mod
@@ -37,6 +37,7 @@ require (
 	github.com/spf13/pflag v1.0.5
 	github.com/stretchr/testify v1.8.3
 	golang.org/x/net v0.10.0
+	golang.org/x/sync v0.2.0
 	golang.org/x/time v0.3.0
 	google.golang.org/grpc v1.54.0
 	gopkg.in/yaml.v2 v2.4.0
@@ -208,7 +209,6 @@ require (
 	golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
 	golang.org/x/mod v0.10.0 // indirect
 	golang.org/x/oauth2 v0.5.0 // indirect
-	golang.org/x/sync v0.2.0 // indirect
 	golang.org/x/sys v0.8.0 // indirect
 	golang.org/x/term v0.8.0 // indirect
 	golang.org/x/text v0.9.0 // indirect
diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go
index db46d25ef7c..2904c10c38b 100644
--- a/pkg/controller/operators/catalog/operator.go
+++ b/pkg/controller/operators/catalog/operator.go
@@ -11,7 +11,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
 	"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
 	"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
 	errorwrap "github.com/pkg/errors"
@@ -187,6 +186,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		return nil, err
 	}
 
+	canFilter, err := labeller.Validate(ctx, logger, metadataClient)
+	if err != nil {
+		return nil, err
+	}
+
 	// Allocate the new instance of an Operator.
 	op := &Operator{
 		Operator: queueOperator,
@@ -363,7 +367,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	}
 
 	// Wire k8s sharedIndexInformers
-	k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod())
+	k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod(), func() []informers.SharedInformerOption {
+		if !canFilter {
+			return nil
+		}
+		return []informers.SharedInformerOption{informers.WithTweakListOptions(func(options *metav1.ListOptions) {
+			options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
+		})}
+	}()...)
 	sharedIndexInformers := []cache.SharedIndexInformer{}
 
 	// Wire Roles
@@ -372,6 +383,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())
 
 	labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
+		if canFilter {
+			return nil
+		}
 		op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
 			Name: gvr.String(),
 		})
@@ -392,8 +406,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		return nil
 	}
 
-	if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("roles"), roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
-		ctx, op.logger, labeller.HasOLMOwnerRef,
+	rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
+	if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
+		ctx, op.logger, labeller.Filter(rolesgvk),
 		rbacv1applyconfigurations.Role,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
 			return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
@@ -407,8 +422,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	op.lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
 	sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())
 
-	if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("rolebindings"), roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
-		ctx, op.logger, labeller.HasOLMOwnerRef,
+	rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
+	if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
+		ctx, op.logger, labeller.Filter(rolebindingsgvk),
 		rbacv1applyconfigurations.RoleBinding,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
 			return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
@@ -422,10 +438,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
 	sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())
 
-	if err := labelObjects(corev1.SchemeGroupVersion.WithResource("serviceaccounts"), serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
-		ctx, op.logger, func(object metav1.Object) bool {
-			return labeller.HasOLMOwnerRef(object) || labeller.HasOLMLabel(object)
-		},
+	serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
+	if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
+		ctx, op.logger, labeller.Filter(serviceaccountsgvk),
 		corev1applyconfigurations.ServiceAccount,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
 			return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
@@ -439,8 +454,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	op.lister.CoreV1().RegisterServiceLister(metav1.NamespaceAll, serviceInformer.Lister())
 	sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())
 
-	if err := labelObjects(corev1.SchemeGroupVersion.WithResource("services"), serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
-		ctx, op.logger, labeller.HasOLMOwnerRef,
+	servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
+	if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
+		ctx, op.logger, labeller.Filter(servicesgvk),
 		corev1applyconfigurations.Service,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
 			return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
@@ -463,11 +479,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
 	sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())
 
-	if err := labelObjects(corev1.SchemeGroupVersion.WithResource("pods"), csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
-		ctx, op.logger, func(object metav1.Object) bool {
-			_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
-			return ok
-		},
+	podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
+	if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
+		ctx, op.logger, labeller.Filter(podsgvk),
 		corev1applyconfigurations.Pod,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
 			return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
@@ -500,19 +514,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	jobInformer := k8sInformerFactory.Batch().V1().Jobs()
 	sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())
 
-	if err := labelObjects(batchv1.SchemeGroupVersion.WithResource("jobs"), jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
-		ctx, op.logger, func(object metav1.Object) bool {
-			for _, ownerRef := range object.GetOwnerReferences() {
-				if ownerRef.APIVersion == corev1.SchemeGroupVersion.String() && ownerRef.Kind == "ConfigMap" {
-					cm, err := configMapInformer.Lister().ConfigMaps(object.GetNamespace()).Get(ownerRef.Name)
-					if err != nil {
-						return false
-					}
-					return labeller.HasOLMOwnerRef(cm)
-				}
-			}
-			return false
-		},
+	jobsgvk := batchv1.SchemeGroupVersion.WithResource("jobs")
+	if err := labelObjects(jobsgvk, jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
+		ctx, op.logger, labeller.JobFilter(func(namespace, name string) (metav1.Object, error) {
+			return configMapInformer.Lister().ConfigMaps(namespace).Get(name)
+		}),
 		batchv1applyconfigurations.Job,
 		func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
 			return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
@@ -585,15 +591,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		return nil, err
 	}
 
-	if err := labelObjects(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), crdInformer, labeller.ObjectPatchLabeler(
-		ctx, op.logger, func(object metav1.Object) bool {
-			for key := range object.GetAnnotations() {
-				if strings.HasPrefix(key, alongside.AnnotationPrefix) {
-					return true
-				}
-			}
-			return false
-		},
+	customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
+	if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler(
+		ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
 		op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
 	)); err != nil {
 		return nil, err
diff --git a/pkg/controller/operators/labeller/filters.go b/pkg/controller/operators/labeller/filters.go
new file mode 100644
index 00000000000..18dc334b011
--- /dev/null
+++ b/pkg/controller/operators/labeller/filters.go
@@ -0,0 +1,112 @@
+package labeller
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
+	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/errgroup"
+	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
+	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/metadata"
+
+	"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
+)
+
+func Filter(gvr schema.GroupVersionResource) func(metav1.Object) bool {
+	if f, ok := filters[gvr]; ok {
+		return f
+	}
+	return func(object metav1.Object) bool {
+		return false
+	}
+}
+
+func JobFilter(getConfigMap func(namespace, name string) (metav1.Object, error)) func(object metav1.Object) bool {
+	return func(object metav1.Object) bool {
+		for _, ownerRef := range object.GetOwnerReferences() {
+			if ownerRef.APIVersion == corev1.SchemeGroupVersion.String() && ownerRef.Kind == "ConfigMap" {
+				cm, err := getConfigMap(object.GetNamespace(), ownerRef.Name)
+				if err != nil {
+					return false
+				}
+				return HasOLMOwnerRef(cm)
+			}
+		}
+		return false
+	}
+}
+
+var filters = map[schema.GroupVersionResource]func(metav1.Object) bool{
+	corev1.SchemeGroupVersion.WithResource("services"): HasOLMOwnerRef,
+	corev1.SchemeGroupVersion.WithResource("pods"): func(object metav1.Object) bool {
+		_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
+		return ok
+	},
+	corev1.SchemeGroupVersion.WithResource("serviceaccounts"): func(object metav1.Object) bool {
+		return HasOLMOwnerRef(object) || HasOLMLabel(object)
+	},
+	appsv1.SchemeGroupVersion.WithResource("deployments"):  HasOLMOwnerRef,
+	rbacv1.SchemeGroupVersion.WithResource("roles"):        HasOLMOwnerRef,
+	rbacv1.SchemeGroupVersion.WithResource("rolebindings"): HasOLMOwnerRef,
+	rbacv1.SchemeGroupVersion.WithResource("clusterroles"):        HasOLMOwnerRef,
+	rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): HasOLMOwnerRef,
+	apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"): func(object metav1.Object) bool {
+		for key := range object.GetAnnotations() {
+			if strings.HasPrefix(key, alongside.AnnotationPrefix) {
+				return true
+			}
+		}
+		return false
+	},
+}
+
+func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadata.Interface) (bool, error) {
+	okLock := sync.Mutex{}
+	var ok bool = true
+	g, ctx := errgroup.WithContext(ctx)
+	allFilters := map[schema.GroupVersionResource]func(metav1.Object) bool{}
+	for gvr, filter := range filters {
+		allFilters[gvr] = filter
+	}
+	allFilters[batchv1.SchemeGroupVersion.WithResource("jobs")] = JobFilter(func(namespace, name string) (metav1.Object, error) {
+		return metadataClient.Resource(corev1.SchemeGroupVersion.WithResource("configmaps")).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
+	})
+	for gvr, filter := range allFilters {
+		gvr, filter := gvr, filter
+		g.Go(func() error {
+			list, err := metadataClient.Resource(gvr).List(ctx, metav1.ListOptions{})
+			if err != nil {
+				return fmt.Errorf("failed to list %s: %w", gvr.String(), err)
+			}
+			var count int
+			for _, item := range list.Items {
+				if filter(&item) && !hasLabel(&item) {
+					count++
+				}
+			}
+			if count > 0 {
+				logger.WithFields(logrus.Fields{
+					"gvr":           gvr.String(),
+					"nonconforming": count,
+				}).Info("found nonconforming items")
+			}
+			okLock.Lock()
+			ok = ok && count == 0
+			okLock.Unlock()
+			return nil
+		})
+	}
+	if err := g.Wait(); err != nil {
+		return false, err
+	}
+	return ok, nil
+}
diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go
index d476b52d489..f7afa1ed6df 100644
--- a/pkg/controller/operators/olm/operator.go
+++ b/pkg/controller/operators/olm/operator.go
@@ -141,6 +141,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 		return nil, err
 	}
 
+	canFilter, err := labeller.Validate(ctx, config.logger, config.metadataClient)
+	if err != nil {
+		return nil, err
+	}
+
 	op := &Operator{
 		Operator: queueOperator,
 		clock:    config.clock,
@@ -315,7 +320,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 	}
 
 	// Wire Deployments
-	k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), informers.WithNamespace(namespace))
+	k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), func() []informers.SharedInformerOption {
+		opts := []informers.SharedInformerOption{
+			informers.WithNamespace(namespace),
+		}
+		if canFilter {
+			opts = append(opts, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
+				options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
+			}))
+		}
+		return opts
+	}()...)
 	depInformer := k8sInformerFactory.Apps().V1().Deployments()
 	informersByNamespace[namespace].DeploymentInformer = depInformer
 	op.lister.AppsV1().RegisterDeploymentLister(namespace, depInformer.Lister())
@@ -435,6 +450,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 	}
 
 	labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
+		if canFilter {
+			return nil
+		}
 		op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
 			Name: gvr.String(),
 		})
@@ -455,8 +473,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 		return nil
 	}
 
-	if err := labelObjects(appsv1.SchemeGroupVersion.WithResource("deployments"), informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
-		ctx, op.logger, labeller.HasOLMOwnerRef,
+	deploymentsgvk := appsv1.SchemeGroupVersion.WithResource("deployments")
+	if err := labelObjects(deploymentsgvk, informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
+		ctx, op.logger, labeller.Filter(deploymentsgvk),
 		appsv1applyconfigurations.Deployment,
 		func(namespace string, ctx context.Context, cfg *appsv1applyconfigurations.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (*appsv1.Deployment, error) {
 			return op.opClient.KubernetesInterface().AppsV1().Deployments(namespace).Apply(ctx, cfg, opts)
@@ -502,7 +521,14 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 		return nil, err
 	}
 
-	k8sInformerFactory := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod())
+	k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), func() []informers.SharedInformerOption {
+		if !canFilter {
+			return nil
+		}
+		return []informers.SharedInformerOption{informers.WithTweakListOptions(func(options *metav1.ListOptions) {
+			options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
+		})}
+	}()...)
 	clusterRoleInformer := k8sInformerFactory.Rbac().V1().ClusterRoles()
 	informersByNamespace[metav1.NamespaceAll].ClusterRoleInformer = clusterRoleInformer
 	op.lister.RbacV1().RegisterClusterRoleLister(clusterRoleInformer.Lister())
@@ -519,8 +545,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 		return nil, err
 	}
 
-	if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterroles"), clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
-		ctx, op.logger, labeller.HasOLMOwnerRef,
+	clusterrolesgvk := rbacv1.SchemeGroupVersion.WithResource("clusterroles")
+	if err := labelObjects(clusterrolesgvk, clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
+		ctx, op.logger, labeller.Filter(clusterrolesgvk),
 		func(name, _ string) *rbacv1applyconfigurations.ClusterRoleApplyConfiguration {
 			return rbacv1applyconfigurations.ClusterRole(name)
 		},
@@ -547,8 +574,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
 		return nil, err
 	}
 
-	if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"), clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
-		ctx, op.logger, labeller.HasOLMOwnerRef,
+	clusterrolebindingssgvk := rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings")
+	if err := labelObjects(clusterrolebindingssgvk, clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
+		ctx, op.logger, labeller.Filter(clusterrolebindingssgvk),
 		func(name, _ string) *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration {
 			return rbacv1applyconfigurations.ClusterRoleBinding(name)
 		},
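
The change repeats one gating pattern in three places: labeller.Validate reports whether every object OLM manages already carries the managed label, and only then do the shared informer factories add a WithTweakListOptions label selector, while labelObjects returns early so the per-GVR labelling queues are not wired. Below is a minimal, self-contained sketch of that pattern rather than the operator wiring itself; the newFactory helper is a hypothetical name, and the literal olm.managed=true pair is an assumed stand-in for install.OLMManagedLabelKey and install.OLMManagedLabelValue.

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// newFactory mirrors the gate used in the diff: when canFilter is true the
// factory lists and watches only objects carrying the managed label;
// otherwise it stays unfiltered and the caller keeps the labelling queues
// that backfill the label.
func newFactory(client kubernetes.Interface, canFilter bool, resync time.Duration) informers.SharedInformerFactory {
	opts := []informers.SharedInformerOption{}
	if canFilter {
		// Assumed values of install.OLMManagedLabelKey / install.OLMManagedLabelValue.
		selector := labels.SelectorFromSet(labels.Set{"olm.managed": "true"}).String()
		opts = append(opts, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.LabelSelector = selector
		}))
	}
	return informers.NewSharedInformerFactoryWithOptions(client, resync, opts...)
}

func main() {
	// Illustrative only: print the selector string the tweak would apply.
	fmt.Println(labels.SelectorFromSet(labels.Set{"olm.managed": "true"}).String())
}

Because Validate runs once during operator construction, a cluster that still has unlabelled objects keeps the unfiltered informers and the labelling queues for that process; the selector-based filtering can only take effect on a later start-up, after everything has been labelled.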
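The Validate helper added in filters.go is easier to read with the Kubernetes machinery stripped away: one errgroup goroutine per GVR, a shared boolean that starts true and is ANDed under a lock with "no nonconforming items found", and a single Wait to surface list errors. The sketch below reproduces only that shape, with canned data in place of the metadata client; checkAll and its inputs are hypothetical names, not part of the change.

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// checkAll fans out one goroutine per resource kind, counts items missing the
// expected label, and ANDs the per-kind verdicts under a mutex, mirroring the
// shape of labeller.Validate.
func checkAll(ctx context.Context, labelledByKind map[string][]bool) (bool, error) {
	var mu sync.Mutex
	ok := true
	g, _ := errgroup.WithContext(ctx)
	for kind, labelled := range labelledByKind {
		kind, labelled := kind, labelled
		g.Go(func() error {
			count := 0
			for _, hasLabel := range labelled {
				if !hasLabel {
					count++
				}
			}
			if count > 0 {
				fmt.Printf("%s: %d nonconforming items\n", kind, count)
			}
			mu.Lock()
			ok = ok && count == 0 // a single nonconforming kind flips the verdict
			mu.Unlock()
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return false, err
	}
	return ok, nil
}

func main() {
	canFilter, _ := checkAll(context.Background(), map[string][]bool{
		"pods":     {true, true},
		"services": {true, false}, // one unlabelled service blocks filtering
	})
	fmt.Println("can filter informers:", canFilter)
}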