From 24a8fbb6abe942eaf5cc5aeab1548ec0c274ce1a Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Fri, 26 Jul 2024 16:16:06 -0400 Subject: [PATCH] (fix) Resolver: list CatSrc using client, instead of referring to registry-server cache Using "available CatalogSources" information from the registry-client cache was causing cache inconsistency problems. This has shown up multiple times in production environments over the years, manifesting itself in the form of all subscriptions in a namespace being transitioned into an error state when a CatalogSource that the cache claims to exist has actually been deleted from the cluster, but the cache was not updated. The Subscriptions are transitioned to an error state because of the deleted CatalogSource with the following error message: "message": "failed to populate resolver cache from source : failed to list bundles: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing dial tcp: lookup ..svc on 172.....: no such host\"", "reason": "ErrorPreventedResolution", "status": "True", "type": "ResolutionFailed" This PR switches the information lookup from the cache to using a client to list the CatalogSources present in the cluster. 
--- pkg/controller/operators/catalog/operator.go | 2 +- .../catalog/subscription/reconciler.go | 7 +-- .../registry/resolver/cache/cache.go | 2 +- .../registry/resolver/cache/cache_test.go | 2 +- .../registry/resolver/source_registry.go | 61 +++++++++++++++---- .../registry/resolver/step_resolver.go | 22 ++++++- .../registry/resolver/step_resolver_test.go | 5 +- 7 files changed, 80 insertions(+), 21 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 7f8f2c4809..1f637cbe37 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -214,7 +214,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo clientFactory: clients.NewFactory(validatingConfig), } op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState) - op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger) + op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger) resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister()) op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage) res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger) diff --git a/pkg/controller/operators/catalog/subscription/reconciler.go b/pkg/controller/operators/catalog/subscription/reconciler.go index 374e49fb77..2368aa5ca8 100644 --- a/pkg/controller/operators/catalog/subscription/reconciler.go +++ b/pkg/controller/operators/catalog/subscription/reconciler.go @@ -92,11 +92,10 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St var healthUpdated, deprecationUpdated bool 
next, healthUpdated = s.UpdateHealth(c.now(), catalogHealth...) deprecationUpdated, err = c.updateDeprecatedStatus(ctx, s.Subscription()) - if err != nil { - return next, err - } if healthUpdated || deprecationUpdated { - _, err = c.client.OperatorsV1alpha1().Subscriptions(ns).UpdateStatus(ctx, s.Subscription(), metav1.UpdateOptions{}) + if _, err := c.client.OperatorsV1alpha1().Subscriptions(ns).UpdateStatus(ctx, s.Subscription(), metav1.UpdateOptions{}); err != nil { + return nil, err + } } case SubscriptionExistsState: if s == nil { diff --git a/pkg/controller/registry/resolver/cache/cache.go b/pkg/controller/registry/resolver/cache/cache.go index a42b45246f..a53b40230d 100644 --- a/pkg/controller/registry/resolver/cache/cache.go +++ b/pkg/controller/registry/resolver/cache/cache.go @@ -139,7 +139,7 @@ func (c *NamespacedOperatorCache) Error() error { err := snapshot.err snapshot.m.RUnlock() if err != nil { - errs = append(errs, fmt.Errorf("failed to populate resolver cache from source %v: %w", key.String(), err)) + errs = append(errs, fmt.Errorf("error using catalog %s (in namespace %s): %w", key.Name, key.Namespace, err)) } } return errors.NewAggregate(errs) diff --git a/pkg/controller/registry/resolver/cache/cache_test.go b/pkg/controller/registry/resolver/cache/cache_test.go index 42800fb5d8..fc4b630c2d 100644 --- a/pkg/controller/registry/resolver/cache/cache_test.go +++ b/pkg/controller/registry/resolver/cache/cache_test.go @@ -238,5 +238,5 @@ func TestNamespaceOperatorCacheError(t *testing.T) { key: ErrorSource{Error: errors.New("testing")}, }) - require.EqualError(t, c.Namespaced("dummynamespace").Error(), "failed to populate resolver cache from source dummyname/dummynamespace: testing") + require.EqualError(t, c.Namespaced("dummynamespace").Error(), "error using catalog dummyname (in namespace dummynamespace): testing") } diff --git a/pkg/controller/registry/resolver/source_registry.go b/pkg/controller/registry/resolver/source_registry.go index 
bbfef6c542..fe193beae8 100644 --- a/pkg/controller/registry/resolver/source_registry.go +++ b/pkg/controller/registry/resolver/source_registry.go @@ -8,12 +8,15 @@ import ( "time" "github.com/blang/semver/v4" + operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" + v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-registry/pkg/api" "github.com/operator-framework/operator-registry/pkg/client" opregistry "github.com/operator-framework/operator-registry/pkg/registry" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/labels" ) // todo: move to pkg/controller/operators/catalog @@ -65,15 +68,17 @@ func (i *sourceInvalidator) GetValidChannel(key cache.SourceKey) <-chan struct{} } type RegistrySourceProvider struct { - rcp RegistryClientProvider - logger logrus.StdLogger - invalidator *sourceInvalidator + rcp RegistryClientProvider + catsrcLister v1alpha1listers.CatalogSourceLister + logger logrus.StdLogger + invalidator *sourceInvalidator } -func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger logrus.StdLogger) *RegistrySourceProvider { +func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider { return &RegistrySourceProvider{ - rcp: rcp, - logger: logger, + rcp: rcp, + logger: logger, + catsrcLister: catsrcLister, invalidator: &sourceInvalidator{ validChans: make(map[cache.SourceKey]chan struct{}), ttl: 5 * time.Minute, @@ -81,15 +86,47 @@ func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger } } +type errorSource struct { + error +} + +func (s errorSource) Snapshot(_ context.Context) 
(*cache.Snapshot, error) { + return nil, s.error +} + func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source { result := make(map[cache.SourceKey]cache.Source) - for key, client := range a.rcp.ClientsForNamespaces(namespaces...) { - result[cache.SourceKey(key)] = ®istrySource{ - key: cache.SourceKey(key), - client: client, - logger: a.logger, - invalidator: a.invalidator, + + cats := []*operatorsv1alpha1.CatalogSource{} + for _, ns := range namespaces { + catsInNamespace, err := a.catsrcLister.CatalogSources(ns).List(labels.Everything()) + if err != nil { + result[cache.SourceKey{Name: "", Namespace: ns}] = errorSource{ + error: fmt.Errorf("failed to list catalogsources for namespace %q: %w", ns, err), + } + return result } + cats = append(cats, catsInNamespace...) + } + + clients := a.rcp.ClientsForNamespaces(namespaces...) + for _, cat := range cats { + key := cache.SourceKey{Name: cat.Name, Namespace: cat.Namespace} + if client, ok := clients[registry.CatalogKey{Name: cat.Name, Namespace: cat.Namespace}]; ok { + result[key] = ®istrySource{ + key: key, + client: client, + logger: a.logger, + invalidator: a.invalidator, + } + } else { + result[key] = errorSource{ + error: fmt.Errorf("no registry client established for catalogsource %s/%s", cat.Namespace, cat.Name), + } + } + } + if len(result) == 0 { + return nil } return result } diff --git a/pkg/controller/registry/resolver/step_resolver.go b/pkg/controller/registry/resolver/step_resolver.go index 2ba07aa0d9..5d2807bceb 100644 --- a/pkg/controller/registry/resolver/step_resolver.go +++ b/pkg/controller/registry/resolver/step_resolver.go @@ -7,6 +7,7 @@ import ( "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" + v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1" v1alpha1listers 
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" controllerbundle "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/bundle" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" @@ -16,10 +17,12 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" ) const ( BundleLookupConditionPacked v1alpha1.BundleLookupConditionType = "BundleLookupNotPersisted" + exclusionAnnotation string = "olm.operatorframework.io/exclude-global-namespace-resolution" ) // init hooks provides the downstream a way to modify the upstream behavior @@ -32,6 +35,7 @@ type StepResolver interface { type OperatorStepResolver struct { subLister v1alpha1listers.SubscriptionLister csvLister v1alpha1listers.ClusterServiceVersionLister + ogLister v1listers.OperatorGroupLister client versioned.Interface globalCatalogNamespace string resolver *Resolver @@ -69,6 +73,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio stepResolver := &OperatorStepResolver{ subLister: lister.OperatorsV1alpha1().SubscriptionLister(), csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(), + ogLister: lister.OperatorsV1().OperatorGroupLister(), client: client, globalCatalogNamespace: globalCatalogNamespace, resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log), @@ -91,7 +96,22 @@ func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, return nil, nil, nil, err } - namespaces := []string{namespace, r.globalCatalogNamespace} + namespaces := []string{namespace} + ogs, err := r.ogLister.OperatorGroups(namespace).List(labels.Everything()) + if err != nil { + return nil, nil, nil, fmt.Errorf("listing operatorgroups in namespace %s: %s", namespace, err) + } + if 
len(ogs) != 1 { + return nil, nil, nil, fmt.Errorf("expected 1 OperatorGroup in the namespace, found %d", len(ogs)) + } + og := ogs[0] + if val, ok := og.Annotations[exclusionAnnotation]; ok && val == "true" { + // Exclusion specified + // Ignore the globalNamespace for the purposes of resolution in this namespace + r.log.Printf("excluding global catalogs from resolution in namespace %s", namespace) + } else { + namespaces = append(namespaces, r.globalCatalogNamespace) + } operators, err := r.resolver.Resolve(namespaces, subs) if err != nil { return nil, nil, nil, err diff --git a/pkg/controller/registry/resolver/step_resolver_test.go b/pkg/controller/registry/resolver/step_resolver_test.go index a71d64c145..30a2df9c24 100644 --- a/pkg/controller/registry/resolver/step_resolver_test.go +++ b/pkg/controller/registry/resolver/step_resolver_test.go @@ -1218,7 +1218,7 @@ func TestResolver(t *testing.T) { steps: [][]*v1alpha1.Step{}, subs: []*v1alpha1.Subscription{}, errAssert: func(t *testing.T, err error) { - assert.Contains(t, err.Error(), "failed to populate resolver cache from source @existing/catsrc-namespace: csv") + assert.Contains(t, err.Error(), "error using catalog @existing (in namespace catsrc-namespace): csv") assert.Contains(t, err.Error(), "in phase Failed instead of Replacing") }, }, @@ -1377,6 +1377,7 @@ func TestNamespaceResolverRBAC(t *testing.T) { name: "NewSubscription/Permissions/ClusterPermissions", clusterState: []runtime.Object{ newSub(namespace, "a", "alpha", catalog), + newOperatorGroup("test-og", namespace), }, bundlesInCatalog: []*api.Bundle{bundle}, out: out{ @@ -1392,6 +1393,7 @@ func TestNamespaceResolverRBAC(t *testing.T) { name: "don't create default service accounts", clusterState: []runtime.Object{ newSub(namespace, "a", "alpha", catalog), + newOperatorGroup("test-og", namespace), }, bundlesInCatalog: []*api.Bundle{bundleWithDefaultServiceAccount}, out: out{ @@ -1418,6 +1420,7 @@ func TestNamespaceResolverRBAC(t *testing.T) { lister 
:= operatorlister.NewLister() lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, informerFactory.Operators().V1alpha1().Subscriptions().Lister()) lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, informerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister()) + lister.OperatorsV1().RegisterOperatorGroupLister(namespace, informerFactory.Operators().V1().OperatorGroups().Lister()) stubSnapshot := &resolvercache.Snapshot{} for _, bundle := range tt.bundlesInCatalog {