diff --git a/cmd/nginx-ingress/flags.go b/cmd/nginx-ingress/flags.go index e3b0427aa5..eb9c56109e 100644 --- a/cmd/nginx-ingress/flags.go +++ b/cmd/nginx-ingress/flags.go @@ -36,7 +36,9 @@ var ( The Ingress Controller does not start NGINX and does not write any generated NGINX configuration files to disk`) watchNamespace = flag.String("watch-namespace", api_v1.NamespaceAll, - `Namespace to watch for Ingress resources. By default the Ingress Controller watches all namespaces`) + `Comma separated list of namespaces the Ingress Controller should watch for resources. By default the Ingress Controller watches all namespaces`) + + watchNamespaces []string nginxConfigMaps = flag.String("nginx-configmaps", "", `A ConfigMap resource for customizing NGINX configuration. If a ConfigMap is set, @@ -194,6 +196,8 @@ func parseFlags(versionInfo string, binaryInfo string) { glog.Infof("Starting NGINX Ingress Controller %v PlusFlag=%v", versionInfo, *nginxPlus) glog.Info(binaryInfo) + watchNamespaces = strings.Split(*watchNamespace, ",") + validationChecks() if *enableTLSPassthrough && !*enableCustomResources { @@ -302,6 +306,11 @@ func validationChecks() { glog.Fatalf("Invalid value for leader-election-lock-name: %v", statusLockNameValidationError) } + namespacesNameValidationError := validateNamespaceNames(watchNamespaces) + if namespacesNameValidationError != nil { + glog.Fatalf("Invalid values for namespaces: %v", namespacesNameValidationError) + } + statusPortValidationError := validatePort(*nginxStatusPort) if statusPortValidationError != nil { glog.Fatalf("Invalid value for nginx-status-port: %v", statusPortValidationError) @@ -331,11 +340,31 @@ func validationChecks() { } } +// validateNamespaceNames validates the namespaces are in the correct format +func validateNamespaceNames(namespaces []string) error { + var allErrs []error + + for _, ns := range namespaces { + if ns != "" { + ns = strings.TrimSpace(ns) + err := validateResourceName(ns) + if err != nil { + allErrs = append(allErrs, err) + fmt.Printf("error %v ", err) + } + } + } + if len(allErrs) > 0 { + return fmt.Errorf("errors validating namespaces: %v", allErrs) + } + return nil +} + // validateResourceName validates the name of a resource -func validateResourceName(lock string) error { - allErrs := validation.IsDNS1123Subdomain(lock) +func validateResourceName(name string) error { + allErrs := validation.IsDNS1123Subdomain(name) if len(allErrs) > 0 { - return fmt.Errorf("invalid resource name %v: %v", lock, allErrs) + return fmt.Errorf("invalid resource name %v: %v", name, allErrs) } return nil } diff --git a/cmd/nginx-ingress/main.go b/cmd/nginx-ingress/main.go index 87a4c785d6..4a393fe19d 100644 --- a/cmd/nginx-ingress/main.go +++ b/cmd/nginx-ingress/main.go @@ -47,6 +47,8 @@ func main() { validateIngressClass(kubeClient) + checkNamespaceExists(kubeClient) + dynClient, confClient := createCustomClients(config) constLabels := map[string]string{"class": *ingressClass} @@ -117,7 +119,7 @@ func main() { DynClient: dynClient, RestConfig: config, ResyncPeriod: 30 * time.Second, - Namespace: *watchNamespace, + Namespace: watchNamespaces, NginxConfigurator: cnf, DefaultServerSecret: *defaultServerSecret, AppProtectEnabled: *appProtect, @@ -230,6 +232,17 @@ func validateIngressClass(kubeClient kubernetes.Interface) { } } +func checkNamespaceExists(kubeClient kubernetes.Interface) { + for _, ns := range watchNamespaces { + if ns != "" { + _, err := kubeClient.CoreV1().Namespaces().Get(context.TODO(), ns, meta_v1.GetOptions{}) + if err != nil { + glog.Warningf("Error when getting Namespace %v: %v", ns, err) + } + } + } +} + func createCustomClients(config *rest.Config) (dynamic.Interface, k8s_nginx.Interface) { var dynClient dynamic.Interface var err error diff --git a/cmd/nginx-ingress/main_test.go b/cmd/nginx-ingress/main_test.go index 0b98cbed23..1715d2a3e8 100644 --- a/cmd/nginx-ingress/main_test.go +++ b/cmd/nginx-ingress/main_test.go @@ -3,6 +3,7 @@ package main import ( "errors" "reflect" + "strings" "testing" ) @@ -153,3 +154,21 @@ func TestValidateAppProtectLogLevel(t *testing.T) { } } } + +func TestValidateNamespaces(t *testing.T) { + badNamespaces := []string{"watchns1, watchns2, watchns%$", "watchns1,watchns2,watchns%$"} + for _, badNs := range badNamespaces { + err := validateNamespaceNames(strings.Split(badNs, ",")) + if err == nil { + t.Errorf("Expected error for invalid namespace %v\n", badNs) + } + } + + goodNamespaces := []string{"watched-namespace", "watched-namespace,", "watched-namespace1,watched-namespace2", "watched-namespace1, watched-namespace2"} + for _, goodNs := range goodNamespaces { + err := validateNamespaceNames(strings.Split(goodNs, ",")) + if err != nil { + t.Errorf("Error for valid namespace: %v err: %v\n", goodNs, err) + } + } +} diff --git a/deployments/helm-chart/README.md b/deployments/helm-chart/README.md index c5d82d0880..d05398b59d 100644 --- a/deployments/helm-chart/README.md +++ b/deployments/helm-chart/README.md @@ -181,7 +181,7 @@ Parameter | Description | Default `controller.replicaCount` | The number of replicas of the Ingress Controller deployment. | 1 `controller.ingressClass` | A class of the Ingress Controller. An IngressClass resource with the name equal to the class must be deployed. Otherwise, the Ingress Controller will fail to start. The Ingress Controller only processes resources that belong to its class - i.e. have the "ingressClassName" field resource equal to the class. The Ingress Controller processes all the VirtualServer/VirtualServerRoute/TransportServer resources that do not have the "ingressClassName" field for all versions of kubernetes. | nginx `controller.setAsDefaultIngress` | New Ingresses without an `"ingressClassName"` field specified will be assigned the class specified in `controller.ingressClass`. | false -`controller.watchNamespace` | Namespace to watch for Ingress resources. By default the Ingress Controller watches all namespaces. | "" +`controller.watchNamespace` | Comma separated list of namespaces the Ingress Controller should watch for resources. By default the Ingress Controller watches all namespaces. | "" `controller.enableCustomResources` | Enable the custom resources. | true `controller.enablePreviewPolicies` | Enable preview policies. This parameter is deprecated. To enable OIDC Policies please use `controller.enableOIDC` instead. | false `controller.enableOIDC` | Enable OIDC policies. | false diff --git a/deployments/helm-chart/values.yaml b/deployments/helm-chart/values.yaml index 827d15d5e0..f5735e41c9 100644 --- a/deployments/helm-chart/values.yaml +++ b/deployments/helm-chart/values.yaml @@ -166,7 +166,7 @@ controller: ## New Ingresses without an ingressClassName field specified will be assigned the class specified in `controller.ingressClass`. setAsDefaultIngress: false - ## Namespace to watch for Ingress resources. By default the Ingress Controller watches all namespaces. + ## Comma separated list of namespaces to watch for Ingress resources. By default the Ingress Controller watches all namespaces. watchNamespace: "" ## Enable the custom resources. diff --git a/deployments/rbac/rbac.yaml b/deployments/rbac/rbac.yaml index eae30edc0d..0501d9f04e 100644 --- a/deployments/rbac/rbac.yaml +++ b/deployments/rbac/rbac.yaml @@ -37,6 +37,14 @@ rules: verbs: - list - watch +- apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - list + - watch - apiGroups: - "" resources: diff --git a/docs/content/configuration/global-configuration/command-line-arguments.md b/docs/content/configuration/global-configuration/command-line-arguments.md index 52b8631c6f..e313461bd8 100644 --- a/docs/content/configuration/global-configuration/command-line-arguments.md +++ b/docs/content/configuration/global-configuration/command-line-arguments.md @@ -302,7 +302,7 @@ A comma-separated list of pattern=N settings for file-filtered logging. ### -watch-namespace `` -Namespace to watch for Ingress resources. By default the Ingress Controller watches all namespaces. +Comma separated list of namespaces the Ingress Controller should watch for resources. By default the Ingress Controller watches all namespaces.   diff --git a/docs/content/installation/installation-with-helm.md b/docs/content/installation/installation-with-helm.md index 7c1d84e032..ef5802c39c 100644 --- a/docs/content/installation/installation-with-helm.md +++ b/docs/content/installation/installation-with-helm.md @@ -183,7 +183,7 @@ The following tables lists the configurable parameters of the NGINX Ingress Cont |``controller.replicaCount`` | The number of replicas of the Ingress Controller deployment. | 1 | |``controller.ingressClass`` | A class of the Ingress Controller. An IngressClass resource with the name equal to the class must be deployed. Otherwise, the Ingress Controller will fail to start. The Ingress Controller only processes resources that belong to its class - i.e. have the "ingressClassName" field resource equal to the class. The Ingress Controller processes all the VirtualServer/VirtualServerRoute/TransportServer resources that do not have the "ingressClassName" field for all versions of kubernetes. | nginx | |``controller.setAsDefaultIngress`` | New Ingresses without an ingressClassName field specified will be assigned the class specified in `controller.ingressClass`. | false | -|``controller.watchNamespace`` | Namespace to watch for Ingress resources. By default the Ingress Controller watches all namespaces. | "" | +|``controller.watchNamespace`` | Comma separated list of namespaces the Ingress Controller should watch for resources. By default the Ingress Controller watches all namespaces. | "" | |``controller.enableCustomResources`` | Enable the custom resources. | true | |``controller.enablePreviewPolicies`` | Enable preview policies. This parameter is deprecated. To enable OIDC Policies please use ``controller.enableOIDC`` instead. | false | |``controller.enableOIDC`` | Enable OIDC policies. | false | diff --git a/docs/content/installation/running-multiple-ingress-controllers.md b/docs/content/installation/running-multiple-ingress-controllers.md index d1425e2371..4cf93d5427 100644 --- a/docs/content/installation/running-multiple-ingress-controllers.md +++ b/docs/content/installation/running-multiple-ingress-controllers.md @@ -44,7 +44,7 @@ To make sure that NGINX Ingress Controller handles particular configuration reso When running NGINX Ingress Controller, you have the following options with regards to which configuration resources it handles: * **Cluster-wide Ingress Controller (default)**. The Ingress Controller handles configuration resources created in any namespace of the cluster. As NGINX is a high-performance load balancer capable of serving many applications at the same time, this option is used by default in our installation manifests and Helm chart. -* **Single-namespace Ingress Controller**. You can configure the Ingress Controller to handle configuration resources only from a particular namespace, which is controlled through the `-watch-namespace` command-line argument. This can be useful if you want to use different NGINX Ingress Controllers for different applications, both in terms of isolation and/or operation. +* **Defined-namespace Ingress Controller**. You can configure the Ingress Controller to handle configuration resources only from particular namespaces, which is controlled through the `-watch-namespace` command-line argument. This can be useful if you want to use different NGINX Ingress Controllers for different applications, both in terms of isolation and/or operation. * **Ingress Controller for Specific Ingress Class**. This option works in conjunction with either of the options above. You can further customize which configuration resources are handled by the Ingress Controller by configuring the class of the Ingress Controller and using that class in your configuration resources. See the section [Configuring Ingress Class](#configuring-ingress-class). Considering the options above, you can run multiple NGINX Ingress Controllers, each handling a different set of configuration resources. diff --git a/internal/certmanager/cm_controller.go b/internal/certmanager/cm_controller.go index 51ae86a186..5138389452 100644 --- a/internal/certmanager/cm_controller.go +++ b/internal/certmanager/cm_controller.go @@ -23,6 +23,7 @@ import ( cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" cm_clientset "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned" cm_informers "github.com/cert-manager/cert-manager/pkg/client/informers/externalversions" + cmlisters "github.com/cert-manager/cert-manager/pkg/client/listers/certmanager/v1" controllerpkg "github.com/cert-manager/cert-manager/pkg/controller" "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,6 +35,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + conf_v1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1" k8s_nginx "github.com/nginxinc/kubernetes-ingress/pkg/client/clientset/versioned" vsinformers "github.com/nginxinc/kubernetes-ingress/pkg/client/informers/externalversions" listers_v1 "github.com/nginxinc/kubernetes-ingress/pkg/client/listers/configuration/v1" @@ -54,14 +56,14 @@ const ( // and creates/ updates certificates for VS resources as required, // and VS resources when certificate objects are created/ updated type CmController struct { - vsLister listers_v1.VirtualServerLister + vsLister []listers_v1.VirtualServerLister sync SyncFn ctx context.Context mustSync []cache.InformerSynced queue workqueue.RateLimitingInterface - vsSharedInformerFactory vsinformers.SharedInformerFactory - cmSharedInformerFactory cm_informers.SharedInformerFactory - kubeSharedInformerFactory kubeinformers.SharedInformerFactory + vsSharedInformerFactory []vsinformers.SharedInformerFactory + cmSharedInformerFactory []cm_informers.SharedInformerFactory + kubeSharedInformerFactory []kubeinformers.SharedInformerFactory recorder record.EventRecorder cmClient *cm_clientset.Clientset } @@ -71,27 +73,31 @@ type CmOpts struct { context context.Context kubeConfig *rest.Config kubeClient kubernetes.Interface - namespace string + namespace []string eventRecorder record.EventRecorder vsClient k8s_nginx.Interface } func (c *CmController) register() workqueue.RateLimitingInterface { - c.vsLister = c.vsSharedInformerFactory.K8s().V1().VirtualServers().Lister() - c.vsSharedInformerFactory.K8s().V1().VirtualServers().Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{ - Queue: c.queue, - }) + var cmLister []cmlisters.CertificateLister + for _, sif := range c.vsSharedInformerFactory { + c.vsLister = append(c.vsLister, sif.K8s().V1().VirtualServers().Lister()) + sif.K8s().V1().VirtualServers().Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{ + Queue: c.queue, + }) + c.mustSync = append(c.mustSync, sif.K8s().V1().VirtualServers().Informer().HasSynced) + } - c.sync = SyncFnFor(c.recorder, c.cmClient, c.cmSharedInformerFactory.Certmanager().V1().Certificates().Lister()) + for _, cif := range c.cmSharedInformerFactory { + cif.Certmanager().V1().Certificates().Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{ + WorkFunc: certificateHandler(c.queue), + }) + cmLister = append(cmLister, cif.Certmanager().V1().Certificates().Lister()) + c.mustSync = append(c.mustSync, cif.Certmanager().V1().Certificates().Informer().HasSynced) + } - c.cmSharedInformerFactory.Certmanager().V1().Certificates().Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{ - WorkFunc: certificateHandler(c.queue), - }) + c.sync = SyncFnFor(c.recorder, c.cmClient, cmLister) - c.mustSync = []cache.InformerSynced{ - c.vsSharedInformerFactory.K8s().V1().VirtualServers().Informer().HasSynced, - c.cmSharedInformerFactory.Certmanager().V1().Certificates().Informer().HasSynced, - } return c.queue } @@ -103,7 +109,13 @@ func (c *CmController) processItem(ctx context.Context, key string) error { return err } - vs, err := c.vsLister.VirtualServers(namespace).Get(name) + var vs *conf_v1.VirtualServer + for _, vl := range c.vsLister { + vs, err = vl.VirtualServers(namespace).Get(name) + if err == nil { + break + } + } if err != nil { return err } @@ -156,9 +168,15 @@ func NewCmController(opts *CmOpts) *CmController { // Create a cert-manager api client intcl, _ := cm_clientset.NewForConfig(opts.kubeConfig) - cmSharedInformerFactory := cm_informers.NewSharedInformerFactoryWithOptions(intcl, resyncPeriod, cm_informers.WithNamespace(opts.namespace)) - kubeSharedInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(opts.kubeClient, resyncPeriod, kubeinformers.WithNamespace(opts.namespace)) - vsSharedInformerFactory := vsinformers.NewSharedInformerFactoryWithOptions(opts.vsClient, resyncPeriod, vsinformers.WithNamespace(opts.namespace)) + var vsSharedInformerFactory []vsinformers.SharedInformerFactory + var cmSharedInformerFactory []cm_informers.SharedInformerFactory + var kubeSharedInformerFactory []kubeinformers.SharedInformerFactory + + for _, ns := range opts.namespace { + cmSharedInformerFactory = append(cmSharedInformerFactory, cm_informers.NewSharedInformerFactoryWithOptions(intcl, resyncPeriod, cm_informers.WithNamespace(ns))) + kubeSharedInformerFactory = append(kubeSharedInformerFactory, kubeinformers.NewSharedInformerFactoryWithOptions(opts.kubeClient, resyncPeriod, kubeinformers.WithNamespace(ns))) + vsSharedInformerFactory = append(vsSharedInformerFactory, vsinformers.NewSharedInformerFactoryWithOptions(opts.vsClient, resyncPeriod, vsinformers.WithNamespace(ns))) + } cm := &CmController{ ctx: opts.context, @@ -183,9 +201,15 @@ func (c *CmController) Run(stopCh <-chan struct{}) { glog.Infof("Starting cert-manager control loop") - go c.vsSharedInformerFactory.Start(c.ctx.Done()) - go c.cmSharedInformerFactory.Start(c.ctx.Done()) - go c.kubeSharedInformerFactory.Start(c.ctx.Done()) + for _, vif := range c.vsSharedInformerFactory { + go vif.Start(c.ctx.Done()) + } + for _, cif := range c.cmSharedInformerFactory { + go cif.Start(c.ctx.Done()) + } + for _, kif := range c.kubeSharedInformerFactory { + go kif.Start(c.ctx.Done()) + } // // wait for all the informer caches we depend on are synced glog.V(3).Infof("Waiting for %d caches to sync", len(c.mustSync)) if !cache.WaitForNamedCacheSync(ControllerName, stopCh, c.mustSync...) { @@ -234,7 +258,7 @@ func (c *CmController) runWorker(ctx context.Context) { } // BuildOpts builds a CmOpts from the given parameters -func BuildOpts(ctx context.Context, kc *rest.Config, cl kubernetes.Interface, ns string, er record.EventRecorder, vsc k8s_nginx.Interface) *CmOpts { +func BuildOpts(ctx context.Context, kc *rest.Config, cl kubernetes.Interface, ns []string, er record.EventRecorder, vsc k8s_nginx.Interface) *CmOpts { return &CmOpts{ context: ctx, kubeClient: cl, diff --git a/internal/certmanager/cm_controller_test.go b/internal/certmanager/cm_controller_test.go index ba9c9a9786..b7b0da7980 100644 --- a/internal/certmanager/cm_controller_test.go +++ b/internal/certmanager/cm_controller_test.go @@ -23,16 +23,19 @@ import ( cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" cmclient "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned" + cm_informers "github.com/cert-manager/cert-manager/pkg/client/informers/externalversions" controllerpkg "github.com/cert-manager/cert-manager/pkg/controller" testpkg "github.com/nginxinc/kubernetes-ingress/internal/certmanager/test_files" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/util/workqueue" vsapi "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1" k8s_nginx "github.com/nginxinc/kubernetes-ingress/pkg/client/clientset/versioned" + vsinformers "github.com/nginxinc/kubernetes-ingress/pkg/client/informers/externalversions" ) func Test_controller_Register(t *testing.T) { @@ -138,10 +141,10 @@ func Test_controller_Register(t *testing.T) { cm := &CmController{ ctx: b.RootContext, queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName), - cmSharedInformerFactory: b.FakeCMInformerFactory(), - kubeSharedInformerFactory: b.FakeKubeInformerFactory(), + cmSharedInformerFactory: []cm_informers.SharedInformerFactory{b.FakeCMInformerFactory()}, + kubeSharedInformerFactory: []kubeinformers.SharedInformerFactory{b.FakeKubeInformerFactory()}, recorder: b.Recorder, - vsSharedInformerFactory: b.VsSharedInformerFactory, + vsSharedInformerFactory: []vsinformers.SharedInformerFactory{b.VsSharedInformerFactory}, } queue := cm.register() diff --git a/internal/certmanager/sync.go b/internal/certmanager/sync.go index 786dbd5217..89aee7ec65 100644 --- a/internal/certmanager/sync.go +++ b/internal/certmanager/sync.go @@ -60,7 +60,7 @@ type SyncFn func(context.Context, *vsapi.VirtualServer) error func SyncFnFor( rec record.EventRecorder, cmClient clientset.Interface, - cmLister cmlisters.CertificateLister, + cmLister []cmlisters.CertificateLister, ) SyncFn { return func(ctx context.Context, vs *vsapi.VirtualServer) error { var err error @@ -104,8 +104,14 @@ func SyncFnFor( } rec.Eventf(vs, corev1.EventTypeNormal, reasonUpdateCertificate, "Successfully updated Certificate %q", crt.Name) } + var certs []*cmapi.Certificate - certs, err := cmLister.Certificates(vs.GetNamespace()).List(labels.Everything()) + for _, cl := range cmLister { + certs, err = cl.Certificates(vs.GetNamespace()).List(labels.Everything()) + if len(certs) > 0 { + break + } + } if err != nil { return err } @@ -125,22 +131,27 @@ func SyncFnFor( } func buildCertificates( - cmLister cmlisters.CertificateLister, + cmLister []cmlisters.CertificateLister, vs *vsapi.VirtualServer, issuerName, issuerKind, issuerGroup string, ) (newCert, update []*cmapi.Certificate, _ error) { var newCrts []*cmapi.Certificate var updateCrts []*cmapi.Certificate + var existingCrt *cmapi.Certificate + var err error - var hosts []string - hosts = append(hosts, vs.Spec.Host) - - existingCrt, err := cmLister.Certificates(vs.Namespace).Get(vs.Spec.TLS.Secret) + for _, cl := range cmLister { + existingCrt, err = cl.Certificates(vs.Namespace).Get(vs.Spec.TLS.Secret) + if err == nil { + break + } + } if !apierrors.IsNotFound(err) && err != nil { return nil, nil, err } var controllerGVK schema.GroupVersionKind = vsGVK + hosts := []string{vs.Spec.Host} crt := &cmapi.Certificate{ ObjectMeta: metav1.ObjectMeta{ diff --git a/internal/certmanager/sync_test.go b/internal/certmanager/sync_test.go index 5502d198e8..453f7b1975 100644 --- a/internal/certmanager/sync_test.go +++ b/internal/certmanager/sync_test.go @@ -23,6 +23,7 @@ import ( cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1" + cmlisters "github.com/cert-manager/cert-manager/pkg/client/listers/certmanager/v1" "github.com/cert-manager/cert-manager/test/unit/gen" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -490,7 +491,7 @@ func TestSync(t *testing.T) { } b.Init() defer b.Stop() - sync := SyncFnFor(b.Recorder, b.CMClient, b.SharedInformerFactory.Certmanager().V1().Certificates().Lister()) + sync := SyncFnFor(b.Recorder, b.CMClient, []cmlisters.CertificateLister{b.SharedInformerFactory.Certmanager().V1().Certificates().Lister()}) b.Start() err := sync(context.Background(), &test.VirtualServer) diff --git a/internal/externaldns/controller.go b/internal/externaldns/controller.go index 99d3dc7cf5..8522e03a64 100644 --- a/internal/externaldns/controller.go +++ b/internal/externaldns/controller.go @@ -6,6 +6,7 @@ import ( "time" "github.com/golang/glog" + conf_v1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1" extdns_v1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/externaldns/v1" k8s_nginx "github.com/nginxinc/kubernetes-ingress/pkg/client/clientset/versioned" listersV1 "github.com/nginxinc/kubernetes-ingress/pkg/client/listers/configuration/v1" @@ -27,21 +28,21 @@ const ( // ExtDNSController represents ExternalDNS controller. type ExtDNSController struct { - vsLister listersV1.VirtualServerLister + vsLister []listersV1.VirtualServerLister sync SyncFn ctx context.Context mustSync []cache.InformerSynced queue workqueue.RateLimitingInterface - sharedInformerFactory k8s_nginx_informers.SharedInformerFactory + sharedInformerFactory []k8s_nginx_informers.SharedInformerFactory recorder record.EventRecorder client k8s_nginx.Interface - extdnslister extdnslisters.DNSEndpointLister + extdnslister []extdnslisters.DNSEndpointLister } // ExtDNSOpts represents config required for building the External DNS Controller. type ExtDNSOpts struct { context context.Context - namespace string + namespace []string eventRecorder record.EventRecorder client k8s_nginx.Interface resyncPeriod time.Duration @@ -49,7 +50,11 @@ type ExtDNSOpts struct { // NewController takes external dns config and return a new External DNS Controller. func NewController(opts *ExtDNSOpts) *ExtDNSController { - sharedInformerFactory := k8s_nginx_informers.NewSharedInformerFactoryWithOptions(opts.client, opts.resyncPeriod, k8s_nginx_informers.WithNamespace(opts.namespace)) + var sharedInformerFactory []k8s_nginx_informers.SharedInformerFactory + for _, ns := range opts.namespace { + sif := k8s_nginx_informers.NewSharedInformerFactoryWithOptions(opts.client, opts.resyncPeriod, k8s_nginx_informers.WithNamespace(ns)) + sharedInformerFactory = append(sharedInformerFactory, sif) + } c := &ExtDNSController{ ctx: opts.context, @@ -63,25 +68,26 @@ func NewController(opts *ExtDNSOpts) *ExtDNSController { } func (c *ExtDNSController) register() workqueue.Interface { - c.vsLister = c.sharedInformerFactory.K8s().V1().VirtualServers().Lister() - c.extdnslister = c.sharedInformerFactory.Externaldns().V1().DNSEndpoints().Lister() - - c.sharedInformerFactory.K8s().V1().VirtualServers().Informer().AddEventHandler( - &QueuingEventHandler{ - Queue: c.queue, - }, - ) - - c.sharedInformerFactory.Externaldns().V1().DNSEndpoints().Informer().AddEventHandler(&BlockingEventHandler{ - WorkFunc: externalDNSHandler(c.queue), - }) - - c.sync = SyncFnFor(c.recorder, c.client, c.extdnslister) - - c.mustSync = []cache.InformerSynced{ - c.sharedInformerFactory.K8s().V1().VirtualServers().Informer().HasSynced, - c.sharedInformerFactory.Externaldns().V1().DNSEndpoints().Informer().HasSynced, + for _, sif := range c.sharedInformerFactory { + c.vsLister = append(c.vsLister, sif.K8s().V1().VirtualServers().Lister()) + c.extdnslister = append(c.extdnslister, sif.Externaldns().V1().DNSEndpoints().Lister()) + + sif.K8s().V1().VirtualServers().Informer().AddEventHandler( + &QueuingEventHandler{ + Queue: c.queue, + }, + ) + + sif.Externaldns().V1().DNSEndpoints().Informer().AddEventHandler(&BlockingEventHandler{ + WorkFunc: externalDNSHandler(c.queue), + }) + + c.mustSync = append(c.mustSync, + sif.K8s().V1().VirtualServers().Informer().HasSynced, + sif.Externaldns().V1().DNSEndpoints().Informer().HasSynced, + ) } + c.sync = SyncFnFor(c.recorder, c.client, c.extdnslister) return c.queue } @@ -95,7 +101,9 @@ func (c *ExtDNSController) Run(stopCh <-chan struct{}) { glog.Infof("Starting external-dns control loop") - go c.sharedInformerFactory.Start(c.ctx.Done()) + for _, sif := range c.sharedInformerFactory { + go sif.Start(c.ctx.Done()) + } // wait for all informer caches to be synced glog.V(3).Infof("Waiting for %d caches to sync", len(c.mustSync)) @@ -145,7 +153,13 @@ func (c *ExtDNSController) processItem(ctx context.Context, key string) error { runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return err } - vs, err := c.vsLister.VirtualServers(namespace).Get(name) + var vs *conf_v1.VirtualServer + for _, vl := range c.vsLister { + vs, err = vl.VirtualServers(namespace).Get(name) + if err == nil { + break + } + } if err != nil { return err } @@ -182,7 +196,7 @@ func externalDNSHandler(queue workqueue.RateLimitingInterface) func(obj interfac // BuildOpts builds the externalDNS controller options func BuildOpts( ctx context.Context, - namespace string, + namespace []string, recorder record.EventRecorder, k8sNginxClient k8s_nginx.Interface, resync time.Duration, diff --git a/internal/externaldns/sync.go b/internal/externaldns/sync.go index cd131ce3ad..16295d0109 100644 --- a/internal/externaldns/sync.go +++ b/internal/externaldns/sync.go @@ -36,7 +36,7 @@ var vsGVK = vsapi.SchemeGroupVersion.WithKind("VirtualServer") type SyncFn func(context.Context, *vsapi.VirtualServer) error // SyncFnFor knows how to reconcile VirtualServer DNSEndpoint object. -func SyncFnFor(rec record.EventRecorder, client clientset.Interface, extdnsLister extdnslisters.DNSEndpointLister) SyncFn { +func SyncFnFor(rec record.EventRecorder, client clientset.Interface, extdnsLister []extdnslisters.DNSEndpointLister) SyncFn { return func(ctx context.Context, vs *vsapi.VirtualServer) error { // Do nothing if ExternalDNS is not present (nil) in VS or is not enabled. if !vs.Spec.ExternalDNS.Enable { @@ -136,10 +136,17 @@ func getValidTargets(endpoints []vsapi.ExternalEndpoint) (extdnsapi.Targets, str return targets, recordType, err } -func buildDNSEndpoint(extdnsLister extdnslisters.DNSEndpointLister, vs *vsapi.VirtualServer, targets extdnsapi.Targets, recordType string) (*extdnsapi.DNSEndpoint, *extdnsapi.DNSEndpoint, error) { +func buildDNSEndpoint(extdnsLister []extdnslisters.DNSEndpointLister, vs *vsapi.VirtualServer, targets extdnsapi.Targets, recordType string) (*extdnsapi.DNSEndpoint, *extdnsapi.DNSEndpoint, error) { var updateDNSEndpoint *extdnsapi.DNSEndpoint var newDNSEndpoint *extdnsapi.DNSEndpoint - existingDNSEndpoint, err := extdnsLister.DNSEndpoints(vs.Namespace).Get(vs.Spec.Host) + var existingDNSEndpoint *extdnsapi.DNSEndpoint + var err error + for _, el := range extdnsLister { + existingDNSEndpoint, err = el.DNSEndpoints(vs.Namespace).Get(vs.Spec.Host) + if err == nil { + break + } + } if !apierrors.IsNotFound(err) && err != nil { return nil, nil, err } diff --git a/internal/externaldns/sync_test.go b/internal/externaldns/sync_test.go index 45390e9132..58ca613a24 100644 --- a/internal/externaldns/sync_test.go +++ b/internal/externaldns/sync_test.go @@ -276,7 +276,7 @@ func TestSync_ReturnsErrorOnFailure(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { rec := EventRecorder{} - eplister := DNSEPLister{} + eplister := []extdnsclient.DNSEndpointLister{DNSEPLister{}} fn := SyncFnFor(rec, nil, eplister) err := fn(context.TODO(), tc.input) if err == nil { diff --git a/internal/k8s/controller.go b/internal/k8s/controller.go index 1e7648d54c..f46cb523c5 100644 --- a/internal/k8s/controller.go +++ b/internal/k8s/controller.go @@ -102,29 +102,29 @@ type LoadBalancerController struct { dynClient dynamic.Interface restConfig *rest.Config cacheSyncs []cache.InformerSynced - sharedInformerFactory informers.SharedInformerFactory - confSharedInformerFactory k8s_nginx_informers.SharedInformerFactory + sharedInformerFactory []informers.SharedInformerFactory + confSharedInformerFactory []k8s_nginx_informers.SharedInformerFactory configMapController cache.Controller - dynInformerFactory dynamicinformer.DynamicSharedInformerFactory + dynInformerFactory []dynamicinformer.DynamicSharedInformerFactory globalConfigurationController cache.Controller ingressLinkInformer cache.SharedIndexInformer - ingressLister storeToIngressLister - svcLister cache.Store - endpointLister storeToEndpointLister + ingressLister []storeToIngressLister + svcLister []cache.Store + endpointLister []storeToEndpointLister configMapLister storeToConfigMapLister - podLister indexerToPodLister - secretLister cache.Store - virtualServerLister cache.Store - virtualServerRouteLister cache.Store - appProtectPolicyLister cache.Store - appProtectLogConfLister cache.Store - appProtectDosPolicyLister cache.Store - appProtectDosLogConfLister cache.Store - appProtectDosProtectedLister cache.Store + podLister []indexerToPodLister + secretLister []cache.Store + virtualServerLister []cache.Store + virtualServerRouteLister []cache.Store + appProtectPolicyLister []cache.Store + appProtectLogConfLister []cache.Store + appProtectDosPolicyLister []cache.Store + appProtectDosLogConfLister []cache.Store + appProtectDosProtectedLister []cache.Store globalConfigurationLister cache.Store - appProtectUserSigLister cache.Store - transportServerLister cache.Store - policyLister cache.Store + appProtectUserSigLister []cache.Store + transportServerLister []cache.Store + policyLister []cache.Store ingressLinkLister cache.Store syncQueue *taskQueue ctx context.Context @@ -145,7 +145,7 @@ type LoadBalancerController struct { isLeaderElectionEnabled bool leaderElectionLockName string resync time.Duration - namespace string + namespaceList []string controllerNamespace string wildcardTLSSecret string areCustomResourcesEnabled bool @@ -177,7 +177,7 @@ type NewLoadBalancerControllerInput struct { DynClient dynamic.Interface RestConfig *rest.Config ResyncPeriod time.Duration - Namespace string + Namespace []string NginxConfigurator *configs.Configurator DefaultServerSecret string AppProtectEnabled bool @@ -226,7 +226,7 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc isLeaderElectionEnabled: input.IsLeaderElectionEnabled, leaderElectionLockName: input.LeaderElectionLockName, resync: input.ResyncPeriod, - namespace: input.Namespace, + namespaceList: input.Namespace, controllerNamespace: input.ControllerNamespace, wildcardTLSSecret: input.WildcardTLSSecret, areCustomResourcesEnabled: input.AreCustomResourcesEnabled, @@ -257,16 +257,18 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc } if input.CertManagerEnabled { - lbc.certManagerController = cm_controller.NewCmController(cm_controller.BuildOpts(context.TODO(), lbc.restConfig, lbc.client, lbc.namespace, lbc.recorder, lbc.confClient)) + lbc.certManagerController = cm_controller.NewCmController(cm_controller.BuildOpts(context.TODO(), lbc.restConfig, lbc.client, lbc.namespaceList, lbc.recorder, lbc.confClient)) } if input.ExternalDNSEnabled { - lbc.externalDNSController = ed_controller.NewController(ed_controller.BuildOpts(context.TODO(), lbc.namespace, lbc.recorder, lbc.confClient, input.ResyncPeriod)) + lbc.externalDNSController = ed_controller.NewController(ed_controller.BuildOpts(context.TODO(), lbc.namespaceList, lbc.recorder, lbc.confClient, input.ResyncPeriod)) } glog.V(3).Infof("Nginx Ingress Controller has class: %v", input.IngressClass) - lbc.sharedInformerFactory = informers.NewSharedInformerFactoryWithOptions(lbc.client, input.ResyncPeriod, informers.WithNamespace(lbc.namespace)) + for _, ns := range lbc.namespaceList { + lbc.sharedInformerFactory = append(lbc.sharedInformerFactory, informers.NewSharedInformerFactoryWithOptions(lbc.client, input.ResyncPeriod, informers.WithNamespace(ns))) + } // create handlers for resources we care about lbc.addSecretHandler(createSecretHandlers(lbc)) @@ -276,7 +278,9 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc lbc.addPodHandler() if lbc.areCustomResourcesEnabled { - lbc.confSharedInformerFactory = k8s_nginx_informers.NewSharedInformerFactoryWithOptions(lbc.confClient, input.ResyncPeriod, k8s_nginx_informers.WithNamespace(lbc.namespace)) + for _, ns := range lbc.namespaceList { + lbc.confSharedInformerFactory = append(lbc.confSharedInformerFactory, k8s_nginx_informers.NewSharedInformerFactoryWithOptions(lbc.confClient, input.ResyncPeriod, k8s_nginx_informers.WithNamespace(ns))) + } lbc.addVirtualServerHandler(createVirtualServerHandlers(lbc)) lbc.addVirtualServerRouteHandler(createVirtualServerRouteHandlers(lbc)) @@ -291,8 +295,9 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc } if lbc.appProtectEnabled || lbc.appProtectDosEnabled { - lbc.dynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(lbc.dynClient, 0, lbc.namespace, nil) - + for _, ns := range lbc.namespaceList { + lbc.dynInformerFactory = append(lbc.dynInformerFactory, dynamicinformer.NewFilteredDynamicSharedInformerFactory(lbc.dynClient, 0, ns, nil)) + } if lbc.appProtectEnabled { lbc.addAppProtectPolicyHandler(createAppProtectPolicyHandlers(lbc)) lbc.addAppProtectLogConfHandler(createAppProtectLogConfHandlers(lbc)) @@ -329,7 +334,7 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc client: input.KubeClient, namespace: input.ControllerNamespace, externalServiceName: input.ExternalServiceName, - ingressLister: &lbc.ingressLister, + ingressLister: lbc.ingressLister, virtualServerLister: lbc.virtualServerLister, virtualServerRouteLister: lbc.virtualServerRouteLister, transportServerLister: lbc.transportServerLister, @@ -377,92 +382,114 @@ func (lbc *LoadBalancerController) AddSyncQueue(item interface{}) { // addAppProtectPolicyHandler creates dynamic informers for custom appprotect policy resource func (lbc *LoadBalancerController) addAppProtectPolicyHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.dynInformerFactory.ForResource(appprotect.PolicyGVR).Informer() - informer.AddEventHandler(handlers) - lbc.appProtectPolicyLister = informer.GetStore() + for _, dif := range lbc.dynInformerFactory { + informer := dif.ForResource(appprotect.PolicyGVR).Informer() + informer.AddEventHandler(handlers) + lbc.appProtectPolicyLister = append(lbc.appProtectPolicyLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } // addAppProtectLogConfHandler creates dynamic informer for custom appprotect logging config resource func (lbc *LoadBalancerController) addAppProtectLogConfHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.dynInformerFactory.ForResource(appprotect.LogConfGVR).Informer() - informer.AddEventHandler(handlers) - lbc.appProtectLogConfLister = informer.GetStore() + for _, dif := range lbc.dynInformerFactory { + informer := dif.ForResource(appprotect.LogConfGVR).Informer() + informer.AddEventHandler(handlers) + lbc.appProtectLogConfLister = append(lbc.appProtectLogConfLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } // addAppProtectUserSigHandler creates dynamic informer for custom appprotect user defined signature resource func (lbc *LoadBalancerController) addAppProtectUserSigHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.dynInformerFactory.ForResource(appprotect.UserSigGVR).Informer() - informer.AddEventHandler(handlers) - lbc.appProtectUserSigLister = informer.GetStore() + for _, dif := range lbc.dynInformerFactory { + informer := dif.ForResource(appprotect.UserSigGVR).Informer() + informer.AddEventHandler(handlers) + lbc.appProtectUserSigLister = append(lbc.appProtectUserSigLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } // addAppProtectDosPolicyHandler creates dynamic informers for custom appprotectdos policy resource func (lbc *LoadBalancerController) addAppProtectDosPolicyHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.dynInformerFactory.ForResource(appprotectdos.DosPolicyGVR).Informer() - informer.AddEventHandler(handlers) - lbc.appProtectDosPolicyLister = informer.GetStore() + for _, dif := range lbc.dynInformerFactory { + informer := dif.ForResource(appprotectdos.DosPolicyGVR).Informer() + informer.AddEventHandler(handlers) + lbc.appProtectDosPolicyLister = append(lbc.appProtectDosPolicyLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } // addAppProtectDosLogConfHandler creates dynamic informer for custom appprotectdos logging config resource func (lbc *LoadBalancerController) addAppProtectDosLogConfHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.dynInformerFactory.ForResource(appprotectdos.DosLogConfGVR).Informer() - informer.AddEventHandler(handlers) - lbc.appProtectDosLogConfLister = informer.GetStore() + for _, dif := range lbc.dynInformerFactory { + informer := dif.ForResource(appprotectdos.DosLogConfGVR).Informer() + informer.AddEventHandler(handlers) + lbc.appProtectDosLogConfLister = append(lbc.appProtectDosLogConfLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } -// addAppProtectDosLogConfHandler creates dynamic informer for custom appprotectdos logging config resource +// addAppProtectDosLogConfHandler creates dynamic informers for custom appprotectdos logging config resource func (lbc *LoadBalancerController) addAppProtectDosProtectedResourceHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.confSharedInformerFactory.Appprotectdos().V1beta1().DosProtectedResources().Informer() - informer.AddEventHandler(handlers) - lbc.appProtectDosProtectedLister = informer.GetStore() + for _, cif := range lbc.confSharedInformerFactory { + informer := cif.Appprotectdos().V1beta1().DosProtectedResources().Informer() + informer.AddEventHandler(handlers) + lbc.appProtectDosProtectedLister = append(lbc.appProtectDosProtectedLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } // addSecretHandler adds the handler for secrets to the controller func (lbc *LoadBalancerController) addSecretHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.sharedInformerFactory.Core().V1().Secrets().Informer() - informer.AddEventHandler(handlers) - lbc.secretLister = informer.GetStore() + for _, sif := range lbc.sharedInformerFactory { + informer := sif.Core().V1().Secrets().Informer() + informer.AddEventHandler(handlers) + lbc.secretLister = append(lbc.secretLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } // addServiceHandler adds the handler for services to the controller func (lbc *LoadBalancerController) addServiceHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.sharedInformerFactory.Core().V1().Services().Informer() - informer.AddEventHandler(handlers) - lbc.svcLister = informer.GetStore() + for _, sif := range lbc.sharedInformerFactory { + informer := sif.Core().V1().Services().Informer() + informer.AddEventHandler(handlers) + lbc.svcLister = append(lbc.svcLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } // addIngressHandler adds the handler for ingresses to the controller func (lbc *LoadBalancerController) addIngressHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.sharedInformerFactory.Networking().V1().Ingresses().Informer() - informer.AddEventHandler(handlers) - lbc.ingressLister.Store = informer.GetStore() + for _, sif := range lbc.sharedInformerFactory { + informer := sif.Networking().V1().Ingresses().Informer() + informer.AddEventHandler(handlers) + lbc.ingressLister = append(lbc.ingressLister, storeToIngressLister{Store: informer.GetStore()}) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } // addEndpointHandler adds the handler for endpoints to the controller func (lbc *LoadBalancerController) addEndpointHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.sharedInformerFactory.Core().V1().Endpoints().Informer() - informer.AddEventHandler(handlers) - lbc.endpointLister.Store = informer.GetStore() + for _, sif := range lbc.sharedInformerFactory { + informer := sif.Core().V1().Endpoints().Informer() + informer.AddEventHandler(handlers) + var el storeToEndpointLister + el.Store = informer.GetStore() + lbc.endpointLister = append(lbc.endpointLister, el) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } // addConfigMapHandler adds the handler for config maps to the controller @@ -481,34 +508,42 @@ func (lbc *LoadBalancerController) addConfigMapHandler(handlers cache.ResourceEv } func (lbc *LoadBalancerController) addPodHandler() { - informer := lbc.sharedInformerFactory.Core().V1().Pods().Informer() - lbc.podLister.Indexer = informer.GetIndexer() + for _, sif := range lbc.sharedInformerFactory { + informer := sif.Core().V1().Pods().Informer() + lbc.podLister = append(lbc.podLister, indexerToPodLister{Indexer: informer.GetIndexer()}) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } func (lbc *LoadBalancerController) addVirtualServerHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.confSharedInformerFactory.K8s().V1().VirtualServers().Informer() - informer.AddEventHandler(handlers) - lbc.virtualServerLister = informer.GetStore() + for _, cif := range lbc.confSharedInformerFactory { + informer := cif.K8s().V1().VirtualServers().Informer() + informer.AddEventHandler(handlers) + lbc.virtualServerLister = append(lbc.virtualServerLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } func (lbc *LoadBalancerController) addVirtualServerRouteHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.confSharedInformerFactory.K8s().V1().VirtualServerRoutes().Informer() - informer.AddEventHandler(handlers) - lbc.virtualServerRouteLister = informer.GetStore() + for _, cif := range lbc.confSharedInformerFactory { + informer := cif.K8s().V1().VirtualServerRoutes().Informer() + informer.AddEventHandler(handlers) + lbc.virtualServerRouteLister = append(lbc.virtualServerRouteLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } func (lbc *LoadBalancerController) addPolicyHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.confSharedInformerFactory.K8s().V1().Policies().Informer() - informer.AddEventHandler(handlers) - lbc.policyLister = informer.GetStore() + for _, cif := range lbc.confSharedInformerFactory { + informer := cif.K8s().V1().Policies().Informer() + informer.AddEventHandler(handlers) + lbc.policyLister = append(lbc.policyLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } func (lbc *LoadBalancerController) addGlobalConfigurationHandler(handlers cache.ResourceEventHandlerFuncs, namespace string, name string) { @@ -526,11 +561,13 @@ func (lbc *LoadBalancerController) addGlobalConfigurationHandler(handlers cache. } func (lbc *LoadBalancerController) addTransportServerHandler(handlers cache.ResourceEventHandlerFuncs) { - informer := lbc.confSharedInformerFactory.K8s().V1alpha1().TransportServers().Informer() - informer.AddEventHandler(handlers) - lbc.transportServerLister = informer.GetStore() + for _, cif := range lbc.confSharedInformerFactory { + informer := cif.K8s().V1alpha1().TransportServers().Informer() + informer.AddEventHandler(handlers) + lbc.transportServerLister = append(lbc.transportServerLister, informer.GetStore()) - lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) + } } func (lbc *LoadBalancerController) addIngressLinkHandler(handlers cache.ResourceEventHandlerFuncs, name string) { @@ -569,12 +606,16 @@ func (lbc *LoadBalancerController) Run() { go lbc.leaderElector.Run(lbc.ctx) } - go lbc.sharedInformerFactory.Start(lbc.ctx.Done()) + for _, sif := range lbc.sharedInformerFactory { + go sif.Start(lbc.ctx.Done()) + } if lbc.watchNginxConfigMaps { go lbc.configMapController.Run(lbc.ctx.Done()) } if lbc.areCustomResourcesEnabled { - go lbc.confSharedInformerFactory.Start(lbc.ctx.Done()) + for _, cif := range lbc.confSharedInformerFactory { + go cif.Start(lbc.ctx.Done()) + } } if lbc.watchGlobalConfiguration { go lbc.globalConfigurationController.Run(lbc.ctx.Done()) @@ -583,7 +624,9 @@ func (lbc *LoadBalancerController) Run() { go lbc.ingressLinkInformer.Run(lbc.ctx.Done()) } if lbc.appProtectEnabled || lbc.appProtectDosEnabled { - go lbc.dynInformerFactory.Start(lbc.ctx.Done()) + for _, dif := range lbc.dynInformerFactory { + go dif.Start(lbc.ctx.Done()) + } } glog.V(3).Infof("Waiting for %d caches to sync", len(lbc.cacheSyncs)) @@ -609,9 +652,18 @@ func (lbc *LoadBalancerController) Stop() { func (lbc *LoadBalancerController) syncEndpoints(task task) { key := task.Key + var obj interface{} + var endpExists bool + var err error glog.V(3).Infof("Syncing endpoints %v", key) - obj, endpExists, err := lbc.endpointLister.GetByKey(key) + for _, el := range lbc.endpointLister { + obj, endpExists, err = el.GetByKey(key) + if endpExists { + break + } + } + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -765,19 +817,21 @@ func (lbc *LoadBalancerController) updateAllConfigs() { // As a result, the IC will generate configuration for that resource assuming that the Secret is missing and // it will report warnings. (See https://github.com/nginxinc/kubernetes-ingress/issues/1448 ) func (lbc *LoadBalancerController) preSyncSecrets() { - objects := lbc.secretLister.List() - glog.V(3).Infof("PreSync %d Secrets", len(objects)) + for _, sl := range lbc.secretLister { + objects := sl.List() + glog.V(3).Infof("PreSync %d Secrets", len(objects)) - for _, obj := range objects { - secret := obj.(*api_v1.Secret) + for _, obj := range objects { + secret := obj.(*api_v1.Secret) - if !secrets.IsSupportedSecretType(secret.Type) { - glog.V(3).Infof("Ignoring Secret %s/%s of unsupported type %s", secret.Namespace, secret.Name, secret.Type) - continue - } + if !secrets.IsSupportedSecretType(secret.Type) { + glog.V(3).Infof("Ignoring Secret %s/%s of unsupported type %s", secret.Namespace, secret.Name, secret.Type) + continue + } - glog.V(3).Infof("Adding Secret: %s/%s", secret.Namespace, secret.Name) - lbc.secretStore.AddOrUpdateSecret(secret) + glog.V(3).Infof("Adding Secret: %s/%s", secret.Namespace, secret.Name) + lbc.secretStore.AddOrUpdateSecret(secret) + } } } @@ -898,7 +952,17 @@ func (lbc *LoadBalancerController) syncIngressLink(task task) { func (lbc *LoadBalancerController) syncPolicy(task task) { key := task.Key - obj, polExists, err := lbc.policyLister.GetByKey(key) + var obj interface{} + var polExists bool + var err error + + for _, pl := range lbc.policyLister { + obj, polExists, err = pl.GetByKey(key) + if polExists { + break + } + } + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -951,7 +1015,17 @@ func (lbc *LoadBalancerController) syncPolicy(task task) { func (lbc *LoadBalancerController) syncTransportServer(task task) { key := task.Key - obj, tsExists, err := lbc.transportServerLister.GetByKey(key) + var obj interface{} + var tsExists bool + var err error + + for _, tl := range lbc.transportServerLister { + obj, tsExists, err = tl.GetByKey(key) + if tsExists { + break + } + } + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -1024,7 +1098,17 @@ func (lbc *LoadBalancerController) syncGlobalConfiguration(task task) { func (lbc *LoadBalancerController) syncVirtualServer(task task) { key := task.Key - obj, vsExists, err := lbc.virtualServerLister.GetByKey(key) + var obj interface{} + var vsExists bool + var err error + + for _, vl := range lbc.virtualServerLister { + obj, vsExists, err = vl.GetByKey(key) + if vsExists { + break + } + } + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -1128,7 +1212,16 @@ func (lbc *LoadBalancerController) processChanges(changes []ResourceChange) { glog.Errorf("Error when deleting configuration for VirtualServer %v: %v", key, deleteErr) } - _, vsExists, err := lbc.virtualServerLister.GetByKey(key) + var vsExists bool + var err error + + for _, vl := range lbc.virtualServerLister { + _, vsExists, err = vl.GetByKey(key) + if vsExists { + break + } + } + if err != nil { glog.Errorf("Error when getting VirtualServer for %v: %v", key, err) } @@ -1146,7 +1239,16 @@ func (lbc *LoadBalancerController) processChanges(changes []ResourceChange) { glog.Errorf("Error when deleting configuration for Ingress %v: %v", key, deleteErr) } - _, ingExists, err := lbc.ingressLister.GetByKeySafe(key) + var ingExists bool + var err error + + for _, il := range lbc.ingressLister { + _, ingExists, err = il.GetByKeySafe(key) + if ingExists { + break + } + } + if err != nil { glog.Errorf("Error when getting Ingress for %v: %v", key, err) } @@ -1163,7 +1265,16 @@ func (lbc *LoadBalancerController) processChanges(changes []ResourceChange) { glog.Errorf("Error when deleting configuration for TransportServer %v: %v", key, deleteErr) } - _, tsExists, err := lbc.transportServerLister.GetByKey(key) + var tsExists bool + var err error + + for _, tl := range lbc.transportServerLister { + _, tsExists, err = tl.GetByKey(key) + if tsExists { + break + } + } + if err != nil { glog.Errorf("Error when getting TransportServer for %v: %v", key, err) } @@ -1727,7 +1838,17 @@ func (lbc *LoadBalancerController) updateVirtualServerStatusAndEvents(vsConfig * func (lbc *LoadBalancerController) syncVirtualServerRoute(task task) { key := task.Key - obj, exists, err := lbc.virtualServerRouteLister.GetByKey(key) + var obj interface{} + var exists bool + var err error + + for _, vrl := range lbc.virtualServerRouteLister { + obj, exists, err = vrl.GetByKey(key) + if exists { + break + } + } + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -1753,7 +1874,16 @@ func (lbc *LoadBalancerController) syncVirtualServerRoute(task task) { func (lbc *LoadBalancerController) syncIngress(task task) { key := task.Key - ing, ingExists, err := lbc.ingressLister.GetByKeySafe(key) + var ing *networking.Ingress + var ingExists bool + var err error + + for _, il := range lbc.ingressLister { + ing, ingExists, err = il.GetByKeySafe(key) + if ingExists { + break + } + } if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -1802,7 +1932,17 @@ func (lbc *LoadBalancerController) syncService(task task) { key := task.Key glog.V(3).Infof("Syncing service %v", key) - obj, exists, err := lbc.svcLister.GetByKey(key) + var obj interface{} + var exists bool + var err error + + for _, sl := range lbc.svcLister { + obj, exists, err = sl.GetByKey(key) + if exists { + break + } + } + if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -1898,7 +2038,16 @@ func (lbc *LoadBalancerController) reportCustomResourceStatusEnabled() bool { func (lbc *LoadBalancerController) syncSecret(task task) { key := task.Key - obj, secrExists, err := lbc.secretLister.GetByKey(key) + var obj interface{} + var secrExists bool + var err error + + for _, sl := range lbc.secretLister { + obj, secrExists, err = sl.GetByKey(key) + if secrExists { + break + } + } if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -2038,36 +2187,38 @@ func getStatusFromEventTitle(eventTitle string) string { func (lbc *LoadBalancerController) updateVirtualServersStatusFromEvents() error { var allErrs []error - for _, obj := range lbc.virtualServerLister.List() { - vs := obj.(*conf_v1.VirtualServer) + for _, vl := range lbc.virtualServerLister { + for _, obj := range vl.List() { + vs := obj.(*conf_v1.VirtualServer) - if !lbc.HasCorrectIngressClass(vs) { - glog.V(3).Infof("Ignoring VirtualServer %v based on class %v", vs.Name, vs.Spec.IngressClass) - continue - } + if !lbc.HasCorrectIngressClass(vs) { + glog.V(3).Infof("Ignoring VirtualServer %v based on class %v", vs.Name, vs.Spec.IngressClass) + continue + } - events, err := lbc.client.CoreV1().Events(vs.Namespace).List(context.TODO(), - meta_v1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%v,involvedObject.uid=%v", vs.Name, vs.UID)}) - if err != nil { - allErrs = append(allErrs, fmt.Errorf("error trying to get events for VirtualServer %v/%v: %w", vs.Namespace, vs.Name, err)) - break - } + events, err := lbc.client.CoreV1().Events(vs.Namespace).List(context.TODO(), + meta_v1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%v,involvedObject.uid=%v", vs.Name, vs.UID)}) + if err != nil { + allErrs = append(allErrs, fmt.Errorf("error trying to get events for VirtualServer %v/%v: %w", vs.Namespace, vs.Name, err)) + break + } - if len(events.Items) == 0 { - continue - } + if len(events.Items) == 0 { + continue + } - var timestamp time.Time - var latestEvent api_v1.Event - for _, event := range events.Items { - if event.CreationTimestamp.After(timestamp) { - latestEvent = event + var timestamp time.Time + var latestEvent api_v1.Event + for _, event := range events.Items { + if event.CreationTimestamp.After(timestamp) { + latestEvent = event + } } - } - err = lbc.statusUpdater.UpdateVirtualServerStatus(vs, getStatusFromEventTitle(latestEvent.Reason), latestEvent.Reason, latestEvent.Message) - if err != nil { - allErrs = append(allErrs, err) + err = lbc.statusUpdater.UpdateVirtualServerStatus(vs, getStatusFromEventTitle(latestEvent.Reason), latestEvent.Reason, latestEvent.Message) + if err != nil { + allErrs = append(allErrs, err) + } } } @@ -2080,36 +2231,38 @@ func (lbc *LoadBalancerController) updateVirtualServersStatusFromEvents() error func (lbc *LoadBalancerController) updateVirtualServerRoutesStatusFromEvents() error { var allErrs []error - for _, obj := range lbc.virtualServerRouteLister.List() { - vsr := obj.(*conf_v1.VirtualServerRoute) + for _, vsrl := range lbc.virtualServerRouteLister { + for _, obj := range vsrl.List() { + vsr := obj.(*conf_v1.VirtualServerRoute) - if !lbc.HasCorrectIngressClass(vsr) { - glog.V(3).Infof("Ignoring VirtualServerRoute %v based on class %v", vsr.Name, vsr.Spec.IngressClass) - continue - } + if !lbc.HasCorrectIngressClass(vsr) { + glog.V(3).Infof("Ignoring VirtualServerRoute %v based on class %v", vsr.Name, vsr.Spec.IngressClass) + continue + } - events, err := lbc.client.CoreV1().Events(vsr.Namespace).List(context.TODO(), - meta_v1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%v,involvedObject.uid=%v", vsr.Name, vsr.UID)}) - if err != nil { - allErrs = append(allErrs, fmt.Errorf("error trying to get events for VirtualServerRoute %v/%v: %w", vsr.Namespace, vsr.Name, err)) - break - } + events, err := lbc.client.CoreV1().Events(vsr.Namespace).List(context.TODO(), + meta_v1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%v,involvedObject.uid=%v", vsr.Name, vsr.UID)}) + if err != nil { + allErrs = append(allErrs, fmt.Errorf("error trying to get events for VirtualServerRoute %v/%v: %w", vsr.Namespace, vsr.Name, err)) + break + } - if len(events.Items) == 0 { - continue - } + if len(events.Items) == 0 { + continue + } - var timestamp time.Time - var latestEvent api_v1.Event - for _, event := range events.Items { - if event.CreationTimestamp.After(timestamp) { - latestEvent = event + var timestamp time.Time + var latestEvent api_v1.Event + for _, event := range events.Items { + if event.CreationTimestamp.After(timestamp) { + latestEvent = event + } } - } - err = lbc.statusUpdater.UpdateVirtualServerRouteStatus(vsr, getStatusFromEventTitle(latestEvent.Reason), latestEvent.Reason, latestEvent.Message) - if err != nil { - allErrs = append(allErrs, err) + err = lbc.statusUpdater.UpdateVirtualServerRouteStatus(vsr, getStatusFromEventTitle(latestEvent.Reason), latestEvent.Reason, latestEvent.Message) + if err != nil { + allErrs = append(allErrs, err) + } } } @@ -2122,21 +2275,23 @@ func (lbc *LoadBalancerController) updateVirtualServerRoutesStatusFromEvents() e func (lbc *LoadBalancerController) updatePoliciesStatus() error { var allErrs []error - for _, obj := range lbc.policyLister.List() { - pol := obj.(*conf_v1.Policy) + for _, pl := range lbc.policyLister { + for _, obj := range pl.List() { + pol := obj.(*conf_v1.Policy) - err := validation.ValidatePolicy(pol, lbc.isNginxPlus, lbc.enableOIDC, lbc.appProtectEnabled) - if err != nil { - msg := fmt.Sprintf("Policy %v/%v is invalid and was rejected: %v", pol.Namespace, pol.Name, err) - err = lbc.statusUpdater.UpdatePolicyStatus(pol, conf_v1.StateInvalid, "Rejected", msg) - if err != nil { - allErrs = append(allErrs, err) - } - } else { - msg := fmt.Sprintf("Policy %v/%v was added or updated", pol.Namespace, pol.Name) - err = lbc.statusUpdater.UpdatePolicyStatus(pol, conf_v1.StateValid, "AddedOrUpdated", msg) + err := validation.ValidatePolicy(pol, lbc.isNginxPlus, lbc.enableOIDC, lbc.appProtectEnabled) if err != nil { - allErrs = append(allErrs, err) + msg := fmt.Sprintf("Policy %v/%v is invalid and was rejected: %v", pol.Namespace, pol.Name, err) + err = lbc.statusUpdater.UpdatePolicyStatus(pol, conf_v1.StateInvalid, "Rejected", msg) + if err != nil { + allErrs = append(allErrs, err) + } + } else { + msg := fmt.Sprintf("Policy %v/%v was added or updated", pol.Namespace, pol.Name) + err = lbc.statusUpdater.UpdatePolicyStatus(pol, conf_v1.StateValid, "AddedOrUpdated", msg) + if err != nil { + allErrs = append(allErrs, err) + } } } } @@ -2150,31 +2305,33 @@ func (lbc *LoadBalancerController) updatePoliciesStatus() error { func (lbc *LoadBalancerController) updateTransportServersStatusFromEvents() error { var allErrs []error - for _, obj := range lbc.transportServerLister.List() { - ts := obj.(*conf_v1alpha1.TransportServer) + for _, tl := range lbc.transportServerLister { + for _, obj := range tl.List() { + ts := obj.(*conf_v1alpha1.TransportServer) - events, err := lbc.client.CoreV1().Events(ts.Namespace).List(context.TODO(), - meta_v1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%v,involvedObject.uid=%v", ts.Name, ts.UID)}) - if err != nil { - allErrs = append(allErrs, fmt.Errorf("error trying to get events for TransportServer %v/%v: %w", ts.Namespace, ts.Name, err)) - break - } + events, err := lbc.client.CoreV1().Events(ts.Namespace).List(context.TODO(), + meta_v1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%v,involvedObject.uid=%v", ts.Name, ts.UID)}) + if err != nil { + allErrs = append(allErrs, fmt.Errorf("error trying to get events for TransportServer %v/%v: %w", ts.Namespace, ts.Name, err)) + break + } - if len(events.Items) == 0 { - continue - } + if len(events.Items) == 0 { + continue + } - var timestamp time.Time - var latestEvent api_v1.Event - for _, event := range events.Items { - if event.CreationTimestamp.After(timestamp) { - latestEvent = event + var timestamp time.Time + var latestEvent api_v1.Event + for _, event := range events.Items { + if event.CreationTimestamp.After(timestamp) { + latestEvent = event + } } - } - err = lbc.statusUpdater.UpdateTransportServerStatus(ts, getStatusFromEventTitle(latestEvent.Reason), latestEvent.Reason, latestEvent.Message) - if err != nil { - allErrs = append(allErrs, err) + err = lbc.statusUpdater.UpdateTransportServerStatus(ts, getStatusFromEventTitle(latestEvent.Reason), latestEvent.Reason, latestEvent.Message) + if err != nil { + allErrs = append(allErrs, err) + } } } @@ -2688,16 +2845,18 @@ func createPolicyMap(policies []*conf_v1.Policy) map[string]*conf_v1.Policy { func (lbc *LoadBalancerController) getAllPolicies() []*conf_v1.Policy { var policies []*conf_v1.Policy - for _, obj := range lbc.policyLister.List() { - pol := obj.(*conf_v1.Policy) + for _, pl := range lbc.policyLister { + for _, obj := range pl.List() { + pol := obj.(*conf_v1.Policy) - err := validation.ValidatePolicy(pol, lbc.isNginxPlus, lbc.enableOIDC, lbc.appProtectEnabled) - if err != nil { - glog.V(3).Infof("Skipping invalid Policy %s/%s: %v", pol.Namespace, pol.Name, err) - continue - } + err := validation.ValidatePolicy(pol, lbc.isNginxPlus, lbc.enableOIDC, lbc.appProtectEnabled) + if err != nil { + glog.V(3).Infof("Skipping invalid Policy %s/%s: %v", pol.Namespace, pol.Name, err) + continue + } - policies = append(policies, pol) + policies = append(policies, pol) + } } return policies @@ -2715,7 +2874,16 @@ func (lbc *LoadBalancerController) getPolicies(policies []conf_v1.PolicyReferenc policyKey := fmt.Sprintf("%s/%s", polNamespace, p.Name) - policyObj, exists, err := lbc.policyLister.GetByKey(policyKey) + var policyObj interface{} + var exists bool + var err error + + for _, pl := range lbc.policyLister { + policyObj, exists, err = pl.GetByKey(policyKey) + if exists { + break + } + } if err != nil { errors = append(errors, fmt.Errorf("Failed to get policy %s: %w", policyKey, err)) continue @@ -3060,12 +3228,24 @@ func (lbc *LoadBalancerController) getEndpointsForSubselector(namespace string, } func (lbc *LoadBalancerController) getEndpointsForServiceWithSubselector(targetPort int32, subselector map[string]string, svc *api_v1.Service) (endps []podEndpoint, err error) { - pods, err := lbc.podLister.ListByNamespace(svc.Namespace, labels.Merge(svc.Spec.Selector, subselector).AsSelector()) + var pods []*api_v1.Pod + for _, pl := range lbc.podLister { + pods, err = pl.ListByNamespace(svc.Namespace, labels.Merge(svc.Spec.Selector, subselector).AsSelector()) + if len(pods) > 0 { + break + } + } if err != nil { return nil, fmt.Errorf("Error getting pods in namespace %v that match the selector %v: %w", svc.Namespace, labels.Merge(svc.Spec.Selector, subselector), err) } - svcEps, err := lbc.endpointLister.GetServiceEndpoints(svc) + var svcEps api_v1.Endpoints + for _, el := range lbc.endpointLister { + svcEps, err = el.GetServiceEndpoints(svc) + if err == nil { + break + } + } if err != nil { glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err) return nil, err @@ -3124,7 +3304,13 @@ func (lbc *LoadBalancerController) getHealthChecksForIngressBackend(backend *net if svcPort == nil { return nil } - pods, err := lbc.podLister.ListByNamespace(svc.Namespace, labels.Set(svc.Spec.Selector).AsSelector()) + var pods []*api_v1.Pod + for _, pl := range lbc.podLister { + pods, err = pl.ListByNamespace(svc.Namespace, labels.Set(svc.Spec.Selector).AsSelector()) + if len(pods) > 0 { + break + } + } if err != nil { glog.V(3).Infof("Error fetching pods for namespace %v: %v", svc.Namespace, err) return nil @@ -3175,7 +3361,13 @@ func (lbc *LoadBalancerController) getExternalEndpointsForIngressBackend(backend } func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *networking.IngressBackend, svc *api_v1.Service) (result []podEndpoint, isExternal bool, err error) { - endps, err := lbc.endpointLister.GetServiceEndpoints(svc) + var endps api_v1.Endpoints + for _, el := range lbc.endpointLister { + endps, err = el.GetServiceEndpoints(svc) + if err == nil { + break + } + } if err != nil { if svc.Spec.Type == api_v1.ServiceTypeExternalName { if !lbc.isNginxPlus { @@ -3240,7 +3432,15 @@ func (lbc *LoadBalancerController) getEndpointsForPort(endps api_v1.Endpoints, b } func (lbc *LoadBalancerController) getPodOwnerTypeAndNameFromAddress(ns, name string) (parentType, parentName string) { - obj, exists, err := lbc.podLister.GetByKey(fmt.Sprintf("%s/%s", ns, name)) + var obj interface{} + var exists bool + var err error + for _, pl := range lbc.podLister { + obj, exists, err = pl.GetByKey(fmt.Sprintf("%s/%s", ns, name)) + if exists { + break + } + } if err != nil { glog.Warningf("could not get pod by key %s/%s: %v", ns, name, err) return "", "" @@ -3286,7 +3486,14 @@ func (lbc *LoadBalancerController) getTargetPort(svcPort api_v1.ServicePort, svc return int32(svcPort.TargetPort.IntValue()), nil } - pods, err := lbc.podLister.ListByNamespace(svc.Namespace, labels.Set(svc.Spec.Selector).AsSelector()) + var pods []*api_v1.Pod + var err error + for _, pl := range lbc.podLister { + pods, err = pl.ListByNamespace(svc.Namespace, labels.Set(svc.Spec.Selector).AsSelector()) + if len(pods) > 0 { + break + } + } if err != nil { return 0, fmt.Errorf("Error getting pod information: %w", err) } @@ -3319,7 +3526,15 @@ func (lbc *LoadBalancerController) getServiceForUpstream(namespace string, upstr func (lbc *LoadBalancerController) getServiceForIngressBackend(backend *networking.IngressBackend, namespace string) (*api_v1.Service, error) { svcKey := namespace + "/" + backend.Service.Name - svcObj, svcExists, err := lbc.svcLister.GetByKey(svcKey) + var svcObj interface{} + var svcExists bool + var err error + for _, sl := range lbc.svcLister { + svcObj, svcExists, err = sl.GetByKey(svcKey) + if svcExists { + break + } + } if err != nil { return nil, err } @@ -3388,7 +3603,16 @@ func (lbc *LoadBalancerController) syncSVIDRotation(svidResponse *workloadapi.X5 func (lbc *LoadBalancerController) syncAppProtectPolicy(task task) { key := task.Key glog.V(3).Infof("Syncing AppProtectPolicy %v", key) - obj, polExists, err := lbc.appProtectPolicyLister.GetByKey(key) + + var obj interface{} + var polExists bool + var err error + for _, apl := range lbc.appProtectPolicyLister { + obj, polExists, err = apl.GetByKey(key) + if polExists { + break + } + } if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -3414,7 +3638,15 @@ func (lbc *LoadBalancerController) syncAppProtectPolicy(task task) { func (lbc *LoadBalancerController) syncAppProtectLogConf(task task) { key := task.Key glog.V(3).Infof("Syncing AppProtectLogConf %v", key) - obj, confExists, err := lbc.appProtectLogConfLister.GetByKey(key) + var obj interface{} + var confExists bool + var err error + for _, apl := range lbc.appProtectLogConfLister { + obj, confExists, err = apl.GetByKey(key) + if confExists { + break + } + } if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -3440,7 +3672,15 @@ func (lbc *LoadBalancerController) syncAppProtectLogConf(task task) { func (lbc *LoadBalancerController) syncAppProtectUserSig(task task) { key := task.Key glog.V(3).Infof("Syncing AppProtectUserSig %v", key) - obj, sigExists, err := lbc.appProtectUserSigLister.GetByKey(key) + var obj interface{} + var sigExists bool + var err error + for _, apl := range lbc.appProtectUserSigLister { + obj, sigExists, err = apl.GetByKey(key) + if sigExists { + break + } + } if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -3466,7 +3706,15 @@ func (lbc *LoadBalancerController) syncAppProtectUserSig(task task) { func (lbc *LoadBalancerController) syncAppProtectDosPolicy(task task) { key := task.Key glog.V(3).Infof("Syncing AppProtectDosPolicy %v", key) - obj, polExists, err := lbc.appProtectDosPolicyLister.GetByKey(key) + var obj interface{} + var polExists bool + var err error + for _, apl := range lbc.appProtectDosPolicyLister { + obj, polExists, err = apl.GetByKey(key) + if polExists { + break + } + } if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -3490,7 +3738,15 @@ func (lbc *LoadBalancerController) syncAppProtectDosPolicy(task task) { func (lbc *LoadBalancerController) syncAppProtectDosLogConf(task task) { key := task.Key glog.V(3).Infof("Syncing APDosLogConf %v", key) - obj, confExists, err := lbc.appProtectDosLogConfLister.GetByKey(key) + var obj interface{} + var confExists bool + var err error + for _, apl := range lbc.appProtectDosLogConfLister { + obj, confExists, err = apl.GetByKey(key) + if confExists { + break + } + } if err != nil { lbc.syncQueue.Requeue(task, err) return @@ -3514,7 +3770,15 @@ func (lbc *LoadBalancerController) syncAppProtectDosLogConf(task task) { func (lbc *LoadBalancerController) syncDosProtectedResource(task task) { key := task.Key glog.V(3).Infof("Syncing DosProtectedResource %v", key) - obj, confExists, err := lbc.appProtectDosProtectedLister.GetByKey(key) + var obj interface{} + var confExists bool + var err error + for _, apl := range lbc.appProtectDosProtectedLister { + obj, confExists, err = apl.GetByKey(key) + if confExists { + break + } + } if err != nil { lbc.syncQueue.Requeue(task, err) return diff --git a/internal/k8s/controller_test.go b/internal/k8s/controller_test.go index 28ee0a8797..c13a289c7e 100644 --- a/internal/k8s/controller_test.go +++ b/internal/k8s/controller_test.go @@ -637,26 +637,31 @@ func TestGetPolicies(t *testing.T) { Spec: conf_v1.PolicySpec{}, } - lbc := LoadBalancerController{ - isNginxPlus: true, - policyLister: &cache.FakeCustomStore{ - GetByKeyFunc: func(key string) (item interface{}, exists bool, err error) { - switch key { - case "default/valid-policy": - return validPolicy, true, nil - case "default/valid-policy-ingress-class": - return validPolicyIngressClass, true, nil - case "default/invalid-policy": - return invalidPolicy, true, nil - case "nginx-ingress/valid-policy": - return nil, false, nil - default: - return nil, false, errors.New("GetByKey error") - } - }, + policyLister := &cache.FakeCustomStore{ + GetByKeyFunc: func(key string) (item interface{}, exists bool, err error) { + switch key { + case "default/valid-policy": + return validPolicy, true, nil + case "default/valid-policy-ingress-class": + return validPolicyIngressClass, true, nil + case "default/invalid-policy": + return invalidPolicy, true, nil + case "nginx-ingress/valid-policy": + return nil, false, nil + default: + return nil, false, errors.New("GetByKey error") + } }, } + var pl []cache.Store + pl = append(pl, policyLister) + + lbc := LoadBalancerController{ + isNginxPlus: true, + policyLister: pl, + } + policyRefs := []conf_v1.PolicyReference{ { Name: "valid-policy", @@ -2273,30 +2278,34 @@ func TestGetWAFPoliciesForAppProtectLogConf(t *testing.T) { } func TestPreSyncSecrets(t *testing.T) { - lbc := LoadBalancerController{ - isNginxPlus: true, - secretStore: secrets.NewEmptyFakeSecretsStore(), - secretLister: &cache.FakeCustomStore{ - ListFunc: func() []interface{} { - return []interface{}{ - &api_v1.Secret{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: "supported-secret", - Namespace: "default", - }, - Type: api_v1.SecretTypeTLS, + secretLister := &cache.FakeCustomStore{ + ListFunc: func() []interface{} { + return []interface{}{ + &api_v1.Secret{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "supported-secret", + Namespace: "default", }, - &api_v1.Secret{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: "unsupported-secret", - Namespace: "default", - }, - Type: api_v1.SecretTypeOpaque, + Type: api_v1.SecretTypeTLS, + }, + &api_v1.Secret{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "unsupported-secret", + Namespace: "default", }, - } - }, + Type: api_v1.SecretTypeOpaque, + }, + } }, } + var sl []cache.Store + sl = append(sl, secretLister) + + lbc := LoadBalancerController{ + isNginxPlus: true, + secretStore: secrets.NewEmptyFakeSecretsStore(), + secretLister: sl, + } lbc.preSyncSecrets() diff --git a/internal/k8s/status.go b/internal/k8s/status.go index 2a3d261b2e..a8ab81c508 100644 --- a/internal/k8s/status.go +++ b/internal/k8s/status.go @@ -39,11 +39,11 @@ type statusUpdater struct { status []api_v1.LoadBalancerIngress statusInitialized bool keyFunc func(obj interface{}) (string, error) - ingressLister *storeToIngressLister - virtualServerLister cache.Store - virtualServerRouteLister cache.Store - transportServerLister cache.Store - policyLister cache.Store + ingressLister []storeToIngressLister + virtualServerLister []cache.Store + virtualServerRouteLister []cache.Store + transportServerLister []cache.Store + policyLister []cache.Store confClient k8s_nginx.Interface hasCorrectIngressClass func(interface{}) bool } @@ -120,7 +120,15 @@ func (su *statusUpdater) updateIngressWithStatus(ing networking.Ingress, status glog.V(3).Infof("error getting key for ing: %v", err) return err } - ingCopy, exists, err := su.ingressLister.GetByKeySafe(key) + + var ingCopy *networking.Ingress + var exists bool + for _, il := range su.ingressLister { + ingCopy, exists, err = il.GetByKeySafe(key) + if exists { + break + } + } if err != nil { glog.V(3).Infof("error getting ing from Store by key: %v", err) return err @@ -405,7 +413,15 @@ func hasVsStatusChanged(vs *conf_v1.VirtualServer, state string, reason string, // UpdateTransportServerStatus updates the status of a TransportServer. func (su *statusUpdater) UpdateTransportServerStatus(ts *conf_v1alpha1.TransportServer, state string, reason string, message string) error { - tsLatest, exists, err := su.transportServerLister.Get(ts) + var tsLatest interface{} + var exists bool + var err error + for _, tl := range su.transportServerLister { + tsLatest, exists, err = tl.Get(ts) + if exists { + break + } + } if err != nil { glog.V(3).Infof("error getting TransportServer from Store: %v", err) return err @@ -448,7 +464,15 @@ func hasTsStatusChanged(ts *conf_v1alpha1.TransportServer, state string, reason // UpdateVirtualServerStatus updates the status of a VirtualServer. func (su *statusUpdater) UpdateVirtualServerStatus(vs *conf_v1.VirtualServer, state string, reason string, message string) error { // Get an up-to-date VirtualServer from the Store - vsLatest, exists, err := su.virtualServerLister.Get(vs) + var vsLatest interface{} + var exists bool + var err error + for _, vl := range su.virtualServerLister { + vsLatest, exists, err = vl.Get(vs) + if exists { + break + } + } if err != nil { glog.V(3).Infof("error getting VirtualServer from Store: %v", err) return err @@ -506,7 +530,15 @@ func (su *statusUpdater) UpdateVirtualServerRouteStatusWithReferencedBy(vsr *con } // Get an up-to-date VirtualServerRoute from the Store - vsrLatest, exists, err := su.virtualServerRouteLister.Get(vsr) + var vsrLatest interface{} + var exists bool + var err error + for _, vl := range su.virtualServerRouteLister { + vsrLatest, exists, err = vl.Get(vsr) + if exists { + break + } + } if err != nil { glog.V(3).Infof("error getting VirtualServerRoute from Store: %v", err) return err @@ -541,7 +573,15 @@ func (su *statusUpdater) UpdateVirtualServerRouteStatusWithReferencedBy(vsr *con // If you need to update the referencedBy field, use UpdateVirtualServerRouteStatusWithReferencedBy instead. func (su *statusUpdater) UpdateVirtualServerRouteStatus(vsr *conf_v1.VirtualServerRoute, state string, reason string, message string) error { // Get an up-to-date VirtualServerRoute from the Store - vsrLatest, exists, err := su.virtualServerRouteLister.Get(vsr) + var vsrLatest interface{} + var exists bool + var err error + for _, vl := range su.virtualServerRouteLister { + vsrLatest, exists, err = vl.Get(vsr) + if exists { + break + } + } if err != nil { glog.V(3).Infof("error getting VirtualServerRoute from Store: %v", err) return err @@ -572,7 +612,15 @@ func (su *statusUpdater) UpdateVirtualServerRouteStatus(vsr *conf_v1.VirtualServ func (su *statusUpdater) updateVirtualServerExternalEndpoints(vs *conf_v1.VirtualServer) error { // Get a pristine VirtualServer from the Store - vsLatest, exists, err := su.virtualServerLister.Get(vs) + var vsLatest interface{} + var exists bool + var err error + for _, vl := range su.virtualServerLister { + vsLatest, exists, err = vl.Get(vs) + if exists { + break + } + } if err != nil { glog.V(3).Infof("error getting VirtualServer from Store: %v", err) return err @@ -595,7 +643,15 @@ func (su *statusUpdater) updateVirtualServerExternalEndpoints(vs *conf_v1.Virtua func (su *statusUpdater) updateVirtualServerRouteExternalEndpoints(vsr *conf_v1.VirtualServerRoute) error { // Get an up-to-date VirtualServerRoute from the Store - vsrLatest, exists, err := su.virtualServerRouteLister.Get(vsr) + var vsrLatest interface{} + var exists bool + var err error + for _, vl := range su.virtualServerRouteLister { + vsrLatest, exists, err = vl.Get(vsr) + if exists { + break + } + } if err != nil { glog.V(3).Infof("error getting VirtualServerRoute from Store: %v", err) return err @@ -638,7 +694,15 @@ func hasPolicyStatusChanged(pol *v1.Policy, state string, reason string, message // UpdatePolicyStatus updates the status of a Policy. func (su *statusUpdater) UpdatePolicyStatus(pol *v1.Policy, state string, reason string, message string) error { // Get an up-to-date Policy from the Store - polLatest, exists, err := su.policyLister.Get(pol) + var polLatest interface{} + var exists bool + var err error + for _, vl := range su.policyLister { + polLatest, exists, err = vl.Get(pol) + if exists { + break + } + } if err != nil { glog.V(3).Infof("error getting policy from Store: %v", err) return err diff --git a/internal/k8s/status_test.go b/internal/k8s/status_test.go index 28474949d3..141597ed0c 100644 --- a/internal/k8s/status_test.go +++ b/internal/k8s/status_test.go @@ -44,8 +44,9 @@ func TestUpdateTransportServerStatus(t *testing.T) { if err != nil { t.Errorf("Error adding TransportServer to the transportserver lister: %v", err) } + tsl := []cache.Store{tsLister} su := statusUpdater{ - transportServerLister: tsLister, + transportServerLister: tsl, confClient: fakeClient, keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, } @@ -103,8 +104,9 @@ func TestUpdateTransportServerStatusIgnoreNoChange(t *testing.T) { if err != nil { t.Errorf("Error adding TransportServer to the transportserver lister: %v", err) } + tsl := []cache.Store{tsLister} su := statusUpdater{ - transportServerLister: tsLister, + transportServerLister: tsl, confClient: fakeClient, keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, } @@ -156,8 +158,10 @@ func TestUpdateTransportServerStatusMissingTransportServer(t *testing.T) { nil, ) + tsl := []cache.Store{tsLister} + su := statusUpdater{ - transportServerLister: tsLister, + transportServerLister: tsl, confClient: fakeClient, keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, externalEndpoints: []conf_v1.ExternalEndpoint{ @@ -210,12 +214,14 @@ func TestStatusUpdateWithExternalStatusAndExternalService(t *testing.T) { t.Errorf("Error adding Ingress to the ingress lister: %v", err) } + isl := []storeToIngressLister{ingLister} + su := statusUpdater{ client: fakeClient, namespace: "namespace", externalServiceName: "service-name", externalStatusAddress: "123.123.123.123", - ingressLister: &ingLister, + ingressLister: isl, keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, } err = su.ClearIngressStatus(ing) @@ -313,11 +319,13 @@ func TestStatusUpdateWithExternalStatusAndIngressLink(t *testing.T) { t.Errorf("Error adding Ingress to the ingress lister: %v", err) } + isl := []storeToIngressLister{ingLister} + su := statusUpdater{ client: fakeClient, namespace: "namespace", externalStatusAddress: "", - ingressLister: &ingLister, + ingressLister: isl, keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, } diff --git a/tests/data/watch-namespace/watched-ns2-ingress.yaml b/tests/data/watch-namespace/watched-ns2-ingress.yaml new file mode 100644 index 0000000000..f47f157f84 --- /dev/null +++ b/tests/data/watch-namespace/watched-ns2-ingress.yaml @@ -0,0 +1,25 @@ +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + annotations: + kubernetes.io/ingress.class: "nginx" + name: watched-ingress2 +spec: + rules: + - host: watched2.example.com + http: + paths: + - path: /backend2 + pathType: Prefix + backend: + service: + name: backend2-svc + port: + number: 80 + - path: /backend1 + pathType: Prefix + backend: + service: + name: backend1-svc + port: + number: 80 diff --git a/tests/suite/test_watch_namespace.py b/tests/suite/test_watch_namespace.py index b9d0b7f22a..b832c323c2 100644 --- a/tests/suite/test_watch_namespace.py +++ b/tests/suite/test_watch_namespace.py @@ -35,8 +35,10 @@ def backend_setup(request, kube_apis, ingress_controller_endpoint) -> BackendSet f"watched-ns", f"{TEST_DATA}/common/ns.yaml") foreign_namespace = create_namespace_with_name_from_yaml(kube_apis.v1, f"foreign-ns", f"{TEST_DATA}/common/ns.yaml") + watched_namespace2 = create_namespace_with_name_from_yaml(kube_apis.v1, + f"watched-ns2", f"{TEST_DATA}/common/ns.yaml") ingress_hosts = {} - for ns in [watched_namespace, foreign_namespace]: + for ns in [watched_namespace, foreign_namespace, watched_namespace2]: print(f"------------------------- Deploy the backend in {ns} -----------------------------------") create_example_app(kube_apis, "simple", ns) src_ing_yaml = f"{TEST_DATA}/watch-namespace/{ns}-ingress.yaml" @@ -53,6 +55,7 @@ def fin(): print("Clean up:") delete_namespace(kube_apis.v1, watched_namespace) delete_namespace(kube_apis.v1, foreign_namespace) + delete_namespace(kube_apis.v1, watched_namespace2) request.addfinalizer(fin) @@ -73,3 +76,18 @@ def test_response_codes(self, ingress_controller, backend_setup, expected_respon resp = requests.get(backend_setup.req_url, headers={"host": backend_setup.ingress_hosts[ing]}) assert resp.status_code == expected_responses[ing],\ f"Expected: {expected_responses[ing]} response code for {backend_setup.ingress_hosts[ing]}" + +@pytest.mark.ingresses +@pytest.mark.parametrize('ingress_controller, expected_responses', + [ + pytest.param({"extra_args": ["-watch-namespace=watched-ns,watched-ns2"]}, + {"watched-ns-ingress": 200, "watched-ns2-ingress": 200, "foreign-ns-ingress": 404}) + ], + indirect=["ingress_controller"]) +class TestWatchMultipleNamespaces: + def test_response_codes(self, ingress_controller, backend_setup, expected_responses): + for ing in ["watched-ns-ingress", "watched-ns2-ingress", "foreign-ns-ingress"]: + ensure_response_from_backend(backend_setup.req_url, backend_setup.ingress_hosts[ing]) + resp = requests.get(backend_setup.req_url, headers={"host": backend_setup.ingress_hosts[ing]}) + assert resp.status_code == expected_responses[ing],\ + f"Expected: {expected_responses[ing]} response code for {backend_setup.ingress_hosts[ing]}"