Skip to content

Commit

Permalink
(fix) Resolver: list CatSrc using client, instead of referring to reg…
Browse files Browse the repository at this point in the history
…istry-server cache

Using "available CatalogSources" information from the registry-client cache was causing
cache inconsistency problems.

This has shown up multiple times in production environments over the years, manifesting
itself in the form of all the subscriptions in a namespace being transitioned into an error
state when a CatalogSource that the cache claims to exist has actually been deleted from the
cluster, but the cache was not updated.

The Subscriptions are transitioned to an error state because of the deleted CatalogSource,
with the following error message:

"message": "failed to populate resolver cache from source <deleted-catalogsource>: failed to list
bundles: rpc error: code = Unavailable desc = connection error: desc = \"transport:
Error while dialing dial tcp: lookup <deleted-catalogsource>.<ns>.svc on 172.....: no such host\"",
                "reason": "ErrorPreventedResolution",
                "status": "True",
                "type": "ResolutionFailed"

This PR switches the information lookup from the cache, to using a client to list the CatalogSources
present in the cluster.
  • Loading branch information
anik120 committed Aug 6, 2024
1 parent 8089266 commit 1d98c49
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientFactory: clients.NewFactory(validatingConfig),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)
Expand Down
7 changes: 3 additions & 4 deletions pkg/controller/operators/catalog/subscription/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,10 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St
var healthUpdated, deprecationUpdated bool
next, healthUpdated = s.UpdateHealth(c.now(), catalogHealth...)
deprecationUpdated, err = c.updateDeprecatedStatus(ctx, s.Subscription())
if err != nil {
return next, err
}
if healthUpdated || deprecationUpdated {
_, err = c.client.OperatorsV1alpha1().Subscriptions(ns).UpdateStatus(ctx, s.Subscription(), metav1.UpdateOptions{})
if _, err := c.client.OperatorsV1alpha1().Subscriptions(ns).UpdateStatus(ctx, s.Subscription(), metav1.UpdateOptions{}); err != nil {
return nil, err
}
}
case SubscriptionExistsState:
if s == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *NamespacedOperatorCache) Error() error {
err := snapshot.err
snapshot.m.RUnlock()
if err != nil {
errs = append(errs, fmt.Errorf("failed to populate resolver cache from source %v: %w", key.String(), err))
errs = append(errs, fmt.Errorf("error using catalog %s (in namespace %s): %w", key.Name, key.Namespace, err))
}
}
return errors.NewAggregate(errs)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,5 +238,5 @@ func TestNamespaceOperatorCacheError(t *testing.T) {
key: ErrorSource{Error: errors.New("testing")},
})

require.EqualError(t, c.Namespaced("dummynamespace").Error(), "failed to populate resolver cache from source dummyname/dummynamespace: testing")
require.EqualError(t, c.Namespaced("dummynamespace").Error(), "error using catalog dummyname (in namespace dummynamespace): testing")
}
61 changes: 49 additions & 12 deletions pkg/controller/registry/resolver/source_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"time"

"github.com/blang/semver/v4"
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
)

// todo: move to pkg/controller/operators/catalog
Expand Down Expand Up @@ -65,31 +68,65 @@ func (i *sourceInvalidator) GetValidChannel(key cache.SourceKey) <-chan struct{}
}

type RegistrySourceProvider struct {
rcp RegistryClientProvider
logger logrus.StdLogger
invalidator *sourceInvalidator
rcp RegistryClientProvider
catsrcLister v1alpha1listers.CatalogSourceLister
logger logrus.StdLogger
invalidator *sourceInvalidator
}

func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger logrus.StdLogger) *RegistrySourceProvider {
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider {
return &RegistrySourceProvider{
rcp: rcp,
logger: logger,
rcp: rcp,
logger: logger,
catsrcLister: catsrcLister,
invalidator: &sourceInvalidator{
validChans: make(map[cache.SourceKey]chan struct{}),
ttl: 5 * time.Minute,
},
}
}

// errorSource wraps an error so that it can be stored where a source is
// expected. Sources places it in its result map when listing CatalogSources
// for a namespace fails, or when a listed CatalogSource has no registry client.
type errorSource struct {
error
}

// Snapshot ignores its context argument and always returns a nil snapshot
// together with the embedded error.
func (s errorSource) Snapshot(_ context.Context) (*cache.Snapshot, error) {
return nil, s.error
}

func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
result := make(map[cache.SourceKey]cache.Source)
for key, client := range a.rcp.ClientsForNamespaces(namespaces...) {
result[cache.SourceKey(key)] = &registrySource{
key: cache.SourceKey(key),
client: client,
logger: a.logger,
invalidator: a.invalidator,

cats := []*operatorsv1alpha1.CatalogSource{}
for _, ns := range namespaces {
catsInNamespace, err := a.catsrcLister.CatalogSources(ns).List(labels.Everything())
if err != nil {
result[cache.SourceKey{Name: "", Namespace: ns}] = errorSource{
error: fmt.Errorf("failed to list catalogsources for namespace %q: %w", ns, err),
}
return result
}
cats = append(cats, catsInNamespace...)
}

clients := a.rcp.ClientsForNamespaces(namespaces...)
for _, cat := range cats {
key := cache.SourceKey{Name: cat.Name, Namespace: cat.Namespace}
if client, ok := clients[registry.CatalogKey{Name: cat.Name, Namespace: cat.Namespace}]; ok {
result[key] = &registrySource{
key: key,
client: client,
logger: a.logger,
invalidator: a.invalidator,
}
} else {
result[key] = errorSource{
error: fmt.Errorf("no registry client established for catalogsource %s/%s", cat.Namespace, cat.Name),
}
}
}
if len(result) == 0 {
return nil
}
return result
}
Expand Down
22 changes: 21 additions & 1 deletion pkg/controller/registry/resolver/step_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
controllerbundle "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/bundle"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
Expand All @@ -16,10 +17,12 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

const (
BundleLookupConditionPacked v1alpha1.BundleLookupConditionType = "BundleLookupNotPersisted"
// exclusionAnnotation is the OperatorGroup annotation key checked by
// ResolveSteps: when its value is "true", the global catalog namespace is
// left out of the namespaces passed to the resolver.
exclusionAnnotation string = "olm.operatorframework.io/exclude-global-namespace-resolution"
)

// init hooks provides the downstream a way to modify the upstream behavior
Expand All @@ -32,6 +35,7 @@ type StepResolver interface {
type OperatorStepResolver struct {
subLister v1alpha1listers.SubscriptionLister
csvLister v1alpha1listers.ClusterServiceVersionLister
ogLister v1listers.OperatorGroupLister
client versioned.Interface
globalCatalogNamespace string
resolver *Resolver
Expand Down Expand Up @@ -69,6 +73,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
stepResolver := &OperatorStepResolver{
subLister: lister.OperatorsV1alpha1().SubscriptionLister(),
csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(),
ogLister: lister.OperatorsV1().OperatorGroupLister(),
client: client,
globalCatalogNamespace: globalCatalogNamespace,
resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log),
Expand All @@ -91,7 +96,22 @@ func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step,
return nil, nil, nil, err
}

namespaces := []string{namespace, r.globalCatalogNamespace}
namespaces := []string{namespace}
ogs, err := r.ogLister.OperatorGroups(namespace).List(labels.Everything())
if err != nil {
return nil, nil, nil, fmt.Errorf("listing operatorgroups in namespace %s: %s", namespace, err)
}
if len(ogs) != 1 {
return nil, nil, nil, fmt.Errorf("expected 1 OperatorGroup in the namespace, found %d", len(ogs))
}
og := ogs[0]
if val, ok := og.Annotations[exclusionAnnotation]; ok && val == "true" {
// Exclusion specified
// Ignore the globalNamespace for the purposes of resolution in this namespace
r.log.Printf("excluding global catalogs from resolution in namespace %s", namespace)
} else {
namespaces = append(namespaces, r.globalCatalogNamespace)
}
operators, err := r.resolver.Resolve(namespaces, subs)
if err != nil {
return nil, nil, nil, err
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/registry/resolver/step_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ func TestResolver(t *testing.T) {
steps: [][]*v1alpha1.Step{},
subs: []*v1alpha1.Subscription{},
errAssert: func(t *testing.T, err error) {
assert.Contains(t, err.Error(), "failed to populate resolver cache from source @existing/catsrc-namespace: csv")
assert.Contains(t, err.Error(), "error using catalog @existing (in namespace catsrc-namespace): csv")
assert.Contains(t, err.Error(), "in phase Failed instead of Replacing")
},
},
Expand Down Expand Up @@ -1377,6 +1377,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
name: "NewSubscription/Permissions/ClusterPermissions",
clusterState: []runtime.Object{
newSub(namespace, "a", "alpha", catalog),
newOperatorGroup("test-og", namespace),
},
bundlesInCatalog: []*api.Bundle{bundle},
out: out{
Expand All @@ -1392,6 +1393,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
name: "don't create default service accounts",
clusterState: []runtime.Object{
newSub(namespace, "a", "alpha", catalog),
newOperatorGroup("test-og", namespace),
},
bundlesInCatalog: []*api.Bundle{bundleWithDefaultServiceAccount},
out: out{
Expand All @@ -1418,6 +1420,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
lister := operatorlister.NewLister()
lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, informerFactory.Operators().V1alpha1().Subscriptions().Lister())
lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, informerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister())
lister.OperatorsV1().RegisterOperatorGroupLister(namespace, informerFactory.Operators().V1().OperatorGroups().Lister())

stubSnapshot := &resolvercache.Snapshot{}
for _, bundle := range tt.bundlesInCatalog {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/subscription_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ var _ = Describe("Subscription", func() {
Expect(err).NotTo(HaveOccurred())
})

When("missing target catalog", func() {
FWhen("missing target catalog", func() {
It("should surface the missing catalog", func() {
By(`TestSubscriptionStatusMissingTargetCatalogSource ensures that a Subscription has the appropriate status condition when`)
By(`its target catalog is missing.`)
Expand Down

0 comments on commit 1d98c49

Please sign in to comment.