diff --git a/internal/certmanager/cm_controller.go b/internal/certmanager/cm_controller.go index 2f0f78da57..03a6876b1c 100644 --- a/internal/certmanager/cm_controller.go +++ b/internal/certmanager/cm_controller.go @@ -56,16 +56,15 @@ const ( // and creates/ updates certificates for VS resources as required, // and VS resources when certificate objects are created/ updated type CmController struct { - vsLister []listers_v1.VirtualServerLister - sync SyncFn - ctx context.Context - mustSync []cache.InformerSynced - queue workqueue.RateLimitingInterface - vsSharedInformerFactory []vsinformers.SharedInformerFactory - cmSharedInformerFactory []cm_informers.SharedInformerFactory - kubeSharedInformerFactory []kubeinformers.SharedInformerFactory - recorder record.EventRecorder - cmClient *cm_clientset.Clientset + sync SyncFn + ctx context.Context + mustSync []cache.InformerSynced + queue workqueue.RateLimitingInterface + informerGroup map[string]*namespacedInformer + recorder record.EventRecorder + cmClient *cm_clientset.Clientset + kubeClient kubernetes.Interface + vsClient k8s_nginx.Interface } // CmOpts is the options required for building the CmController @@ -78,27 +77,42 @@ type CmOpts struct { vsClient k8s_nginx.Interface } +type namespacedInformer struct { + vsSharedInformerFactory vsinformers.SharedInformerFactory + cmSharedInformerFactory cm_informers.SharedInformerFactory + kubeSharedInformerFactory kubeinformers.SharedInformerFactory + vsLister listers_v1.VirtualServerLister + cmLister cmlisters.CertificateLister +} + func (c *CmController) register() workqueue.RateLimitingInterface { - var cmLister []cmlisters.CertificateLister - for _, sif := range c.vsSharedInformerFactory { - c.vsLister = append(c.vsLister, sif.K8s().V1().VirtualServers().Lister()) - sif.K8s().V1().VirtualServers().Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{ - Queue: c.queue, - }) - c.mustSync = append(c.mustSync, sif.K8s().V1().VirtualServers().Informer().HasSynced) - } + c.sync = SyncFnFor(c.recorder, c.cmClient, c.informerGroup) + return c.queue +} - for _, cif := range c.cmSharedInformerFactory { - cif.Certmanager().V1().Certificates().Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{ - WorkFunc: certificateHandler(c.queue), - }) - cmLister = append(cmLister, cif.Certmanager().V1().Certificates().Lister()) - c.mustSync = append(c.mustSync, cif.Certmanager().V1().Certificates().Informer().HasSynced) - } +func (c *CmController) newNamespacedInformer(ns string) { + nsi := &namespacedInformer{} + nsi.cmSharedInformerFactory = cm_informers.NewSharedInformerFactoryWithOptions(c.cmClient, resyncPeriod, cm_informers.WithNamespace(ns)) + nsi.kubeSharedInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(c.kubeClient, resyncPeriod, kubeinformers.WithNamespace(ns)) + nsi.vsSharedInformerFactory = vsinformers.NewSharedInformerFactoryWithOptions(c.vsClient, resyncPeriod, vsinformers.WithNamespace(ns)) - c.sync = SyncFnFor(c.recorder, c.cmClient, cmLister) + c.addHandlers(nsi) - return c.queue + c.informerGroup[ns] = nsi +} + +func (c *CmController) addHandlers(nsi *namespacedInformer) { + nsi.vsLister = nsi.vsSharedInformerFactory.K8s().V1().VirtualServers().Lister() + nsi.vsSharedInformerFactory.K8s().V1().VirtualServers().Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{ + Queue: c.queue, + }) + c.mustSync = append(c.mustSync, nsi.vsSharedInformerFactory.K8s().V1().VirtualServers().Informer().HasSynced) + + nsi.cmSharedInformerFactory.Certmanager().V1().Certificates().Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{ + WorkFunc: certificateHandler(c.queue), + }) + nsi.cmLister = nsi.cmSharedInformerFactory.Certmanager().V1().Certificates().Lister() + c.mustSync = append(c.mustSync, nsi.cmSharedInformerFactory.Certmanager().V1().Certificates().Informer().HasSynced) } func (c *CmController) processItem(ctx context.Context, key string) error { @@ -108,14 +122,11 @@ func (c *CmController) processItem(ctx context.Context, key string) error { runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return err } + nsi := getNamespacedInformer(namespace, c.informerGroup) var vs *conf_v1.VirtualServer - for _, vl := range c.vsLister { - vs, err = vl.VirtualServers(namespace).Get(name) - if err == nil { - break - } - } + vs, err = nsi.vsLister.VirtualServers(namespace).Get(name) + if err != nil { return err } @@ -168,25 +179,22 @@ func NewCmController(opts *CmOpts) *CmController { // Create a cert-manager api client intcl, _ := cm_clientset.NewForConfig(opts.kubeConfig) - var vsSharedInformerFactory []vsinformers.SharedInformerFactory - var cmSharedInformerFactory []cm_informers.SharedInformerFactory - var kubeSharedInformerFactory []kubeinformers.SharedInformerFactory + ig := make(map[string]*namespacedInformer) - for _, ns := range opts.namespace { - cmSharedInformerFactory = append(cmSharedInformerFactory, cm_informers.NewSharedInformerFactoryWithOptions(intcl, resyncPeriod, cm_informers.WithNamespace(ns))) - kubeSharedInformerFactory = append(kubeSharedInformerFactory, kubeinformers.NewSharedInformerFactoryWithOptions(opts.kubeClient, resyncPeriod, kubeinformers.WithNamespace(ns))) - vsSharedInformerFactory = append(vsSharedInformerFactory, vsinformers.NewSharedInformerFactoryWithOptions(opts.vsClient, resyncPeriod, vsinformers.WithNamespace(ns))) + cm := &CmController{ + ctx: opts.context, + queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName), + informerGroup: ig, + recorder: opts.eventRecorder, + cmClient: intcl, + kubeClient: opts.kubeClient, + vsClient: opts.vsClient, } - cm := &CmController{ - ctx: opts.context, - queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName), - cmSharedInformerFactory: cmSharedInformerFactory, - kubeSharedInformerFactory: kubeSharedInformerFactory, - recorder: opts.eventRecorder, - cmClient: intcl, - vsSharedInformerFactory: vsSharedInformerFactory, + for _, ns := range opts.namespace { + cm.newNamespacedInformer(ns) } + cm.register() return cm } @@ -201,14 +209,10 @@ func (c *CmController) Run(stopCh <-chan struct{}) { glog.Infof("Starting cert-manager control loop") - for _, vif := range c.vsSharedInformerFactory { - go vif.Start(c.ctx.Done()) - } - for _, cif := range c.cmSharedInformerFactory { - go cif.Start(c.ctx.Done()) - } - for _, kif := range c.kubeSharedInformerFactory { - go kif.Start(c.ctx.Done()) + for _, ig := range c.informerGroup { + go ig.vsSharedInformerFactory.Start(c.ctx.Done()) + go ig.cmSharedInformerFactory.Start(c.ctx.Done()) + go ig.kubeSharedInformerFactory.Start(c.ctx.Done()) } // // wait for all the informer caches we depend on are synced glog.V(3).Infof("Waiting for %d caches to sync", len(c.mustSync)) diff --git a/internal/certmanager/cm_controller_test.go b/internal/certmanager/cm_controller_test.go index b7b0da7980..92c9dcaaaf 100644 --- a/internal/certmanager/cm_controller_test.go +++ b/internal/certmanager/cm_controller_test.go @@ -23,19 +23,16 @@ import ( cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" cmclient "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned" - cm_informers "github.com/cert-manager/cert-manager/pkg/client/informers/externalversions" controllerpkg "github.com/cert-manager/cert-manager/pkg/controller" testpkg "github.com/nginxinc/kubernetes-ingress/internal/certmanager/test_files" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/util/workqueue" vsapi "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1" k8s_nginx "github.com/nginxinc/kubernetes-ingress/pkg/client/clientset/versioned" - vsinformers "github.com/nginxinc/kubernetes-ingress/pkg/client/informers/externalversions" ) func Test_controller_Register(t *testing.T) { @@ -138,15 +135,27 @@ func Test_controller_Register(t *testing.T) { // Certificate event is received then HasSynced has not been setup // properly. + ig := make(map[string]*namespacedInformer) + + nsi := &namespacedInformer{ + cmSharedInformerFactory: b.Context.SharedInformerFactory, + kubeSharedInformerFactory: b.Context.KubeSharedInformerFactory, + vsSharedInformerFactory: b.VsSharedInformerFactory, + } + + ig[""] = nsi + cm := &CmController{ - ctx: b.RootContext, - queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName), - cmSharedInformerFactory: []cm_informers.SharedInformerFactory{b.FakeCMInformerFactory()}, - kubeSharedInformerFactory: []kubeinformers.SharedInformerFactory{b.FakeKubeInformerFactory()}, - recorder: b.Recorder, - vsSharedInformerFactory: []vsinformers.SharedInformerFactory{b.VsSharedInformerFactory}, + ctx: b.RootContext, + queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName), + informerGroup: ig, + recorder: b.Recorder, + kubeClient: b.Client, + vsClient: b.VSClient, } + cm.addHandlers(nsi) + queue := cm.register() b.Start() diff --git a/internal/certmanager/helper.go b/internal/certmanager/helper.go index 1b41ddd954..03c60d05b1 100644 --- a/internal/certmanager/helper.go +++ b/internal/certmanager/helper.go @@ -120,3 +120,21 @@ func translateVsSpec(crt *cmapi.Certificate, vsCmSpec *vsapi.CertManager) error } return nil } + +func getNamespacedInformer(ns string, ig map[string]*namespacedInformer) *namespacedInformer { + var nsi *namespacedInformer + var isGlobalNs bool + var exists bool + + nsi, isGlobalNs = ig[""] + + if !isGlobalNs { + // get the correct namespaced informers + nsi, exists = ig[ns] + if !exists { + // we are not watching this namespace + return nil + } + } + return nsi +} diff --git a/internal/certmanager/sync.go b/internal/certmanager/sync.go index f466adcd23..e4c7cd8e34 100644 --- a/internal/certmanager/sync.go +++ b/internal/certmanager/sync.go @@ -60,7 +60,7 @@ type SyncFn func(context.Context, *vsapi.VirtualServer) error func SyncFnFor( rec record.EventRecorder, cmClient clientset.Interface, - cmLister []cmlisters.CertificateLister, + ig map[string]*namespacedInformer, ) SyncFn { return func(ctx context.Context, vs *vsapi.VirtualServer) error { var err error @@ -75,7 +75,9 @@ func SyncFnFor( return err } - newCrts, updateCrts, err := buildCertificates(cmLister, vs, issuerName, issuerKind, issuerGroup) + nsi := getNamespacedInformer(vs.GetNamespace(), ig) + + newCrts, updateCrts, err := buildCertificates(nsi.cmLister, vs, issuerName, issuerKind, issuerGroup) if err != nil { glog.Errorf("Incorrect cert-manager configuration for VirtualServer resource: %v", err) rec.Eventf(vs, corev1.EventTypeWarning, reasonBadConfig, "Incorrect cert-manager configuration for VirtualServer resource: %s", @@ -106,12 +108,8 @@ func SyncFnFor( } var certs []*cmapi.Certificate - for _, cl := range cmLister { - certs, err = cl.Certificates(vs.GetNamespace()).List(labels.Everything()) - if len(certs) > 0 { - break - } - } + certs, err = nsi.cmLister.Certificates(vs.GetNamespace()).List(labels.Everything()) + if err != nil { return err } @@ -131,7 +129,7 @@ func SyncFnFor( } func buildCertificates( - cmLister []cmlisters.CertificateLister, + cmLister cmlisters.CertificateLister, vs *vsapi.VirtualServer, issuerName, issuerKind, issuerGroup string, ) (newCert, update []*cmapi.Certificate, _ error) { @@ -140,12 +138,8 @@ func buildCertificates( var existingCrt *cmapi.Certificate var err error - for _, cl := range cmLister { - existingCrt, err = cl.Certificates(vs.Namespace).Get(vs.Spec.TLS.Secret) - if err == nil { - break - } - } + existingCrt, err = cmLister.Certificates(vs.Namespace).Get(vs.Spec.TLS.Secret) + if !apierrors.IsNotFound(err) && err != nil { return nil, nil, err } diff --git a/internal/certmanager/sync_test.go b/internal/certmanager/sync_test.go index 453f7b1975..b3b662fd14 100644 --- a/internal/certmanager/sync_test.go +++ b/internal/certmanager/sync_test.go @@ -23,7 +23,6 @@ import ( cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1" - cmlisters "github.com/cert-manager/cert-manager/pkg/client/listers/certmanager/v1" "github.com/cert-manager/cert-manager/test/unit/gen" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -491,7 +490,19 @@ func TestSync(t *testing.T) { } b.Init() defer b.Stop() - sync := SyncFnFor(b.Recorder, b.CMClient, []cmlisters.CertificateLister{b.SharedInformerFactory.Certmanager().V1().Certificates().Lister()}) + + ig := make(map[string]*namespacedInformer) + + nsi := &namespacedInformer{ + cmSharedInformerFactory: b.FakeCMInformerFactory(), + kubeSharedInformerFactory: b.FakeKubeInformerFactory(), + vsSharedInformerFactory: b.VsSharedInformerFactory, + cmLister: b.SharedInformerFactory.Certmanager().V1().Certificates().Lister(), + } + + ig[""] = nsi + + sync := SyncFnFor(b.Recorder, b.CMClient, ig) b.Start() err := sync(context.Background(), &test.VirtualServer) diff --git a/internal/externaldns/controller.go b/internal/externaldns/controller.go index 8522e03a64..db3732cd31 100644 --- a/internal/externaldns/controller.go +++ b/internal/externaldns/controller.go @@ -28,15 +28,20 @@ const ( // ExtDNSController represents ExternalDNS controller. type ExtDNSController struct { - vsLister []listersV1.VirtualServerLister - sync SyncFn - ctx context.Context - mustSync []cache.InformerSynced - queue workqueue.RateLimitingInterface - sharedInformerFactory []k8s_nginx_informers.SharedInformerFactory - recorder record.EventRecorder - client k8s_nginx.Interface - extdnslister []extdnslisters.DNSEndpointLister + sync SyncFn + ctx context.Context + mustSync []cache.InformerSynced + queue workqueue.RateLimitingInterface + recorder record.EventRecorder + client k8s_nginx.Interface + informerGroup map[string]*namespacedInformer + resync time.Duration +} + +type namespacedInformer struct { + vsLister listersV1.VirtualServerLister + sharedInformerFactory k8s_nginx_informers.SharedInformerFactory + extdnslister extdnslisters.DNSEndpointLister } // ExtDNSOpts represents config required for building the External DNS Controller. @@ -50,45 +55,44 @@ type ExtDNSOpts struct { // NewController takes external dns config and return a new External DNS Controller. func NewController(opts *ExtDNSOpts) *ExtDNSController { - var sharedInformerFactory []k8s_nginx_informers.SharedInformerFactory - for _, ns := range opts.namespace { - sif := k8s_nginx_informers.NewSharedInformerFactoryWithOptions(opts.client, opts.resyncPeriod, k8s_nginx_informers.WithNamespace(ns)) - sharedInformerFactory = append(sharedInformerFactory, sif) + ig := make(map[string]*namespacedInformer) + c := &ExtDNSController{ + ctx: opts.context, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + informerGroup: ig, + recorder: opts.eventRecorder, + client: opts.client, + resync: opts.resyncPeriod, } - c := &ExtDNSController{ - ctx: opts.context, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), - sharedInformerFactory: sharedInformerFactory, - recorder: opts.eventRecorder, - client: opts.client, + for _, ns := range opts.namespace { + c.newNamespacedInformer(ns) } - c.register() + + c.sync = SyncFnFor(c.recorder, c.client, c.informerGroup) return c } -func (c *ExtDNSController) register() workqueue.Interface { - for _, sif := range c.sharedInformerFactory { - c.vsLister = append(c.vsLister, sif.K8s().V1().VirtualServers().Lister()) - c.extdnslister = append(c.extdnslister, sif.Externaldns().V1().DNSEndpoints().Lister()) - - sif.K8s().V1().VirtualServers().Informer().AddEventHandler( - &QueuingEventHandler{ - Queue: c.queue, - }, - ) - - sif.Externaldns().V1().DNSEndpoints().Informer().AddEventHandler(&BlockingEventHandler{ - WorkFunc: externalDNSHandler(c.queue), - }) - - c.mustSync = append(c.mustSync, - sif.K8s().V1().VirtualServers().Informer().HasSynced, - sif.Externaldns().V1().DNSEndpoints().Informer().HasSynced, - ) - } - c.sync = SyncFnFor(c.recorder, c.client, c.extdnslister) - return c.queue +func (c *ExtDNSController) newNamespacedInformer(ns string) { + nsi := namespacedInformer{sharedInformerFactory: k8s_nginx_informers.NewSharedInformerFactoryWithOptions(c.client, c.resync, k8s_nginx_informers.WithNamespace(ns))} + nsi.vsLister = nsi.sharedInformerFactory.K8s().V1().VirtualServers().Lister() + nsi.extdnslister = nsi.sharedInformerFactory.Externaldns().V1().DNSEndpoints().Lister() + + nsi.sharedInformerFactory.K8s().V1().VirtualServers().Informer().AddEventHandler( + &QueuingEventHandler{ + Queue: c.queue, + }, + ) + + nsi.sharedInformerFactory.Externaldns().V1().DNSEndpoints().Informer().AddEventHandler(&BlockingEventHandler{ + WorkFunc: externalDNSHandler(c.queue), + }) + + c.mustSync = append(c.mustSync, + nsi.sharedInformerFactory.K8s().V1().VirtualServers().Informer().HasSynced, + nsi.sharedInformerFactory.Externaldns().V1().DNSEndpoints().Informer().HasSynced, + ) + c.informerGroup[ns] = &nsi } // Run sets up the event handlers for types we are interested in, as well @@ -101,8 +105,8 @@ func (c *ExtDNSController) Run(stopCh <-chan struct{}) { glog.Infof("Starting external-dns control loop") - for _, sif := range c.sharedInformerFactory { - go sif.Start(c.ctx.Done()) + for _, ig := range c.informerGroup { + go ig.sharedInformerFactory.Start(c.ctx.Done()) } // wait for all informer caches to be synced @@ -154,12 +158,9 @@ func (c *ExtDNSController) processItem(ctx context.Context, key string) error { return err } var vs *conf_v1.VirtualServer - for _, vl := range c.vsLister { - vs, err = vl.VirtualServers(namespace).Get(name) - if err == nil { - break - } - } + nsi := getNamespacedInformer(namespace, c.informerGroup) + vs, err = nsi.vsLister.VirtualServers(namespace).Get(name) + if err != nil { return err } @@ -209,3 +210,21 @@ func BuildOpts( resyncPeriod: resync, } } + +func getNamespacedInformer(ns string, ig map[string]*namespacedInformer) *namespacedInformer { + var nsi *namespacedInformer + var isGlobalNs bool + var exists bool + + nsi, isGlobalNs = ig[""] + + if !isGlobalNs { + // get the correct namespaced informers + nsi, exists = ig[ns] + if !exists { + // we are not watching this namespace + return nil + } + } + return nsi +} diff --git a/internal/externaldns/sync.go b/internal/externaldns/sync.go index debaf6a6c1..cc607cb661 100644 --- a/internal/externaldns/sync.go +++ b/internal/externaldns/sync.go @@ -36,7 +36,7 @@ var vsGVK = vsapi.SchemeGroupVersion.WithKind("VirtualServer") type SyncFn func(context.Context, *vsapi.VirtualServer) error // SyncFnFor knows how to reconcile VirtualServer DNSEndpoint object. -func SyncFnFor(rec record.EventRecorder, client clientset.Interface, extdnsLister []extdnslisters.DNSEndpointLister) SyncFn { +func SyncFnFor(rec record.EventRecorder, client clientset.Interface, ig map[string]*namespacedInformer) SyncFn { return func(ctx context.Context, vs *vsapi.VirtualServer) error { // Do nothing if ExternalDNS is not present (nil) in VS or is not enabled. if !vs.Spec.ExternalDNS.Enable { @@ -56,7 +56,9 @@ func SyncFnFor(rec record.EventRecorder, client clientset.Interface, extdnsListe return err } - newDNSEndpoint, updateDNSEndpoint, err := buildDNSEndpoint(extdnsLister, vs, targets, recordType) + nsi := getNamespacedInformer(vs.Namespace, ig) + + newDNSEndpoint, updateDNSEndpoint, err := buildDNSEndpoint(nsi.extdnslister, vs, targets, recordType) if err != nil { glog.Errorf("error message here %s", err) rec.Eventf(vs, corev1.EventTypeWarning, reasonBadConfig, "Incorrect DNSEndpoint config for VirtualServer resource: %s", err) @@ -136,17 +138,14 @@ func getValidTargets(endpoints []vsapi.ExternalEndpoint) (extdnsapi.Targets, str return targets, recordType, err } -func buildDNSEndpoint(extdnsLister []extdnslisters.DNSEndpointLister, vs *vsapi.VirtualServer, targets extdnsapi.Targets, recordType string) (*extdnsapi.DNSEndpoint, *extdnsapi.DNSEndpoint, error) { +func buildDNSEndpoint(extdnsLister extdnslisters.DNSEndpointLister, vs *vsapi.VirtualServer, targets extdnsapi.Targets, recordType string) (*extdnsapi.DNSEndpoint, *extdnsapi.DNSEndpoint, error) { var updateDNSEndpoint *extdnsapi.DNSEndpoint var newDNSEndpoint *extdnsapi.DNSEndpoint var existingDNSEndpoint *extdnsapi.DNSEndpoint var err error - for _, el := range extdnsLister { - existingDNSEndpoint, err = el.DNSEndpoints(vs.Namespace).Get(vs.ObjectMeta.Name) - if err == nil { - break - } - } + + existingDNSEndpoint, err = extdnsLister.DNSEndpoints(vs.Namespace).Get(vs.ObjectMeta.Name) + if !apierrors.IsNotFound(err) && err != nil { return nil, nil, err } diff --git a/internal/externaldns/sync_test.go b/internal/externaldns/sync_test.go index 58ca613a24..812e4a2e66 100644 --- a/internal/externaldns/sync_test.go +++ b/internal/externaldns/sync_test.go @@ -276,8 +276,10 @@ func TestSync_ReturnsErrorOnFailure(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { rec := EventRecorder{} - eplister := []extdnsclient.DNSEndpointLister{DNSEPLister{}} - fn := SyncFnFor(rec, nil, eplister) + ig := make(map[string]*namespacedInformer) + nsi := namespacedInformer{extdnslister: DNSEPLister{}} + ig[""] = &nsi + fn := SyncFnFor(rec, nil, ig) err := fn(context.TODO(), tc.input) if err == nil { t.Error("want error, got nil") diff --git a/internal/k8s/controller.go b/internal/k8s/controller.go index 64822d1d62..4b34be3057 100644 --- a/internal/k8s/controller.go +++ b/internal/k8s/controller.go @@ -105,30 +105,12 @@ type LoadBalancerController struct { dynClient dynamic.Interface restConfig *rest.Config cacheSyncs []cache.InformerSynced - sharedInformerFactory []informers.SharedInformerFactory - confSharedInformerFactory []k8s_nginx_informers.SharedInformerFactory - secretInformerFactory []informers.SharedInformerFactory + namespacedInformers map[string]*namespacedInformer configMapController cache.Controller - dynInformerFactory []dynamicinformer.DynamicSharedInformerFactory globalConfigurationController cache.Controller ingressLinkInformer cache.SharedIndexInformer - ingressLister []storeToIngressLister - svcLister []cache.Store - endpointLister []storeToEndpointLister configMapLister storeToConfigMapLister - podLister []indexerToPodLister - secretLister []cache.Store - virtualServerLister []cache.Store - virtualServerRouteLister []cache.Store - appProtectPolicyLister []cache.Store - appProtectLogConfLister []cache.Store - appProtectDosPolicyLister []cache.Store - appProtectDosLogConfLister []cache.Store - appProtectDosProtectedLister []cache.Store globalConfigurationLister cache.Store - appProtectUserSigLister []cache.Store - transportServerLister []cache.Store - policyLister []cache.Store ingressLinkLister cache.Store syncQueue *taskQueue ctx context.Context @@ -277,45 +259,12 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc glog.V(3).Infof("Nginx Ingress Controller has class: %v", input.IngressClass) + lbc.namespacedInformers = make(map[string]*namespacedInformer) for _, ns := range lbc.namespaceList { - lbc.sharedInformerFactory = append(lbc.sharedInformerFactory, informers.NewSharedInformerFactoryWithOptions(lbc.client, input.ResyncPeriod, informers.WithNamespace(ns))) + lbc.newNamespacedInformer(ns) } - // create handlers for resources we care about - lbc.addIngressHandler(createIngressHandlers(lbc)) - lbc.addServiceHandler(createServiceHandlers(lbc)) - lbc.addEndpointHandler(createEndpointHandlers(lbc)) - lbc.addPodHandler() - - secretsTweakListOptionsFunc := func(options *meta_v1.ListOptions) { - // Filter for helm release secrets. - helmSecretSelector := fields.OneTermNotEqualSelector(typeKeyword, helmReleaseType) - baseSelector, err := fields.ParseSelector(options.FieldSelector) - - if err != nil { - options.FieldSelector = helmSecretSelector.String() - } else { - options.FieldSelector = fields.AndSelectors(baseSelector, helmSecretSelector).String() - } - } - - // Creating a separate informer for secrets. - for _, ns := range lbc.secretNamespaceList { - lbc.secretInformerFactory = append(lbc.secretInformerFactory, informers.NewSharedInformerFactoryWithOptions(lbc.client, input.ResyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOptions(secretsTweakListOptionsFunc))) - } - - lbc.addSecretHandler(createSecretHandlers(lbc)) - if lbc.areCustomResourcesEnabled { - for _, ns := range lbc.namespaceList { - lbc.confSharedInformerFactory = append(lbc.confSharedInformerFactory, k8s_nginx_informers.NewSharedInformerFactoryWithOptions(lbc.confClient, input.ResyncPeriod, k8s_nginx_informers.WithNamespace(ns))) - } - - lbc.addVirtualServerHandler(createVirtualServerHandlers(lbc)) - lbc.addVirtualServerRouteHandler(createVirtualServerRouteHandlers(lbc)) - lbc.addTransportServerHandler(createTransportServerHandlers(lbc)) - lbc.addPolicyHandler(createPolicyHandlers(lbc)) - if input.GlobalConfiguration != "" { lbc.watchGlobalConfiguration = true ns, name, _ := ParseNamespaceName(input.GlobalConfiguration) @@ -323,23 +272,6 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc } } - if lbc.appProtectEnabled || lbc.appProtectDosEnabled { - for _, ns := range lbc.namespaceList { - lbc.dynInformerFactory = append(lbc.dynInformerFactory, dynamicinformer.NewFilteredDynamicSharedInformerFactory(lbc.dynClient, 0, ns, nil)) - } - if lbc.appProtectEnabled { - lbc.addAppProtectPolicyHandler(createAppProtectPolicyHandlers(lbc)) - lbc.addAppProtectLogConfHandler(createAppProtectLogConfHandlers(lbc)) - lbc.addAppProtectUserSigHandler(createAppProtectUserSigHandlers(lbc)) - } - - if lbc.appProtectDosEnabled { - lbc.addAppProtectDosPolicyHandler(createAppProtectDosPolicyHandlers(lbc)) - lbc.addAppProtectDosLogConfHandler(createAppProtectDosLogConfHandlers(lbc)) - lbc.addAppProtectDosProtectedResourceHandler(createAppProtectDosProtectedResourceHandlers(lbc)) - } - } - if input.ConfigMaps != "" { nginxConfigMapsNS, nginxConfigMapsName, err := ParseNamespaceName(input.ConfigMaps) if err != nil { @@ -360,17 +292,13 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc } lbc.statusUpdater = &statusUpdater{ - client: input.KubeClient, - namespace: input.ControllerNamespace, - externalServiceName: input.ExternalServiceName, - ingressLister: lbc.ingressLister, - virtualServerLister: lbc.virtualServerLister, - virtualServerRouteLister: lbc.virtualServerRouteLister, - transportServerLister: lbc.transportServerLister, - policyLister: lbc.policyLister, - keyFunc: keyFunc, - confClient: input.ConfClient, - hasCorrectIngressClass: lbc.HasCorrectIngressClass, + client: input.KubeClient, + namespace: input.ControllerNamespace, + externalServiceName: input.ExternalServiceName, + namespacedInformers: lbc.namespacedInformers, + keyFunc: keyFunc, + confClient: input.ConfClient, + hasCorrectIngressClass: lbc.HasCorrectIngressClass, } lbc.configuration = NewConfiguration( @@ -396,6 +324,99 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc return lbc } +type namespacedInformer struct { + // namespace string + sharedInformerFactory informers.SharedInformerFactory + confSharedInformerFactory k8s_nginx_informers.SharedInformerFactory + secretInformerFactory informers.SharedInformerFactory + dynInformerFactory dynamicinformer.DynamicSharedInformerFactory + ingressLister storeToIngressLister + svcLister cache.Store + endpointLister storeToEndpointLister + podLister indexerToPodLister + secretLister cache.Store + virtualServerLister cache.Store + virtualServerRouteLister cache.Store + appProtectPolicyLister cache.Store + appProtectLogConfLister cache.Store + appProtectDosPolicyLister cache.Store + appProtectDosLogConfLister cache.Store + appProtectDosProtectedLister cache.Store + appProtectUserSigLister cache.Store + transportServerLister cache.Store + policyLister cache.Store + isSecretsEnabledNamespace bool + areCustomResourcesEnabled bool + appProtectEnabled bool + appProtectDosEnabled bool + stopCh <-chan struct{} +} + +func (lbc *LoadBalancerController) newNamespacedInformer(ns string) { + nsi := &namespacedInformer{} + nsi.sharedInformerFactory = informers.NewSharedInformerFactoryWithOptions(lbc.client, lbc.resync, informers.WithNamespace(ns)) + + // create handlers for resources we care about + lbc.addIngressHandler(createIngressHandlers(lbc), nsi) + lbc.addServiceHandler(createServiceHandlers(lbc), nsi) + lbc.addEndpointHandler(createEndpointHandlers(lbc), nsi) + lbc.addPodHandler(nsi) + + secretsTweakListOptionsFunc := func(options *meta_v1.ListOptions) { + // Filter for helm release secrets. + helmSecretSelector := fields.OneTermNotEqualSelector(typeKeyword, helmReleaseType) + baseSelector, err := fields.ParseSelector(options.FieldSelector) + + if err != nil { + options.FieldSelector = helmSecretSelector.String() + } else { + options.FieldSelector = fields.AndSelectors(baseSelector, helmSecretSelector).String() + } + } + + // Check if secrets informer should be created for this namespace + for _, v := range lbc.secretNamespaceList { + if v == ns { + nsi.isSecretsEnabledNamespace = true + nsi.secretInformerFactory = informers.NewSharedInformerFactoryWithOptions(lbc.client, lbc.resync, informers.WithNamespace(ns), informers.WithTweakListOptions(secretsTweakListOptionsFunc)) + lbc.addSecretHandler(createSecretHandlers(lbc), nsi) + break + } + } + + if lbc.areCustomResourcesEnabled { + nsi.areCustomResourcesEnabled = true + nsi.confSharedInformerFactory = k8s_nginx_informers.NewSharedInformerFactoryWithOptions(lbc.confClient, lbc.resync, k8s_nginx_informers.WithNamespace(ns)) + + lbc.addVirtualServerHandler(createVirtualServerHandlers(lbc), nsi) + lbc.addVirtualServerRouteHandler(createVirtualServerRouteHandlers(lbc), nsi) + lbc.addTransportServerHandler(createTransportServerHandlers(lbc), nsi) + lbc.addPolicyHandler(createPolicyHandlers(lbc), nsi) + + } + + if lbc.appProtectEnabled || lbc.appProtectDosEnabled { + for _, ns := range lbc.namespaceList { + nsi.dynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(lbc.dynClient, 0, ns, nil) + } + if lbc.appProtectEnabled { + nsi.appProtectEnabled = true + lbc.addAppProtectPolicyHandler(createAppProtectPolicyHandlers(lbc), nsi) + lbc.addAppProtectLogConfHandler(createAppProtectLogConfHandlers(lbc), nsi) + lbc.addAppProtectUserSigHandler(createAppProtectUserSigHandlers(lbc), nsi) + } + + if lbc.appProtectDosEnabled { + nsi.appProtectDosEnabled = true + lbc.addAppProtectDosPolicyHandler(createAppProtectDosPolicyHandlers(lbc), nsi) + lbc.addAppProtectDosLogConfHandler(createAppProtectDosLogConfHandlers(lbc), nsi) + lbc.addAppProtectDosProtectedResourceHandler(createAppProtectDosProtectedResourceHandlers(lbc), nsi) + } + } + + lbc.namespacedInformers[ns] = nsi +} + // addLeaderHandler adds the handler for leader election to the controller func (lbc *LoadBalancerController) addLeaderHandler(leaderHandler leaderelection.LeaderCallbacks) { var err error @@ -411,115 +432,95 @@ func (lbc *LoadBalancerController) AddSyncQueue(item interface{}) { } // addAppProtectPolicyHandler creates dynamic informers for custom appprotect policy resource -func (lbc *LoadBalancerController) addAppProtectPolicyHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, dif := range lbc.dynInformerFactory { - informer := dif.ForResource(appprotect.PolicyGVR).Informer() - informer.AddEventHandler(handlers) - lbc.appProtectPolicyLister = append(lbc.appProtectPolicyLister, informer.GetStore()) +func (lbc *LoadBalancerController) addAppProtectPolicyHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.dynInformerFactory.ForResource(appprotect.PolicyGVR).Informer() + informer.AddEventHandler(handlers) + nsi.appProtectPolicyLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } // addAppProtectLogConfHandler creates dynamic informer for custom appprotect logging config resource -func (lbc *LoadBalancerController) addAppProtectLogConfHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, dif := range lbc.dynInformerFactory { - informer := dif.ForResource(appprotect.LogConfGVR).Informer() - informer.AddEventHandler(handlers) - lbc.appProtectLogConfLister = append(lbc.appProtectLogConfLister, informer.GetStore()) +func (lbc *LoadBalancerController) addAppProtectLogConfHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.dynInformerFactory.ForResource(appprotect.LogConfGVR).Informer() + informer.AddEventHandler(handlers) + nsi.appProtectLogConfLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } // addAppProtectUserSigHandler creates dynamic informer for custom appprotect user defined signature resource -func (lbc *LoadBalancerController) addAppProtectUserSigHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, dif := range lbc.dynInformerFactory { - informer := dif.ForResource(appprotect.UserSigGVR).Informer() - informer.AddEventHandler(handlers) - lbc.appProtectUserSigLister = append(lbc.appProtectUserSigLister, informer.GetStore()) +func (lbc *LoadBalancerController) addAppProtectUserSigHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.dynInformerFactory.ForResource(appprotect.UserSigGVR).Informer() + informer.AddEventHandler(handlers) + nsi.appProtectUserSigLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } // addAppProtectDosPolicyHandler creates dynamic informers for custom appprotectdos policy resource -func (lbc *LoadBalancerController) addAppProtectDosPolicyHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, dif := range lbc.dynInformerFactory { - informer := dif.ForResource(appprotectdos.DosPolicyGVR).Informer() - informer.AddEventHandler(handlers) - lbc.appProtectDosPolicyLister = append(lbc.appProtectDosPolicyLister, informer.GetStore()) +func (lbc *LoadBalancerController) addAppProtectDosPolicyHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.dynInformerFactory.ForResource(appprotectdos.DosPolicyGVR).Informer() + informer.AddEventHandler(handlers) + nsi.appProtectDosPolicyLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } // addAppProtectDosLogConfHandler creates dynamic informer for custom appprotectdos logging config resource -func (lbc *LoadBalancerController) addAppProtectDosLogConfHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, dif := range lbc.dynInformerFactory { - informer := dif.ForResource(appprotectdos.DosLogConfGVR).Informer() - informer.AddEventHandler(handlers) - lbc.appProtectDosLogConfLister = append(lbc.appProtectDosLogConfLister, informer.GetStore()) +func (lbc *LoadBalancerController) addAppProtectDosLogConfHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.dynInformerFactory.ForResource(appprotectdos.DosLogConfGVR).Informer() + informer.AddEventHandler(handlers) + nsi.appProtectDosLogConfLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } // addAppProtectDosLogConfHandler creates dynamic informers for custom appprotectdos logging config resource -func (lbc *LoadBalancerController) addAppProtectDosProtectedResourceHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, cif := range lbc.confSharedInformerFactory { - informer := cif.Appprotectdos().V1beta1().DosProtectedResources().Informer() - informer.AddEventHandler(handlers) - lbc.appProtectDosProtectedLister = append(lbc.appProtectDosProtectedLister, informer.GetStore()) +func (lbc *LoadBalancerController) addAppProtectDosProtectedResourceHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.confSharedInformerFactory.Appprotectdos().V1beta1().DosProtectedResources().Informer() + informer.AddEventHandler(handlers) + nsi.appProtectDosProtectedLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } // addSecretHandler adds the handler for secrets to the controller -func (lbc *LoadBalancerController) addSecretHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, sif := range lbc.secretInformerFactory { - informer := sif.Core().V1().Secrets().Informer() - informer.AddEventHandler(handlers) - lbc.secretLister = append(lbc.secretLister, informer.GetStore()) +func (lbc *LoadBalancerController) addSecretHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.secretInformerFactory.Core().V1().Secrets().Informer() + informer.AddEventHandler(handlers) + nsi.secretLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } // addServiceHandler adds the handler for services to the controller -func (lbc *LoadBalancerController) addServiceHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, sif := range lbc.sharedInformerFactory { - informer := sif.Core().V1().Services().Informer() - informer.AddEventHandler(handlers) - lbc.svcLister = append(lbc.svcLister, informer.GetStore()) +func (lbc *LoadBalancerController) addServiceHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.sharedInformerFactory.Core().V1().Services().Informer() + informer.AddEventHandler(handlers) + nsi.svcLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } // addIngressHandler adds the handler for ingresses to the controller -func (lbc *LoadBalancerController) addIngressHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, sif := range lbc.sharedInformerFactory { - informer := sif.Networking().V1().Ingresses().Informer() - informer.AddEventHandler(handlers) - lbc.ingressLister = append(lbc.ingressLister, storeToIngressLister{Store: informer.GetStore()}) +func (lbc *LoadBalancerController) addIngressHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.sharedInformerFactory.Networking().V1().Ingresses().Informer() + informer.AddEventHandler(handlers) + nsi.ingressLister = storeToIngressLister{Store: informer.GetStore()} - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } // addEndpointHandler adds the handler for endpoints to the controller -func (lbc *LoadBalancerController) addEndpointHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, sif := range lbc.sharedInformerFactory { - informer := sif.Core().V1().Endpoints().Informer() - informer.AddEventHandler(handlers) - var el storeToEndpointLister - el.Store = informer.GetStore() - lbc.endpointLister = append(lbc.endpointLister, el) +func (lbc *LoadBalancerController) addEndpointHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.sharedInformerFactory.Core().V1().Endpoints().Informer() + informer.AddEventHandler(handlers) + var el storeToEndpointLister + el.Store = informer.GetStore() + nsi.endpointLister = el - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } // addConfigMapHandler adds the handler for config maps to the controller @@ -537,43 +538,35 @@ func (lbc *LoadBalancerController) addConfigMapHandler(handlers cache.ResourceEv lbc.cacheSyncs = append(lbc.cacheSyncs, lbc.configMapController.HasSynced) } -func (lbc *LoadBalancerController) addPodHandler() { - for _, sif := range lbc.sharedInformerFactory { - informer := sif.Core().V1().Pods().Informer() - lbc.podLister = append(lbc.podLister, indexerToPodLister{Indexer: informer.GetIndexer()}) +func (lbc *LoadBalancerController) addPodHandler(nsi *namespacedInformer) { + informer := nsi.sharedInformerFactory.Core().V1().Pods().Informer() + nsi.podLister = indexerToPodLister{Indexer: informer.GetIndexer()} - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } -func (lbc *LoadBalancerController) addVirtualServerHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, cif := range lbc.confSharedInformerFactory { - informer := cif.K8s().V1().VirtualServers().Informer() - informer.AddEventHandler(handlers) - lbc.virtualServerLister = append(lbc.virtualServerLister, informer.GetStore()) +func (lbc *LoadBalancerController) addVirtualServerHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.confSharedInformerFactory.K8s().V1().VirtualServers().Informer() + informer.AddEventHandler(handlers) + nsi.virtualServerLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } -func (lbc *LoadBalancerController) addVirtualServerRouteHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, cif := range lbc.confSharedInformerFactory { - informer := cif.K8s().V1().VirtualServerRoutes().Informer() - informer.AddEventHandler(handlers) - lbc.virtualServerRouteLister = append(lbc.virtualServerRouteLister, informer.GetStore()) +func (lbc *LoadBalancerController) addVirtualServerRouteHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.confSharedInformerFactory.K8s().V1().VirtualServerRoutes().Informer() + informer.AddEventHandler(handlers) + nsi.virtualServerRouteLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } -func (lbc *LoadBalancerController) addPolicyHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, cif := range lbc.confSharedInformerFactory { - informer := cif.K8s().V1().Policies().Informer() - informer.AddEventHandler(handlers) - lbc.policyLister = append(lbc.policyLister, informer.GetStore()) +func (lbc *LoadBalancerController) addPolicyHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.confSharedInformerFactory.K8s().V1().Policies().Informer() + informer.AddEventHandler(handlers) + nsi.policyLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } func (lbc *LoadBalancerController) addGlobalConfigurationHandler(handlers cache.ResourceEventHandlerFuncs, namespace string, name string) { @@ -590,14 +583,12 @@ func (lbc *LoadBalancerController) addGlobalConfigurationHandler(handlers cache. lbc.cacheSyncs = append(lbc.cacheSyncs, lbc.globalConfigurationController.HasSynced) } -func (lbc *LoadBalancerController) addTransportServerHandler(handlers cache.ResourceEventHandlerFuncs) { - for _, cif := range lbc.confSharedInformerFactory { - informer := cif.K8s().V1alpha1().TransportServers().Informer() - informer.AddEventHandler(handlers) - lbc.transportServerLister = append(lbc.transportServerLister, informer.GetStore()) +func (lbc *LoadBalancerController) addTransportServerHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.confSharedInformerFactory.K8s().V1alpha1().TransportServers().Informer() + informer.AddEventHandler(handlers) + nsi.transportServerLister = informer.GetStore() - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) - } + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } func (lbc *LoadBalancerController) addIngressLinkHandler(handlers cache.ResourceEventHandlerFuncs, name string) { @@ -636,33 +627,21 @@ func (lbc *LoadBalancerController) Run() { go lbc.leaderElector.Run(lbc.ctx) } - for _, sif := range lbc.sharedInformerFactory { - go sif.Start(lbc.ctx.Done()) - } - - for _, secif := range lbc.secretInformerFactory { - go secif.Start(lbc.ctx.Done()) + for _, nif := range lbc.namespacedInformers { + nif.stopCh = lbc.ctx.Done() + nif.start() } if lbc.watchNginxConfigMaps { go lbc.configMapController.Run(lbc.ctx.Done()) } - if lbc.areCustomResourcesEnabled { - for _, cif := range lbc.confSharedInformerFactory { - go cif.Start(lbc.ctx.Done()) - } - } + if lbc.watchGlobalConfiguration { go lbc.globalConfigurationController.Run(lbc.ctx.Done()) } if lbc.watchIngressLink { go lbc.ingressLinkInformer.Run(lbc.ctx.Done()) } - if lbc.appProtectEnabled || lbc.appProtectDosEnabled { - for _, dif := range lbc.dynInformerFactory { - go dif.Start(lbc.ctx.Done()) - } - } glog.V(3).Infof("Waiting for %d caches to sync", len(lbc.cacheSyncs)) @@ -678,13 +657,46 @@ func (lbc *LoadBalancerController) Run() { <-lbc.ctx.Done() } -// Stop shutdowns the load balancer controller +// Stop shutsdown the load balancer controller func (lbc *LoadBalancerController) Stop() { lbc.cancel() - lbc.syncQueue.Shutdown() } +func (nsi *namespacedInformer) start() { + go nsi.sharedInformerFactory.Start(nsi.stopCh) + + if nsi.isSecretsEnabledNamespace { + go nsi.secretInformerFactory.Start(nsi.stopCh) + } + + if nsi.areCustomResourcesEnabled { + go nsi.confSharedInformerFactory.Start(nsi.stopCh) + } + + if nsi.appProtectEnabled || nsi.appProtectDosEnabled { + go nsi.dynInformerFactory.Start(nsi.stopCh) + } +} + +func (lbc *LoadBalancerController) getNamespacedInformer(ns string) *namespacedInformer { + var nsi *namespacedInformer + var isGlobalNs bool + var exists bool + + nsi, isGlobalNs = lbc.namespacedInformers[""] + + if !isGlobalNs { + // get the correct namespaced informers + nsi, exists = lbc.namespacedInformers[ns] + if !exists { + // we are not watching this namespace + return nil + } + } + return nsi +} + func (lbc *LoadBalancerController) syncEndpoints(task task) { key := task.Key var obj interface{} @@ -692,12 +704,8 @@ func (lbc *LoadBalancerController) syncEndpoints(task task) { var err error glog.V(3).Infof("Syncing endpoints %v", key) - for _, el := range lbc.endpointLister { - obj, endpExists, err = el.GetByKey(key) - if endpExists { - break - } - } + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, endpExists, err = lbc.getNamespacedInformer(ns).endpointLister.GetByKey(key) if err != nil { lbc.syncQueue.Requeue(task, err) @@ -857,8 +865,11 @@ func (lbc *LoadBalancerController) updateAllConfigs() { // As a result, the IC will generate configuration for that resource assuming that the Secret is missing and // it will report warnings. (See https://github.com/nginxinc/kubernetes-ingress/issues/1448 ) func (lbc *LoadBalancerController) preSyncSecrets() { - for _, sl := range lbc.secretLister { - objects := sl.List() + for _, ni := range lbc.namespacedInformers { + if !ni.isSecretsEnabledNamespace { + break + } + objects := ni.secretLister.List() glog.V(3).Infof("PreSync %d Secrets", len(objects)) for _, obj := range objects { @@ -1010,12 +1021,8 @@ func (lbc *LoadBalancerController) syncPolicy(task task) { var polExists bool var err error - for _, pl := range lbc.policyLister { - obj, polExists, err = pl.GetByKey(key) - if polExists { - break - } - } + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, polExists, err = lbc.getNamespacedInformer(ns).policyLister.GetByKey(key) if err != nil { lbc.syncQueue.Requeue(task, err) @@ -1073,12 +1080,8 @@ func (lbc *LoadBalancerController) syncTransportServer(task task) { var tsExists bool var err error - for _, tl := range lbc.transportServerLister { - obj, tsExists, err = tl.GetByKey(key) - if tsExists { - break - } - } + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, tsExists, err = lbc.getNamespacedInformer(ns).transportServerLister.GetByKey(key) if err != nil { lbc.syncQueue.Requeue(task, err) @@ -1156,12 +1159,8 @@ func (lbc *LoadBalancerController) syncVirtualServer(task task) { var vsExists bool var err error - for _, vl := range lbc.virtualServerLister { - obj, vsExists, err = vl.GetByKey(key) - if vsExists { - break - } - } + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, vsExists, err = lbc.getNamespacedInformer(ns).virtualServerLister.GetByKey(key) if err != nil { lbc.syncQueue.Requeue(task, err) @@ -1269,12 +1268,8 @@ func (lbc *LoadBalancerController) processChanges(changes []ResourceChange) { var vsExists bool var err error - for _, vl := range lbc.virtualServerLister { - _, vsExists, err = vl.GetByKey(key) - if vsExists { - break - } - } + ns, _, _ := cache.SplitMetaNamespaceKey(key) + _, vsExists, err = lbc.getNamespacedInformer(ns).virtualServerLister.GetByKey(key) if err != nil { glog.Errorf("Error when getting VirtualServer for %v: %v", key, err) @@ -1296,12 +1291,8 @@ func (lbc *LoadBalancerController) processChanges(changes []ResourceChange) { var ingExists bool var err error - for _, il := range lbc.ingressLister { - _, ingExists, err = il.GetByKeySafe(key) - if ingExists { - break - } - } + ns, _, _ := cache.SplitMetaNamespaceKey(key) + _, ingExists, err = lbc.getNamespacedInformer(ns).ingressLister.GetByKeySafe(key) if err != nil { glog.Errorf("Error when getting Ingress for %v: %v", key, err) @@ -1322,12 +1313,8 @@ func (lbc *LoadBalancerController) processChanges(changes []ResourceChange) { var tsExists bool var err error - for _, tl := range lbc.transportServerLister { - _, tsExists, err = tl.GetByKey(key) - if tsExists { - break - } - } + ns, _, _ := cache.SplitMetaNamespaceKey(key) + _, tsExists, err = lbc.getNamespacedInformer(ns).transportServerLister.GetByKey(key) if err != nil { glog.Errorf("Error when getting TransportServer for %v: %v", key, err) @@ -1916,12 +1903,8 @@ func (lbc *LoadBalancerController) syncVirtualServerRoute(task task) { var exists bool var err error - for _, vrl := range lbc.virtualServerRouteLister { - obj, exists, err = vrl.GetByKey(key) - if exists { - break - } - } + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, exists, err = lbc.getNamespacedInformer(ns).virtualServerRouteLister.GetByKey(key) if err != nil { lbc.syncQueue.Requeue(task, err) @@ -1952,12 +1935,9 @@ func (lbc *LoadBalancerController) syncIngress(task task) { var ingExists bool var err error - for _, il := range lbc.ingressLister { - ing, ingExists, err = il.GetByKeySafe(key) - if ingExists { - break - } - } + ns, _, _ := cache.SplitMetaNamespaceKey(key) + ing, ingExists, err = lbc.getNamespacedInformer(ns).ingressLister.GetByKeySafe(key) + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -2010,12 +1990,8 @@ func (lbc *LoadBalancerController) syncService(task task) { var exists bool var err error - for _, sl := range lbc.svcLister { - obj, exists, err = sl.GetByKey(key) - if exists { - break - } - } + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, exists, err = lbc.getNamespacedInformer(ns).svcLister.GetByKey(key) if err != nil { lbc.syncQueue.Requeue(task, err) @@ -2116,20 +2092,15 @@ func (lbc *LoadBalancerController) syncSecret(task task) { var secrExists bool var err error - for _, sl := range lbc.secretLister { - obj, secrExists, err = sl.GetByKey(key) - if secrExists { - break - } - } + namespace, name, err := ParseNamespaceName(key) if err != nil { - lbc.syncQueue.Requeue(task, err) + glog.Warningf("Secret key %v is invalid: %v", key, err) return } + obj, secrExists, err = lbc.getNamespacedInformer(namespace).secretLister.GetByKey(key) - namespace, name, err := ParseNamespaceName(key) if err != nil { - glog.Warningf("Secret key %v is invalid: %v", key, err) + lbc.syncQueue.Requeue(task, err) return } @@ -2261,8 +2232,8 @@ func getStatusFromEventTitle(eventTitle string) string { func (lbc *LoadBalancerController) updateVirtualServersStatusFromEvents() error { var allErrs []error - for _, vl := range lbc.virtualServerLister { - for _, obj := range vl.List() { + for _, nsi := range lbc.namespacedInformers { + for _, obj := range nsi.virtualServerLister.List() { vs := obj.(*conf_v1.VirtualServer) if !lbc.HasCorrectIngressClass(vs) { @@ -2305,8 +2276,8 @@ func (lbc *LoadBalancerController) updateVirtualServersStatusFromEvents() error func (lbc *LoadBalancerController) updateVirtualServerRoutesStatusFromEvents() error { var allErrs []error - for _, vsrl := range lbc.virtualServerRouteLister { - for _, obj := range vsrl.List() { + for _, nsi := range lbc.namespacedInformers { + for _, obj := range nsi.virtualServerRouteLister.List() { vsr := obj.(*conf_v1.VirtualServerRoute) if !lbc.HasCorrectIngressClass(vsr) { @@ -2349,8 +2320,8 @@ func (lbc *LoadBalancerController) updateVirtualServerRoutesStatusFromEvents() e func (lbc *LoadBalancerController) updatePoliciesStatus() error { var allErrs []error - for _, pl := range lbc.policyLister { - for _, obj := range pl.List() { + for _, nsi := range lbc.namespacedInformers { + for _, obj := range nsi.policyLister.List() { pol := obj.(*conf_v1.Policy) err := validation.ValidatePolicy(pol, lbc.isNginxPlus, lbc.enableOIDC, lbc.appProtectEnabled) @@ -2379,8 +2350,8 @@ func (lbc *LoadBalancerController) updatePoliciesStatus() error { func (lbc *LoadBalancerController) updateTransportServersStatusFromEvents() error { var allErrs []error - for _, tl := range lbc.transportServerLister { - for _, obj := range tl.List() { + for _, nsi := range lbc.namespacedInformers { + for _, obj := range nsi.transportServerLister.List() { ts := obj.(*conf_v1alpha1.TransportServer) events, err := lbc.client.CoreV1().Events(ts.Namespace).List(context.TODO(), @@ -2919,8 +2890,8 @@ func createPolicyMap(policies []*conf_v1.Policy) map[string]*conf_v1.Policy { func (lbc *LoadBalancerController) getAllPolicies() []*conf_v1.Policy { var policies []*conf_v1.Policy - for _, pl := range lbc.policyLister { - for _, obj := range pl.List() { + for _, nsi := range lbc.namespacedInformers { + for _, obj := range nsi.policyLister.List() { pol := obj.(*conf_v1.Policy) err := validation.ValidatePolicy(pol, lbc.isNginxPlus, lbc.enableOIDC, lbc.appProtectEnabled) @@ -2952,12 +2923,8 @@ func (lbc *LoadBalancerController) getPolicies(policies []conf_v1.PolicyReferenc var exists bool var err error - for _, pl := range lbc.policyLister { - policyObj, exists, err = pl.GetByKey(policyKey) - if exists { - break - } - } + policyObj, exists, err = lbc.getNamespacedInformer(polNamespace).policyLister.GetByKey(policyKey) + if err != nil { errors = append(errors, fmt.Errorf("failed to get policy %s: %w", policyKey, err)) continue @@ -3306,23 +3273,15 @@ func (lbc *LoadBalancerController) getEndpointsForSubselector(namespace string, func (lbc *LoadBalancerController) getEndpointsForServiceWithSubselector(targetPort int32, subselector map[string]string, svc *api_v1.Service) (endps []podEndpoint, err error) { var pods []*api_v1.Pod - for _, pl := range lbc.podLister { - pods, err = pl.ListByNamespace(svc.Namespace, labels.Merge(svc.Spec.Selector, subselector).AsSelector()) - if len(pods) > 0 { - break - } - } + nsi := lbc.getNamespacedInformer(svc.Namespace) + pods, err = nsi.podLister.ListByNamespace(svc.Namespace, labels.Merge(svc.Spec.Selector, subselector).AsSelector()) + if err != nil { return nil, fmt.Errorf("error getting pods in namespace %v that match the selector %v: %w", svc.Namespace, labels.Merge(svc.Spec.Selector, subselector), err) } var svcEps api_v1.Endpoints - for _, el := range lbc.endpointLister { - svcEps, err = el.GetServiceEndpoints(svc) - if err == nil { - break - } - } + svcEps, err = nsi.endpointLister.GetServiceEndpoints(svc) if err != nil { glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err) return nil, err @@ -3382,12 +3341,9 @@ func (lbc *LoadBalancerController) getHealthChecksForIngressBackend(backend *net return nil } var pods []*api_v1.Pod - for _, pl := range lbc.podLister { - pods, err = pl.ListByNamespace(svc.Namespace, labels.Set(svc.Spec.Selector).AsSelector()) - if len(pods) > 0 { - break - } - } + nsi := lbc.getNamespacedInformer(svc.Namespace) + pods, err = nsi.podLister.ListByNamespace(svc.Namespace, labels.Set(svc.Spec.Selector).AsSelector()) + if err != nil { glog.V(3).Infof("Error fetching pods for namespace %v: %v", svc.Namespace, err) return nil @@ -3439,12 +3395,8 @@ func (lbc *LoadBalancerController) getExternalEndpointsForIngressBackend(backend func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *networking.IngressBackend, svc *api_v1.Service) (result []podEndpoint, isExternal bool, err error) { var endps api_v1.Endpoints - for _, el := range lbc.endpointLister { - endps, err = el.GetServiceEndpoints(svc) - if err == nil { - break - } - } + endps, err = lbc.getNamespacedInformer(svc.Namespace).endpointLister.GetServiceEndpoints(svc) + if err != nil { if svc.Spec.Type == api_v1.ServiceTypeExternalName { if !lbc.isNginxPlus { @@ -3512,12 +3464,9 @@ func (lbc *LoadBalancerController) getPodOwnerTypeAndNameFromAddress(ns, name st var obj interface{} var exists bool var err error - for _, pl := range lbc.podLister { - obj, exists, err = pl.GetByKey(fmt.Sprintf("%s/%s", ns, name)) - if exists { - break - } - } + + obj, exists, err = lbc.getNamespacedInformer(ns).podLister.GetByKey(fmt.Sprintf("%s/%s", ns, name)) + if err != nil { glog.Warningf("could not get pod by key %s/%s: %v", ns, name, err) return "", "" @@ -3565,12 +3514,8 @@ func (lbc *LoadBalancerController) getTargetPort(svcPort api_v1.ServicePort, svc var pods []*api_v1.Pod var err error - for _, pl := range lbc.podLister { - pods, err = pl.ListByNamespace(svc.Namespace, labels.Set(svc.Spec.Selector).AsSelector()) - if len(pods) > 0 { - break - } - } + pods, err = lbc.getNamespacedInformer(svc.Namespace).podLister.ListByNamespace(svc.Namespace, labels.Set(svc.Spec.Selector).AsSelector()) + if err != nil { return 0, fmt.Errorf("error getting pod information: %w", err) } @@ -3606,12 +3551,9 @@ func (lbc *LoadBalancerController) getServiceForIngressBackend(backend *networki var svcObj interface{} var svcExists bool var err error - for _, sl := range lbc.svcLister { - svcObj, svcExists, err = sl.GetByKey(svcKey) - if svcExists { - break - } - } + + svcObj, svcExists, err = lbc.getNamespacedInformer(namespace).svcLister.GetByKey(svcKey) + if err != nil { return nil, err } @@ -3684,12 +3626,10 @@ func (lbc *LoadBalancerController) syncAppProtectPolicy(task task) { var obj interface{} var polExists bool var err error - for _, apl := range lbc.appProtectPolicyLister { - obj, polExists, err = apl.GetByKey(key) - if polExists { - break - } - } + + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, polExists, err = lbc.getNamespacedInformer(ns).appProtectPolicyLister.GetByKey(key) + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -3718,12 +3658,10 @@ func (lbc *LoadBalancerController) syncAppProtectLogConf(task task) { var obj interface{} var confExists bool var err error - for _, apl := range lbc.appProtectLogConfLister { - obj, confExists, err = apl.GetByKey(key) - if confExists { - break - } - } + + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, confExists, err = lbc.getNamespacedInformer(ns).appProtectLogConfLister.GetByKey(key) + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -3752,12 +3690,10 @@ func (lbc *LoadBalancerController) syncAppProtectUserSig(task task) { var obj interface{} var sigExists bool var err error - for _, apl := range lbc.appProtectUserSigLister { - obj, sigExists, err = apl.GetByKey(key) - if sigExists { - break - } - } + + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, sigExists, err = lbc.getNamespacedInformer(ns).appProtectUserSigLister.GetByKey(key) + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -3786,12 +3722,10 @@ func (lbc *LoadBalancerController) syncAppProtectDosPolicy(task task) { var obj interface{} var polExists bool var err error - for _, apl := range lbc.appProtectDosPolicyLister { - obj, polExists, err = apl.GetByKey(key) - if polExists { - break - } - } + + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, polExists, err = lbc.getNamespacedInformer(ns).appProtectDosPolicyLister.GetByKey(key) + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -3818,12 +3752,10 @@ func (lbc *LoadBalancerController) syncAppProtectDosLogConf(task task) { var obj interface{} var confExists bool var err error - for _, apl := range lbc.appProtectDosLogConfLister { - obj, confExists, err = apl.GetByKey(key) - if confExists { - break - } - } + + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, confExists, err = lbc.getNamespacedInformer(ns).appProtectDosLogConfLister.GetByKey(key) + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -3850,12 +3782,10 @@ func (lbc *LoadBalancerController) syncDosProtectedResource(task task) { var obj interface{} var confExists bool var err error - for _, apl := range lbc.appProtectDosProtectedLister { - obj, confExists, err = apl.GetByKey(key) - if confExists { - break - } - } + + ns, _, _ := cache.SplitMetaNamespaceKey(key) + obj, confExists, err = lbc.getNamespacedInformer(ns).appProtectDosProtectedLister.GetByKey(key) + if err != nil { lbc.syncQueue.Requeue(task, err) return diff --git a/internal/k8s/controller_test.go b/internal/k8s/controller_test.go index 6034380aa1..37c9ae02c2 100644 --- a/internal/k8s/controller_test.go +++ b/internal/k8s/controller_test.go @@ -653,12 +653,12 @@ func TestGetPolicies(t *testing.T) { }, } - var pl []cache.Store - pl = append(pl, policyLister) + nsi := make(map[string]*namespacedInformer) + nsi[""] = &namespacedInformer{policyLister: policyLister} lbc := LoadBalancerController{ - isNginxPlus: true, - policyLister: pl, + isNginxPlus: true, + namespacedInformers: nsi, } policyRefs := []conf_v1.PolicyReference{ @@ -2297,13 +2297,13 @@ func TestPreSyncSecrets(t *testing.T) { } }, } - var sl []cache.Store - sl = append(sl, secretLister) + nsi := make(map[string]*namespacedInformer) + nsi[""] = &namespacedInformer{secretLister: secretLister, isSecretsEnabledNamespace: true} lbc := LoadBalancerController{ - isNginxPlus: true, - secretStore: secrets.NewEmptyFakeSecretsStore(), - secretLister: sl, + isNginxPlus: true, + secretStore: secrets.NewEmptyFakeSecretsStore(), + namespacedInformers: nsi, } lbc.preSyncSecrets() diff --git a/internal/k8s/status.go b/internal/k8s/status.go index 672c8aeeb8..78b9a67235 100644 --- a/internal/k8s/status.go +++ b/internal/k8s/status.go @@ -38,11 +38,7 @@ type statusUpdater struct { status []api_v1.LoadBalancerIngress statusInitialized bool keyFunc func(obj interface{}) (string, error) - ingressLister []storeToIngressLister - virtualServerLister []cache.Store - virtualServerRouteLister []cache.Store - transportServerLister []cache.Store - policyLister []cache.Store + namespacedInformers map[string]*namespacedInformer confClient k8s_nginx.Interface hasCorrectIngressClass func(interface{}) bool } @@ -111,6 +107,24 @@ func (su *statusUpdater) UpdateIngressStatus(ing networking.Ingress) error { return su.updateIngressWithStatus(ing, su.status) } +func (su *statusUpdater) getNamespacedInformer(ns string) *namespacedInformer { + var nsi *namespacedInformer + var isGlobalNs bool + var exists bool + + nsi, isGlobalNs = su.namespacedInformers[""] + + if !isGlobalNs { + // get the correct namespaced informers + nsi, exists = su.namespacedInformers[ns] + if !exists { + // we are not watching this namespace + return nil + } + } + return nsi +} + // updateIngressWithStatus sets the provided status on the selected Ingress. func (su *statusUpdater) updateIngressWithStatus(ing networking.Ingress, status []api_v1.LoadBalancerIngress) error { // Get an up-to-date Ingress from the Store @@ -120,14 +134,11 @@ func (su *statusUpdater) updateIngressWithStatus(ing networking.Ingress, status return err } + ns, _, _ := cache.SplitMetaNamespaceKey(key) var ingCopy *networking.Ingress var exists bool - for _, il := range su.ingressLister { - ingCopy, exists, err = il.GetByKeySafe(key) - if exists { - break - } - } + ingCopy, exists, err = su.getNamespacedInformer(ns).ingressLister.GetByKeySafe(key) + if err != nil { glog.V(3).Infof("error getting ing from Store by key: %v", err) return err @@ -415,12 +426,9 @@ func (su *statusUpdater) UpdateTransportServerStatus(ts *conf_v1alpha1.Transport var tsLatest interface{} var exists bool var err error - for _, tl := range su.transportServerLister { - tsLatest, exists, err = tl.Get(ts) - if exists { - break - } - } + + tsLatest, exists, err = su.getNamespacedInformer(ts.Namespace).transportServerLister.Get(ts) + if err != nil { glog.V(3).Infof("error getting TransportServer from Store: %v", err) return err @@ -466,12 +474,9 @@ func (su *statusUpdater) UpdateVirtualServerStatus(vs *conf_v1.VirtualServer, st var vsLatest interface{} var exists bool var err error - for _, vl := range su.virtualServerLister { - vsLatest, exists, err = vl.Get(vs) - if exists { - break - } - } + + vsLatest, exists, err = su.getNamespacedInformer(vs.Namespace).virtualServerLister.Get(vs) + if err != nil { glog.V(3).Infof("error getting VirtualServer from Store: %v", err) return err @@ -532,12 +537,9 @@ func (su *statusUpdater) UpdateVirtualServerRouteStatusWithReferencedBy(vsr *con var vsrLatest interface{} var exists bool var err error - for _, vl := range su.virtualServerRouteLister { - vsrLatest, exists, err = vl.Get(vsr) - if exists { - break - } - } + + vsrLatest, exists, err = su.getNamespacedInformer(vsr.Namespace).virtualServerRouteLister.Get(vsr) + if err != nil { glog.V(3).Infof("error getting VirtualServerRoute from Store: %v", err) return err @@ -575,12 +577,9 @@ func (su *statusUpdater) UpdateVirtualServerRouteStatus(vsr *conf_v1.VirtualServ var vsrLatest interface{} var exists bool var err error - for _, vl := range su.virtualServerRouteLister { - vsrLatest, exists, err = vl.Get(vsr) - if exists { - break - } - } + + vsrLatest, exists, err = su.getNamespacedInformer(vsr.Namespace).virtualServerRouteLister.Get(vsr) + if err != nil { glog.V(3).Infof("error getting VirtualServerRoute from Store: %v", err) return err @@ -614,12 +613,9 @@ func (su *statusUpdater) updateVirtualServerExternalEndpoints(vs *conf_v1.Virtua var vsLatest interface{} var exists bool var err error - for _, vl := range su.virtualServerLister { - vsLatest, exists, err = vl.Get(vs) - if exists { - break - } - } + + vsLatest, exists, err = su.getNamespacedInformer(vs.Namespace).virtualServerLister.Get(vs) + if err != nil { glog.V(3).Infof("error getting VirtualServer from Store: %v", err) return err @@ -645,12 +641,9 @@ func (su *statusUpdater) updateVirtualServerRouteExternalEndpoints(vsr *conf_v1. var vsrLatest interface{} var exists bool var err error - for _, vl := range su.virtualServerRouteLister { - vsrLatest, exists, err = vl.Get(vsr) - if exists { - break - } - } + + vsrLatest, exists, err = su.getNamespacedInformer(vsr.Namespace).virtualServerRouteLister.Get(vsr) + if err != nil { glog.V(3).Infof("error getting VirtualServerRoute from Store: %v", err) return err @@ -696,12 +689,9 @@ func (su *statusUpdater) UpdatePolicyStatus(pol *conf_v1.Policy, state string, r var polLatest interface{} var exists bool var err error - for _, vl := range su.policyLister { - polLatest, exists, err = vl.Get(pol) - if exists { - break - } - } + + polLatest, exists, err = su.getNamespacedInformer(pol.Namespace).policyLister.Get(pol) + if err != nil { glog.V(3).Infof("error getting policy from Store: %v", err) return err diff --git a/internal/k8s/status_test.go b/internal/k8s/status_test.go index 141597ed0c..43572f680d 100644 --- a/internal/k8s/status_test.go +++ b/internal/k8s/status_test.go @@ -44,11 +44,12 @@ func TestUpdateTransportServerStatus(t *testing.T) { if err != nil { t.Errorf("Error adding TransportServer to the transportserver lister: %v", err) } - tsl := []cache.Store{tsLister} + nsi := make(map[string]*namespacedInformer) + nsi["default"] = &namespacedInformer{transportServerLister: tsLister} su := statusUpdater{ - transportServerLister: tsl, - confClient: fakeClient, - keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, + namespacedInformers: nsi, + confClient: fakeClient, + keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, } err = su.UpdateTransportServerStatus(ts, "after status", "after reason", "after message") @@ -104,11 +105,12 @@ func TestUpdateTransportServerStatusIgnoreNoChange(t *testing.T) { if err != nil { t.Errorf("Error adding TransportServer to the transportserver lister: %v", err) } - tsl := []cache.Store{tsLister} + nsi := make(map[string]*namespacedInformer) + nsi["default"] = &namespacedInformer{transportServerLister: tsLister} su := statusUpdater{ - transportServerLister: tsl, - confClient: fakeClient, - keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, + namespacedInformers: nsi, + confClient: fakeClient, + keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, } err = su.UpdateTransportServerStatus(ts, "same status", "same reason", "same message") @@ -158,12 +160,13 @@ func TestUpdateTransportServerStatusMissingTransportServer(t *testing.T) { nil, ) - tsl := []cache.Store{tsLister} + nsi := make(map[string]*namespacedInformer) + nsi[""] = &namespacedInformer{transportServerLister: tsLister} su := statusUpdater{ - transportServerLister: tsl, - confClient: fakeClient, - keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, + namespacedInformers: nsi, + confClient: fakeClient, + keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, externalEndpoints: []conf_v1.ExternalEndpoint{ { IP: "123.123.123.123", @@ -214,14 +217,15 @@ func TestStatusUpdateWithExternalStatusAndExternalService(t *testing.T) { t.Errorf("Error adding Ingress to the ingress lister: %v", err) } - isl := []storeToIngressLister{ingLister} + nsi := make(map[string]*namespacedInformer) + nsi[""] = &namespacedInformer{ingressLister: ingLister} su := statusUpdater{ client: fakeClient, namespace: "namespace", externalServiceName: "service-name", externalStatusAddress: "123.123.123.123", - ingressLister: isl, + namespacedInformers: nsi, keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, } err = su.ClearIngressStatus(ing) @@ -319,13 +323,14 @@ func TestStatusUpdateWithExternalStatusAndIngressLink(t *testing.T) { t.Errorf("Error adding Ingress to the ingress lister: %v", err) } - isl := []storeToIngressLister{ingLister} + nsi := make(map[string]*namespacedInformer) + nsi[""] = &namespacedInformer{ingressLister: ingLister} su := statusUpdater{ client: fakeClient, namespace: "namespace", externalStatusAddress: "", - ingressLister: isl, + namespacedInformers: nsi, keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, }